Flume Log Collection Tool Explained, Part 5 (Example 3: Distributing Logs to HDFS and Using Interceptors)
A system's log file often contains several different types of log records. To make downstream processing and analysis easier, we can use interceptors to route the data by type, for example writing records of the same type into the same HDFS directory. The example below walks through how to do this.
5. Using Interceptors to Distribute Log File Data
1. Interceptor Overview
(1) Flume ships with many built-in Source Interceptors. Common ones include:
- Timestamp Interceptor: adds a timestamp entry to the event header.
- Host Interceptor: adds a host entry to the event header, whose value is the hostname or IP of the current machine.
- Search and Replace Interceptor: searches the event body according to a given rule and replaces the matches. This interceptor modifies the event body, i.e. it changes the raw collected data.
- Static Interceptor: adds a fixed key and value to the event header.
- Regex Extractor Interceptor: extracts data from the event body according to a given rule, builds key-value pairs from it, and adds them to the header.
(2) To summarize:
- Timestamp Interceptor, Host Interceptor, Static Interceptor, and Regex Extractor Interceptor add key-value data to the event header for later use by the channel and sink components; they do not affect the raw collected data at all.
- Search and Replace Interceptor rewrites the raw data in the event body according to its rule and leaves the header untouched. Use it with particular care, because it changes the original data.
Tip: of these interceptors, Search and Replace Interceptor and Regex Extractor Interceptor are the ones used most often in practice.
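To make the header mechanism concrete, here is a minimal sketch of how an interceptor is attached to a source in an Agent configuration; the static interceptor and its key/value (dataCenter / beijing) are made up for illustration and are not part of the example that follows:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = dataCenter
a1.sources.r1.interceptors.i1.value = beijing
# The header entry added here can later be referenced in an HDFS Sink path as %{dataCenter}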
2. Requirements
(1) A live-streaming app produces user info, live-stream (video) info, and gift records. These three kinds of data are written to the same log file, in the formats below (every record is JSON, and the value of its type field distinguishes the data type):
- User info records look like this:
{ "uid": "861848974414839801", "nickname": "hangge", "usign": "", "sex": 1, "birthday": "", "face": "", "big_face": "", "email": "hangge@hangge.com", "mobile": "", "reg_type": "102", "last_login_time": "1494344580", "regtime": "1494344580", "last_update_time": "1494344580", "status": "5", "is_verified": "0", "verified_info": "", "is_seller": "0", "level": 1, "exp": 0, "anchor_level": 0, "anchor_exp": 0, "os": "android", "timestamp": 1494344580, "type": "user_info" }
- Video (live-stream) info records look like this:
{ "id": "14943445328940974601", "uid": "840717325115457536", "lat": "53.530598", "lnt": "-2.5620373", "hots": 0, "title": "0", "status": "1", "topicId": "0", "end_time": "1494344570", "watch_num": 0, "share_num": "1", "replay_url": null, "replay_num": 0, "start_time": "1494344544", "timestamp": 1494344571, "type": "video_info" }
- Gift records look like this:
{ "send_id": "834688818270961664", "good_id": "223", "video_id": "14943443045138661356", "gold": "10", "timestamp": 1494344574, "type": "gift_record" }
(2) Use Flume to store the log data in HDFS under the directory for the corresponding day, splitting first by day and then by type, because most of the later requirements analyze the data per day and per type.
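In other words, the target layout on HDFS looks roughly like this (the /moreType root and the date are placeholders, and the type directory names anticipate the camel-case conversion described in the next section):
/moreType/20230101/userInfo/
/moreType/20230101/videoInfo/
/moreType/20230101/giftRecord/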
3. Configuring the Agent
(1) Starting a Flume job really just means starting an Agent. First go into the conf directory of the Flume distribution and create an Agent configuration file:
cd conf
vi example.conf
(2) Then add the following content to the configuration file. We configure Flume's source, interceptors, and sink to distribute the log data. The interceptors are used as follows:
- First, Search and Replace Interceptors rewrite the value of type in the raw data (removing the underscore and converting it to camel case).
- Then a Regex Extractor Interceptor extracts the value of the type field with a regular expression and adds it to the header, so it is available at the sink stage.
Tips:
- Here the Source can be an Exec Source, the Channel a Memory Channel, and the Sink an HDFS Sink.
- In the extractor, the regular expression "type":"([^"]+)" matches the literal string "type":" and then uses ([^"]+) to capture the run of non-quote characters up to the next quote, which is exactly the value of the type field.
# Define the names of the agent's three core components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source component
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/moreType.log

# Configure the interceptors [multiple interceptors run one after another in the order listed]
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"
a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"
a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"
a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"([^"]+)"
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = logType

# Configure the channel component
a1.channels.c1.type = memory

# Configure the sink component
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/moreType/%Y%m%d/%{logType}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Add a file prefix and suffix
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
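As a quick local sanity check of what interceptors i3 and i4 will do to one sample record (this only simulates the replacement and the regex outside of Flume; the sed command and sample line are purely illustrative):
echo '{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}' | sed 's/"type":"gift_record"/"type":"giftRecord"/'
# The output now contains "type":"giftRecord"; the Regex Extractor then captures giftRecord
# with "type":"([^"]+)" and puts it into the event header under the key logType,
# which the HDFS Sink path references as %{logType}.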
4. Running a Test
(1) First, start the Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf
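While debugging, it can help to also print Flume's log to the console; this is an optional flag and not required for the example to work:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf -Dflume.root.logger=INFO,console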
(2) Next, create the log file /usr/local/moreType.log and add some test data to it:
{"uid":"861848974414839801","nickname":"hangge","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"hangge@hangge.com","mobile":"","reg_type":"102","last_login_time":"1494344580","regtime":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"} {"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"} {"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"} {"send_id":"987654321098765432","good_id":"456","video_id":"98765432109876543210","gold":"20","timestamp":1632451202,"type":"gift_record"} {"uid":"789012345678901234","nickname":"test_user1","usign":"","sex":0,"birthday":"","face":"","big_face":"","email":"test1@example.com","mobile":"","reg_type":"101","last_login_time":"1627367800","regtime":"1627367800","last_update_time":"1627367800","status":"3","is_verified":"1","verified_info":"","is_seller":"1","level":3,"exp":500,"anchor_level":2,"anchor_exp":100,"os":"iOS","timestamp":1627367800,"type":"user_info"} {"id":"98765432109876543210","uid":"123456789012345678","lat":"40.7128","lnt":"-74.0060","hots":10,"title":"Amazing Sunset","status":"2","topicId":"123","end_time":"1632451200","watch_num":1000,"share_num":"50","replay_url":"https://example.com/replay1","replay_num":500,"start_time":"1632450000","timestamp":1632451201,"type":"video_info"}
(3) Finally, check the result on HDFS: log records of different types are now stored under the directory for the corresponding date and type:
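A recursive listing is one way to verify this; the date directory below is only an example and depends on when the data was written:
hdfs dfs -ls -R /moreType
# Expected output contains directories and files along the lines of:
# /moreType/20230924/giftRecord/data.<timestamp>.log
# /moreType/20230924/userInfo/data.<timestamp>.log
# /moreType/20230924/videoInfo/data.<timestamp>.log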