Flume日志采集工具使用详解10(自定义组件:将数据写入到MySQL中)
十、自定义组件的实现
1,为什么需要自定义组件?
(1)在实际工作中,95% 以上的数据采集需求都是可以直接使用 Flume 内置的组件来实现,但是谁也不敢保证 100% 都能满足,因为什么奇葩的需求都会有:
- 例如:我们想把 flume 采集到的数据输出到 mysql 中,那这个时候就需要有针对 mysql 的 sink 组件了,但是 Flume 中并没有,因为这种需求不常见,往 mysql 中写的都是结构化数据,数据的格式是固定的,但是 flume 采集的一般都是日志数据,这种属于非结构化数据,不支持也是正常的。
(2)为了实现一些特殊的需求,我们可以自己写一个自定义的组件。
2,实现自定义组件的参考资料
(1)我们可以查看 Flume 官方的开发者文档(点击访问):
(2)只不过开发者文档里面目前还不算太完善,但是基本 source、sink 组件的自定义过程在这里都是有的:
注意:自定义 channel 的内容目前还没完善,如果我们确实想自定义这个组件,就需要到 Flume 源码中找到目前支持的那些 channel 的代码,参考着实现我们自定义的 channel 组件。
(3)例如下面是自定义 Sink 的说明文档:
附:通过自定义组件将数据写入到 MySQL 表中
1,创建 mysql 数据库表
首先我们创建一张表 flume2mysql,flume 采集到数据后会将其插入到该表中:
CREATE TABLE flume2mysql ( id INT(11) NOT NULL AUTO_INCREMENT, createTime VARCHAR(64) NOT NULL, content VARCHAR(255) NOT NULL, PRIMARY KEY (id) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
2,创建自定义组件
(1)我们创建一个 Maven 项目,然后编辑项目的 pom.xml 文件,添加 flume 依赖:
(2)编写自定义的 MysqlSink 类:
(3)然后将项目代码打成 jar 包上传到 flume 的 lib 目录下:
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.11.0</version> </dependency>
(2)编写自定义的 MysqlSink 类:
package com.hangge; import org.apache.flume.conf.Configurable; import org.apache.flume.*; import org.apache.flume.sink.AbstractSink; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Date; /** * 自定义MysqlSink */ public class MysqlSink extends AbstractSink implements Configurable { private String mysqlurl = ""; private String username = ""; private String password = ""; private String tableName = ""; Connection con = null; @Override public Status process(){ Status status = null; // Start transaction 获得Channel对象 Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); if (event != null) { //获取body中的数据 String body = new String(event.getBody(), "UTF-8"); //如果日志中有以下关键字的不需要保存,过滤掉 if(body.contains("delete") || body.contains("drop") || body.contains("alert")){ status = Status.BACKOFF; }else { //存入Mysql SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String createtime = df.format(new Date()); PreparedStatement stmt = con.prepareStatement("insert into " + tableName + " (createtime, content) values (?, ?)"); stmt.setString(1, createtime); stmt.setString(2, body); stmt.execute(); stmt.close(); status = Status.READY; } }else { status = Status.BACKOFF; } txn.commit(); } catch (Throwable t){ txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally{ txn.close(); } return status; } /** * 获取配置文件中指定的参数 * @param context */ @Override public void configure(Context context) { mysqlurl = context.getString("mysqlurl"); username = context.getString("username"); password = context.getString("password"); tableName = context.getString("tablename"); } @Override public synchronized void start() { try{ //初始化数据库连接 con = DriverManager.getConnection(mysqlurl, username, password); super.start(); System.out.println("finish start"); }catch (Exception ex){ ex.printStackTrace(); } } @Override public synchronized void stop(){ try{ con.close(); }catch(SQLException e) { e.printStackTrace(); } super.stop(); } }
(3)然后将项目代码打成 jar 包上传到 flume 的 lib 目录下:
(4)同时还要把 MySQL 的驱动程序包上传到 flume 的 lib 目录下:
3,自定义组件的使用
(1)首先我们执行如下命令启动 Agent:
(2)然后我们创建文件并写入两条数据:
(3)查看 MySQL 的 flume2mysql 表可以看到 Flume 已经成功采集到数据,并写入到表中:
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)然后我们创建文件并写入两条数据:
mkdir -p /data/flumeData echo "hello hangge" >> /data/flumeData/data.log echo "欢迎访问hangge.com" >> /data/flumeData/data.log
(3)查看 MySQL 的 flume2mysql 表可以看到 Flume 已经成功采集到数据,并写入到表中: