当前位置: > > > Flume日志采集工具使用详解4(样例2:日志汇总采集、输出至HDFS)

Flume日志采集工具使用详解4(样例2:日志汇总采集、输出至HDFS)

四、日志汇总采集并输出至 HDFS

1,整体架构

(1)具体需求如下:
  • AB 两台机器实时产生的日志数据汇总到机器 C 中。
  • 通过机器 C 将数据汇总输出到 HDFS 的指定目录下。
(2)整个架构中包含 3Agent
  • Agent1 负责采集机器 A 实时产生的日志数据。
  • Agent2 负责采集机器 B 实时产生的日志数据。
  • Agent3 负责将 Agent1Agent2 汇总过来的数据统一输出到 HDFS 中。

(3)由于 Agent1Agent2 采集到的数据需要汇总到 Agent3 中,所以为了快速发送我们可以通过网络直接传输。对于 Agent1Agent2Sink 组件都使用 Avro Sink,而 Agent3Source 则使用 Avro Source
提示Avro 是一种数据序列化系统,经过它序列化的数据传输起来效率很高,并且与它对应的有一个 Avro SourceAvro Sink 发送出去的数据可以直接被 Avro Source 接收,它们可以无缝衔接。

2,准备模拟数据

(1)在 Agent1Agent2 所在的机器上创建一个生成模拟数据的脚本 generate_log.sh,执行此脚本即可在指定文件中不断地生成新数据,此脚本内容如下:
#!/bin/bash
# 在文件中循环生成数据
while [ "1" = "1" ]
do
    # 获取当前时间戳
    curr_time=`date +%s`
    # 获取当前主机名
    name=`hostname`
    echo ${name}_${curr_time} >> /usr/local/access.log
    # 暂停 1s
    sleep 1
done

(2)比如 Agent1 所在机器的主机名为 node1,那么该机器上将会生成如下内容的数据文件:

3,创建 Agent 的配置文件

(1)Agent1Agent2 的配置文件都是 file-to-avro.conf,具体内容如下:
注意Agent1Agent2 中配置的 a1.sinks.k1.port 参数的值需要和 Agent3 中配置的 a1.sources.r1.port 参数的值一样,这样 Agent3 才可以接收到 Agent1Agent2 发送过来的数据。
# Agent 的名称是 a1
# 指定 Source 组件、Channel 组件和 Sink 组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置 Source 组件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/access.log

# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 Sink 组件
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.60.9
a1.sinks.k1.port = 45454

# 把这些组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)Agent3 的配置文件是 avro-to-hdfs.conf,具体内容如下:
# Agent 的名称是 a1
# 指定 Source 组件、Channel 组件和 Sink 组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置 Source 组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454

# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 Sink 组件
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/access/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 将 Source 组件、Sink 组件和 Channel 组件绑定到一起
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4,启动 Agent

(1)我们首先启动 Agent3
bin/flume-ng agent --name a1 --conf conf --conf-file conf/avro-to-hdfs.conf

(2)接着登录另外两台机器,执行如下命令分别启动 Agent1Agent2
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-avro.conf

5,验证结果

(1)最后到 HDFS 上验证结果,首先执行如下命令查看“/access/当前日期”路径下的内容,可以看到生成的文件:
hdfs dfs -ls /access/20231129

(2)查看文件内容,如果能查到两个 agent 发送的数据,则说明整个流程是通的。
hdfs dfs -cat /access/20231129/access.1701259979025.tmp

附:解决 HDFS 数据写入报错问题

1,问题描述

上面样例 Agent3 启动后,当要将日志输出到 HDFS 时可能会报如下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:244)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:456)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:109)
at org.apache.flume.node.Application.main(Application.java:491)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more

2,解决办法

    这是由于 flume 缺乏一些 hadoop 相关的 jar 包,我们从 Hadoop 程序包中将如下 5jar 包复制一份到 flumelib 目录下即可。
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.1.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/commons-configuration-1.6.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-2.7.1.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.7.1.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/htrace-core-3.1.0-incubating.jar $FLUME_HOME/lib/
评论0