Flume日志采集工具使用详解9(Sink处理器:负载均衡、故障转移)
Sink 处理器类型包括如下三种:
(2)接着配置 bigdata02 上的 Agent:
(3)接着配置 bigdata03 上的 Agent:
(3)连接上后发送两条数据:
(4)到 hdfs 上验证数据,可以看到由于轮询这两条数据分别存放到不同的 path 上:
(2)接着配置 bigdata02 上的 Agent:
(3)接着配置 bigdata03 上的 Agent:
(3)连接上后发送两条数据:
(4)然后到 hdfs 上验证数据,发现两条数据是通过 bigdata02 这台机器写出去的,因为对应 bigdata02 这台机器的 sink 组件的优先级比较高:
(5)接下来我们将 bigdata02 的 agent 停止模拟这台机器上的的 Agent 挂掉,也就意味着 k1 这个 sink 写不出去数据了,此时,我们再通过 socket 发送一条数据:
(6)此时再到 hdfs 上查看一下,会发现新采集的数据会通过 bigdata03 上的 Agent 写出去:
(7)此时如果把 bigdata02 上的 Agent 再启动的话,会发现新采集的数据会通过 bigdata02 上的 Agent 写出去,这是因为它的优先级比较高。这就是 Sink 故障转移的应用。
- Default Sink Processor 是默认的,不用配置 sinkgroup,就是我们之前一直使用的最普通的形式,一个 channel 后面接一个 sink 的形式
- Load balancing Sink Processor 是负载均衡处理器,一个 channel 后面可以接多个 sink,这多个 sink 属于一个 sink group,根据指定的算法进行轮询或者随机发送,减轻单个 sink 的压力
- Failover Sink Processor 是故障转移处理器,一个 channel 后面可以接多个 sink,这多个 sink 属于一个 sink group,按照 sink 的优先级,默认先让优先级高的 sink 来处理数据,如果这个 sink 出现了故障,则用优先级低一点的 sink 处理数据,可以保证数据不丢失。
我在之前的文章中都是使用默认的 Default Sink Processor,本文接着演示另外两个 Sink Processor 的使用。
九、Sink 处理器1:Load balancing Sink Processor(负载均衡)
1,整体架构
(1)该案例中一个 channel 后面接了两个 sink,这两个 sink 属于一个 sink group,通过轮询的方式处理数据,从而达到负载均衡的效果。
(2)这个负载均衡案例可以解决之前单节点输出能力有限的问题,可以通过多个 sink 后面连接多个 Agent 实现负载均衡,如果后面的 Agent 挂掉 1 个,也不会影响整体流程,只是处理效率又恢复到了之前的状态。
2,配置 Agent
(1)首先配置 bigdata04 节点上的这个 Agent:参数说明:
- processor.type:针对负载均衡的 sink 处理器,这里需要指定 load_balance
- processor.selector:此参数的值内置支持两个,round_robin 和 random,round_robin 表示轮询,按照 sink 的顺序,轮流处理数据,random 表示随机。
- processor.backoff:默认为 false,设置为 true 后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长;一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率。
- processor.selector.maxTimeOut:最大的黑名单时间,默认是 30 秒
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件,[为了方便演示效果,把batch-size设置为1] a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.60.101 a1.sinks.k1.port=41414 a1.sinks.k1.batch-size = 1 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.60.102 a1.sinks.k2.port=41414 a1.sinks.k2.batch-size = 1 # 配置sink策略 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
(2)接着配置 bigdata02 上的 Agent:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node1:9000/load_balance a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k1.hdfs.filePrefix = data101 a1.sinks.k1.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(3)接着配置 bigdata03 上的 Agent:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node1:9000/load_balance a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k1.hdfs.filePrefix = data102 a1.sinks.k1.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3,测试 Agent
(1)首先我们依次启动 bigdata02、bigdata03、bigdata04 上的 Agent:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666
(3)连接上后发送两条数据:
(4)到 hdfs 上验证数据,可以看到由于轮询这两条数据分别存放到不同的 path 上:
[root@bigdata01 soft]# hdfs dfs -ls /load_balance Found 2 items -rw-r--r-- 2 root supergroup 6 2024-02-03 12:44 /load_balance/data101.1588481094383.log.tmp -rw-r--r-- 2 root supergroup 6 2024-02-03 12:44 /load_balance/data102.1588481087463.log.tmp [root@bigdata01 soft]# hdfs dfs -cat /load_balance/data101.1588481094383.log.tmp hangge [root@bigdata01 soft]# hdfs dfs -cat /load_balance/data102.1588481087463.log.tmp baidu
十、Sink 处理器2:Failover Sink Processor(故障转移)
1,整体架构
(1)该案例也是一个 channel 后面接了两个 sink,但是这里和负载均衡架构不一样的是,这两个 sink 正常情况下只有一个干活,另一个是不干活的
(2)如果某一个 sink 输出功能失效,另一个还可以顶上来,同时只会存在一个真正输出数据的 sink。通过故障转移从而解决 sink 组件单点故障的问题。
2,配置 Agent
(1)首先配置 bigdata04 节点上的这个 Agent:参数说明:
- sinks:指定这个 sink groups 中有哪些 sink,指定 sink 的名称,多个的话中间使用空格隔开即可
- processor.type:针对故障转移的 sink 处理器,使用 failover
- processor.priority:指定 sink group 中每一个 sink 组件的优先级,默认情况下 channel 中的数据会被优先级比较高的 sink 取走
- processor.maxpenalty:sink 发生故障之后,最大等待时间
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件,[为了方便演示效果,把batch-size设置为1] a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.60.101 a1.sinks.k1.port = 41414 a1.sinks.k1.batch-size = 1 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 192.168.60.102 a1.sinks.k2.port = 41414 a1.sinks.k2.batch-size = 1 # 配置sink策略 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 10 a1.sinkgroups.g1.processor.priority.k2 = 5 a1.sinkgroups.g1.processor.maxpenalty = 10000 # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
(2)接着配置 bigdata02 上的 Agent:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node1:9000/failover a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k1.hdfs.filePrefix = data101 a1.sinks.k1.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(3)接着配置 bigdata03 上的 Agent:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node1:9000/failover a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k1.hdfs.filePrefix = data102 a1.sinks.k1.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3,测试 Agent
(1)首先我们依次启动 bigdata02、bigdata03、bigdata04 上的 Agent:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)接着打开一个新的终端窗口首先使用 telnet 连接配置的 6666 端口:
telnet 192.168.60.9 6666
(3)连接上后发送两条数据:
(4)然后到 hdfs 上验证数据,发现两条数据是通过 bigdata02 这台机器写出去的,因为对应 bigdata02 这台机器的 sink 组件的优先级比较高:
[root@bigdata01 ~]# hdfs dfs -ls /failover Found 1 items -rw-r--r-- 2 root supergroup 12 2024-02-03 15:17 /failover/data101.1588490221243.log.tmp [root@bigdata01 ~]# hdfs dfs -cat /failover/data101.1588490221243.log.tmp hangge baidu
(5)接下来我们将 bigdata02 的 agent 停止模拟这台机器上的的 Agent 挂掉,也就意味着 k1 这个 sink 写不出去数据了,此时,我们再通过 socket 发送一条数据:
jojo
(6)此时再到 hdfs 上查看一下,会发现新采集的数据会通过 bigdata03 上的 Agent 写出去:
[root@bigdata01 ~]# hdfs dfs -ls /failover Found 2 items -rw-r--r-- 2 root supergroup 6 2023-02-03 15:17 /failover/data102.1588490267828.log.tmp -rw-r--r-- 2 root supergroup 12 2023-02-03 15:17 /failover/data101.1588490221243.log [root@bigdata01 ~]# hdfs dfs -cat /failover/data102.1588490267828.log.tmp jojo
(7)此时如果把 bigdata02 上的 Agent 再启动的话,会发现新采集的数据会通过 bigdata02 上的 Agent 写出去,这是因为它的优先级比较高。这就是 Sink 故障转移的应用。