Flume日志采集工具使用详解8(样例2:Channel选择器、多Channel)
一个 Source 支持给多个 Channel 发送数据,如果 source 后面接了多个 channel,到底是给所有的 channel 都发,还是根据规则发送到不同 channel,这些是由 Channel Selectors 来控制的。
(2)接着在配置文件中添加如下内容。
(3)连接上后发送两条数据:
(2)接着在配置文件中添加如下内容。
(3)连接上后发送两条数据:
Channel Selectors 类型包括:Replicating Channel Selector 和 Multiplexing Channel Selector
- Replicating Channel Selector 是默认的 channel 选择器,它会将 Source 采集过来的 Event 发往所有 Channel
- Multiplexing Channel Selector 表示会根据 Event 中 header 里面的值将 Event 发往不同的 Channel
八、Channel 选择器的使用1:Replicating Channel Selector
1,整体架构
(1)在这个案例中我们使用 Replicating 选择器,将 source 采集到的数据重复发送给两个 channel,最后每个 channel 后面接一个 sink,负责把数据存储到不同存储介质中,方便后期使用。
(2)在实际工作中这种需求还是比较常见的,就是我们希望把一份数据采集过来以后,分别存储到不同的存储介质中,不同存储介质的特点和应用场景是不一样的,典型的就是 hdfssink 和 kafkasink:
- 通过 hdfssink 实现离线数据落盘存储,方便后面进行离线数据计算。
- 通过 kafkasink 实现实时数据存储,方便后面进行实时计算。
2,配置 Agent
(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf vi example.conf
(2)接着在配置文件中添加如下内容。
# 定义 Agent 内部 3 大组件的名称 a1.sources = r1 a1.sinks = k1 k2 a1.channels =c1 c2 # 配置 Source 组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 # 配置 Channel 组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 #配置sink组件 a1.sinks.k1.type = logger a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = hdfs://node1:9000/replicating a1.sinks.k2.hdfs.fileType = DataStream a1.sinks.k2.hdfs.writeFormat = Text a1.sinks.k2.hdfs.rollInterval = 3600 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k2.hdfs.filePrefix = data a1.sinks.k2.hdfs.fileSuffix = .log #把组件连接起来 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
3,测试 Agent
(1)首先我们执行如下命令启动此 Agent:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666
(3)连接上后发送两条数据:
(4)查看 Flume 的日志文件,可以看到里面有相关数据的日志信息:
(5)查看 hdfs 中生成的文件中的内容也是正常,说明多 Channel 配置成功生效了。
九、Channel 选择器的使用2:Multiplexing Channel Selector
1,整体架构
在这个案例中我们使用 Multiplexing 选择器,将 source 采集到的数据按照一定规则发送给两个 channel,最终再把不同 channel 中的数据存储到不同介质中。
2,配置 Agent
(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf vi example.conf
(2)接着在配置文件中添加如下内容。
提示:在这里面我们需要用到正则抽取拦截器在 Event 的 header 中生成 key-value 作为 Multiplexing 选择器的规则
# 定义 Agent 内部 3 大组件的名称 a1.sources = r1 a1.sinks = k1 k2 a1.channels =c1 c2 # 配置 Source 组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 # 配置source拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = regex_extractor a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)" a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.name = city # 配置channle选择器 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = city a1.sources.r1.selector.mapping.bj = c1 a1.sources.r1.selector.default = c2 # 配置 Channel 组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 #配置sink组件 a1.sinks.k1.type = logger a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = hdfs://node1:9000/replicating a1.sinks.k2.hdfs.fileType = DataStream a1.sinks.k2.hdfs.writeFormat = Text a1.sinks.k2.hdfs.rollInterval = 3600 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k2.hdfs.filePrefix = data a1.sinks.k2.hdfs.fileSuffix = .log #把组件连接起来 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
3,测试 Agent
(1)首先我们执行如下命令启动此 Agent:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666
(3)连接上后发送两条数据:
{"name":"jack","age":19,"city":"bj"} {"name":"tom","age":26,"city":"sh"}
(4)查看 Flume 的日志文件,可以看到里面只有 city 为 bj 的数据信息:
(5)查看 hdfs 中生成的文件,里面只有 city 不为 bj 的数据,说明多 Channel 选择器配置成功生效了。