当前位置: > > > Flume日志采集工具使用详解8(样例2:Channel选择器、多Channel)

Flume日志采集工具使用详解8(样例2:Channel选择器、多Channel)

    一个 Source 支持给多个 Channel 发送数据,如果 source 后面接了多个 channel,到底是给所有的 channel 都发,还是根据规则发送到不同 channel,这些是由 Channel Selectors 来控制的。
    Channel Selectors 类型包括:Replicating Channel SelectorMultiplexing Channel Selector
  • Replicating Channel Selector 是默认的 channel 选择器,它会将 Source 采集过来的 Event 发往所有 Channel
  • Multiplexing Channel Selector 表示会根据 Eventheader 里面的值将 Event 发往不同的 Channel

八、Channel 选择器的使用1:Replicating Channel Selector

1,整体架构

(1)在这个案例中我们使用 Replicating 选择器,将 source 采集到的数据重复发送给两个 channel,最后每个 channel 后面接一个 sink,负责把数据存储到不同存储介质中,方便后期使用。

(2)在实际工作中这种需求还是比较常见的,就是我们希望把一份数据采集过来以后,分别存储到不同的存储介质中,不同存储介质的特点和应用场景是不一样的,典型的就是 hdfssinkkafkasink
  • 通过 hdfssink 实现离线数据落盘存储,方便后面进行离线数据计算。
  • 通过 kafkasink 实现实时数据存储,方便后面进行实时计算。

2,配置 Agent

(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf
vi example.conf

(2)接着在配置文件中添加如下内容。
# 定义 Agent 内部 3 大组件的名称
a1.sources = r1
a1.sinks = k1 k2
a1.channels =c1 c2
 
# 配置 Source 组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
 
# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
 
#配置sink组件
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://node1:9000/replicating
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log
 
#把组件连接起来
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3,测试 Agent

(1)首先我们执行如下命令启动此 Agent
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666

(3)连接上后发送两条数据:

(4)查看 Flume 的日志文件,可以看到里面有相关数据的日志信息:

(5)查看 hdfs 中生成的文件中的内容也是正常,说明多 Channel 配置成功生效了。

九、Channel 选择器的使用2:Multiplexing Channel Selector

1,整体架构

    在这个案例中我们使用 Multiplexing 选择器,将 source 采集到的数据按照一定规则发送给两个 channel,最终再把不同 channel 中的数据存储到不同介质中。

2,配置 Agent

(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf
vi example.conf

(2)接着在配置文件中添加如下内容。
提示:在这里面我们需要用到正则抽取拦截器在 Eventheader 中生成 key-value 作为 Multiplexing 选择器的规则
# 定义 Agent 内部 3 大组件的名称
a1.sources = r1
a1.sinks = k1 k2
a1.channels =c1 c2
 
# 配置 Source 组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

# 配置source拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = city

# 配置channle选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.bj = c1
a1.sources.r1.selector.default = c2
 
# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
 
#配置sink组件
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://node1:9000/replicating
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log
 
#把组件连接起来
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3,测试 Agent

(1)首先我们执行如下命令启动此 Agent
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666

(3)连接上后发送两条数据:
{"name":"jack","age":19,"city":"bj"}
{"name":"tom","age":26,"city":"sh"}

(4)查看 Flume 的日志文件,可以看到里面只有 citybj 的数据信息:

(5)查看 hdfs 中生成的文件,里面只有 city 不为 bj 的数据,说明多 Channel 选择器配置成功生效了。
评论0