当前位置: > > > Flume日志采集工具使用详解10(自定义组件:将数据写入到MySQL中)

Flume日志采集工具使用详解10(自定义组件:将数据写入到MySQL中)

十、自定义组件的实现

1,为什么需要自定义组件?

(1)在实际工作中,95% 以上的数据采集需求都是可以直接使用 Flume 内置的组件来实现,但是谁也不敢保证 100% 都能满足,因为什么奇葩的需求都会有:
  • 例如:我们想把 flume 采集到的数据输出到 mysql 中,那这个时候就需要有针对 mysql sink 组件了,但是 Flume 中并没有,因为这种需求不常见,往 mysql 中写的都是结构化数据,数据的格式是固定的,但是 flume 采集的一般都是日志数据,这种属于非结构化数据,不支持也是正常的。
(2)为了实现一些特殊的需求,我们可以自己写一个自定义的组件。

2,实现自定义组件的参考资料

(1)我们可以查看 Flume 官方的开发者文档(点击访问):

(2)只不过开发者文档里面目前还不算太完善,但是基本 sourcesink 组件的自定义过程在这里都是有的:
注意:自定义 channel 的内容目前还没完善,如果我们确实想自定义这个组件,就需要到 Flume 源码中找到目前支持的那些 channel 的代码,参考着实现我们自定义的 channel 组件。

(3)例如下面是自定义 Sink 的说明文档:

附:通过自定义组件将数据写入到 MySQL 表中

1,创建 mysql 数据库表

首先我们创建一张表 flume2mysqlflume 采集到数据后会将其插入到该表中:
CREATE TABLE flume2mysql (
  id INT(11) NOT NULL AUTO_INCREMENT,
  createTime VARCHAR(64) NOT NULL,
  content VARCHAR(255) NOT NULL,
  PRIMARY KEY (id)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

2,创建自定义组件

(1)我们创建一个 Maven 项目,然后编辑项目的 pom.xml 文件,添加 flume 依赖:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.11.0</version>
</dependency>

(2)编写自定义的 MysqlSink 类:
package com.hangge;
import org.apache.flume.conf.Configurable;
import org.apache.flume.*;
import org.apache.flume.sink.AbstractSink;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 自定义MysqlSink
 */
public class MysqlSink extends AbstractSink implements Configurable {
  private String mysqlurl = "";
  private String username = "";
  private String password = "";
  private String tableName = "";

  Connection con = null;

  @Override
  public Status process(){
    Status status = null;
    // Start transaction 获得Channel对象
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try
    {
      Event event = ch.take();

      if (event != null)
      {
        //获取body中的数据
        String body = new String(event.getBody(), "UTF-8");

        //如果日志中有以下关键字的不需要保存,过滤掉
        if(body.contains("delete") || body.contains("drop") || body.contains("alert")){
          status = Status.BACKOFF;
        }else {

          //存入Mysql
          SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          String createtime = df.format(new Date());

          PreparedStatement stmt = con.prepareStatement("insert into " + tableName
                  + " (createtime, content) values (?, ?)");
          stmt.setString(1, createtime);
          stmt.setString(2, body);
          stmt.execute();
          stmt.close();
          status = Status.READY;
        }
      }else {
        status = Status.BACKOFF;
      }

      txn.commit();
    } catch (Throwable t){
      txn.rollback();
      t.getCause().printStackTrace();
      status = Status.BACKOFF;
    } finally{
      txn.close();
    }

    return status;
  }
  /**
   * 获取配置文件中指定的参数
   * @param context
   */
  @Override
  public void configure(Context context) {
    mysqlurl = context.getString("mysqlurl");
    username = context.getString("username");
    password = context.getString("password");
    tableName = context.getString("tablename");
  }

  @Override
  public synchronized void start() {
    try{
      //初始化数据库连接
      con = DriverManager.getConnection(mysqlurl, username, password);
      super.start();
      System.out.println("finish start");
    }catch (Exception ex){
      ex.printStackTrace();
    }
  }

  @Override
  public synchronized void stop(){
    try{
      con.close();
    }catch(SQLException e) {
      e.printStackTrace();
    }
    super.stop();
  }
}

(3)然后将项目代码打成 jar 包上传到 flume lib 目录下:

(4)同时还要把 MySQL 的驱动程序包上传到 flumelib 目录下:

3,自定义组件的使用

(1)首先我们执行如下命令启动 Agent
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(2)然后我们创建文件并写入两条数据:
mkdir -p /data/flumeData
echo "hello hangge" >> /data/flumeData/data.log
echo "欢迎访问hangge.com" >> /data/flumeData/data.log

(3)查看 MySQL flume2mysql 表可以看到 Flume 已经成功采集到数据,并写入到表中:
评论0