Flume日志采集工具使用详解3(样例2:监控目录下新增文件内容、输出至HDFS)
三、采集指定目录下所有文件内容上传至 HDFS
1,需求说明
使用 Flume 实现目录监控,并将目录下所有文件以及新增的文件上传至 HDFS。
2,配置 Agent
(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf vi example.conf
(2)接着在配置文件中添加如下内容,这里我们使用 Spooling Directory Source 可以实现目录监控:
注意:
- SpoolingDirSource 在读取一整个文件到 channel 之后,它会采取策略,要么删除文件(是否可以删除取决于配置),要么对文件进程一个完成状态的重命名,这样可以保证 source 持续监控新的文件!
- 特别注意的是:必须已经封闭的文件才能放入到 SpoolingDirSource,在同一个 SpoolingDirSource 中都不能出现重名的文件!
# 定义 Agent 内部 3 大组件的名称 a1.sources = r1 a1.sinks = k1 a1.channels =c1 #配置source组件源 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /data/log/studentDir #配置channel组件 a1.channels.c1.type = memory #配置sink组件 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node1:9000/studentDir a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀 a1.sinks.k1.hdfs.filePrefix = stu #把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3,测试 Agent
(1)在启动 agent 之前,先初始化一下测试数据,创建 /data/log/studentDir 目录,然后在里面添加一个文件 class1.dat:
(2)class1.dat 中存储的是学生信息,学生姓名、年龄、性别,内容如下:
(3)然后执行如下命令启动 Agent:
mkdir -p /data/log/studentDir cd /data/log/studentDir vi class1.dat
(2)class1.dat 中存储的是学生信息,学生姓名、年龄、性别,内容如下:
hangge 88 male xixi 12 female
(3)然后执行如下命令启动 Agent:
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(4)到 hdfs上验证结果,可以发现文件已经生成了,并且文件里面的内容其实就是 class1.dat 文件中的内容:
注意:
- 默认情况下现在的文件是 .tmp 结尾的,表示它在被使用,因为 Flume 只要采集到数据就会向里面写,这个后缀默认是由 hdfs.inUseSuffix 参数来控制的。
- 文件名上还拼接了一个当前时间戳,这个是默认文件名的格式,当达到文件切割时机的时候会给文件改名字,去掉 .tmp
hdfs dfs -ls /studentDir hdfs dfs -cat /studentDir/stu.1708160924097.tmp
(5)我们在查看下监控的目录,我们发现此时这个文件已经被加了一个后缀 .COMPLETED,表示这个文件已经被读取过了,所以 Flume 在读取的时候会忽略后缀为 .COMPLETED 的文件。这样就避免了 Flume 重复读取同一个文件的数据。
ll /data/log/studentDir
(6)我们再次在监控目录下创建一个新的文件 class2.dat:
cd /data/log/studentDir vi class2.dat
(7)文件内容如下:
jojo 33 male
(8)再次到 hdfs 上验证结果,可以看到 hdfs 上文件内也新增了 class2.dat 里面的内容:
hdfs dfs -cat /studentDir/stu.1708160924097.tmp