Flume日志采集工具使用详解4(样例2:日志汇总采集、输出至HDFS)
四、日志汇总采集并输出至 HDFS
1,整体架构
(1)具体需求如下:
- 将 A 和 B 两台机器实时产生的日志数据汇总到机器 C 中。
- 通过机器 C 将数据汇总输出到 HDFS 的指定目录下。
(2)整个架构中包含 3 个 Agent:
- Agent1 负责采集机器 A 实时产生的日志数据。
- Agent2 负责采集机器 B 实时产生的日志数据。
- Agent3 负责将 Agent1 和 Agent2 汇总过来的数据统一输出到 HDFS 中。
(3)由于 Agent1 和 Agent2 采集到的数据需要汇总到 Agent3 中,所以为了快速发送我们可以通过网络直接传输。对于 Agent1 和 Agent2 的 Sink 组件都使用 Avro Sink,而 Agent3 的 Source 则使用 Avro Source。
提示:Avro 是一种数据序列化系统,经过它序列化的数据传输起来效率很高,并且与它对应的有一个 Avro Source,Avro Sink 发送出去的数据可以直接被 Avro Source 接收,它们可以无缝衔接。
2,准备模拟数据
(1)在 Agent1 和 Agent2 所在的机器上创建一个生成模拟数据的脚本 generate_log.sh,执行此脚本即可在指定文件中不断地生成新数据,此脚本内容如下:
#!/bin/bash # 在文件中循环生成数据 while [ "1" = "1" ] do # 获取当前时间戳 curr_time=`date +%s` # 获取当前主机名 name=`hostname` echo ${name}_${curr_time} >> /usr/local/access.log # 暂停 1s sleep 1 done
(2)比如 Agent1 所在机器的主机名为 node1,那么该机器上将会生成如下内容的数据文件:
3,创建 Agent 的配置文件
(1)Agent1 和 Agent2 的配置文件都是 file-to-avro.conf,具体内容如下:
注意:Agent1 和 Agent2 中配置的 a1.sinks.k1.port 参数的值需要和 Agent3 中配置的 a1.sources.r1.port 参数的值一样,这样 Agent3 才可以接收到 Agent1 和 Agent2 发送过来的数据。
# Agent 的名称是 a1 # 指定 Source 组件、Channel 组件和 Sink 组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置 Source 组件 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /usr/local/access.log # 配置 Channel 组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置 Sink 组件 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.60.9 a1.sinks.k1.port = 45454 # 把这些组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(2)Agent3 的配置文件是 avro-to-hdfs.conf,具体内容如下:
# Agent 的名称是 a1 # 指定 Source 组件、Channel 组件和 Sink 组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置 Source 组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 45454 # 配置 Channel 组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置 Sink 组件 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/access/%Y%m%d a1.sinks.k1.hdfs.filePrefix = access a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true # 将 Source 组件、Sink 组件和 Channel 组件绑定到一起 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4,启动 Agent
(1)我们首先启动 Agent3:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/avro-to-hdfs.conf
(2)接着登录另外两台机器,执行如下命令分别启动 Agent1 和 Agent2:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-avro.conf
5,验证结果
(1)最后到 HDFS 上验证结果,首先执行如下命令查看“/access/当前日期”路径下的内容,可以看到生成的文件:hdfs dfs -ls /access/20231129
(2)查看文件内容,如果能查到两个 agent 发送的数据,则说明整个流程是通的。
hdfs dfs -cat /access/20231129/access.1701259979025.tmp
附:解决 HDFS 数据写入报错问题
1,问题描述
上面样例 Agent3 启动后,当要将日志输出到 HDFS 时可能会报如下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:244)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:456)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:109)
at org.apache.flume.node.Application.main(Application.java:491)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2,解决办法
这是由于 flume 缺乏一些 hadoop 相关的 jar 包,我们从 Hadoop 程序包中将如下 5 个 jar 包复制一份到 flume 的 lib 目录下即可。
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.1.jar $FLUME_HOME/lib/ cp $HADOOP_HOME/share/hadoop/common/lib/commons-configuration-1.6.jar $FLUME_HOME/lib/ cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-2.7.1.jar $FLUME_HOME/lib/ cp $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.7.1.jar $FLUME_HOME/lib/ cp $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/htrace-core-3.1.0-incubating.jar $FLUME_HOME/lib/