当前位置: > > > Kafka Connect的部署和使用详解1(安装配置、基本用法)

Kafka Connect的部署和使用详解1(安装配置、基本用法)

    Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为了集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。
    Kafka Connect 是到 0.9 版本才开始提供,它极大的简化了其他系统与 Kafka 的集成。Kafka Connect 运用用户快速定义并实现各种 ConnectorFileJdbcHdfs 等),这些功能让大批量数据导入/导出 Kafka 很方便。
    如下图所示,左侧的 Sources 负责从其他异构系统中读取数据并导入到 Kafka 中;右侧的 Sinks 是把 Kafka 中的数据写入到其他的系统中。

一、基本介绍

1,什么是 Kafka Connect?

  • Kafka Connect 是从 Kafka 0.9+ 版本开始增加了一个新的特性,可以更方便的创建和管理数据流管道。它使得能够快速定义将大量数据集合移入和移出 Kafka 的连接器变得简单。
  • Kafka Connect 可以将完整的数据库注入到 KafkaTopic 中,比如将服务器的系统监控指标注入到 Kafka,然后像正常的 Kafka 流处理机制一样进行数据流处理。
  • 导出工作则是将数据从 Kafka Topic 中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、Elastic SearchApache Ignite 等。

2,Kafka Connect 核心概念

    Kafka Connect 有两个核心概念:SourceSinkSource 负责导入数据到 KafkaSink 负责从 Kafka 导出数据,它们都被称为 Connector

3,Kafka Connect 特性

  • Kafka connector 通用框架,提供统一的集成 API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理 Kafka connectors
  • 自动化的 offset 管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

4,Kafka Connect 工作模式

  • standalone 模式:在该模式中,所有的 worker 都在一个独立的进程中完成
  • distributed 模式:该模式具有高扩展性,以及提供自动容错机制。你可以使用一个 group.ip 来启动很多 worker 进程,在有效的 worker 进程中它们会自动的去协调执行 connectortask,如果你新加了一个 worker 或者挂了一个 worker,其他的 worker 会检测到然后在重新分配 connectortask

二、部署使用

1,下载程序包

(1)首先我们从官网上下载 Kafka 安装包,地址如下:

(2)假设我这里下载的版本为 2.4.0
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz

(3)下载执行如下命令进行解压:
tar zxvf kafka_2.12-2.4.0.tgz -C ./

2,修改配置

(1)本次我们使用 standalone 模式启动 Kafka Connect,编辑 conf/connect-standalone.properties 文件,修改如下内容并保存。
提示bootstrap.servers 设置为 Kafka 集群的地址。

(2)Kafka 自带了 FileStreamSinkConnectorFileStreamSourceConnector 这两个 Connector。首先修改 conf/connect-file-source.properties 文件,内容如下,表示从 test.txt 中读取并发布到 connect-test 这个 topic 中。
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/usr/local/test.txt
topic=connect-test

(3)接着修改 connect-file-sink.properties 文件,内容如下,表示从 connect-test 这个 topic 中读取数据并写入到 output.txt 文件中。
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/usr/local/output.txt
topics=connect-test

3,启动服务

(1)执行如下命令启动 Kafka Connect,注意命令后面要带上相关的配置文件:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 由于上面启动方式退出终端后会自动结束程序,为了确保 Kafka Connect 能够一直在后台运行,可以添加个 -daemon 参数:
./bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

(2)启动后可以通过 jps 命令检查是否启动成功,如出现图表示启动成功:
如果提示 jps 找不到命令,可以执行如下命令进行安装:
  • yum install java-1.8.0-openjdk-devel.x86_64

(3)Kafka Connect 提供了 REST API 方便我们去管理(默认端口 8083),我们通过 /connector-plugins 接口可以看到里面确实已经包含了 FileStreamSinkConnectorFileStreamSourceConnector

(4)通过 /connectors 接口可以看到 sourcesink 的配置也已经添加成功:

4,开始测试

(1)执行如下命令往 text.txt 中写入一些数据:
echo '12345678' >> /usr/local/test.txt
echo 'hangge.com' >> /usr/local/test.txt
echo '航歌' >> /usr/local/test.txt

(2)接着在 Kafka 这边可以看到 connect-test 这个主题里已经出现了这些数据,说明 file source connector 已生效。

(3)接着查看 output.txt 文件,可以看到数据也过来了,说明 file sink connector 也已生效。

附:Kafka Connect 的 REST API 接口

    由于 Kafka Connect 的意图是以服务的方式去运行,所以它提供了 REST API 去管理 connectors,默认的端口是 8083(我们也可以在启动 Kafka Connect 之前在配置文件中添加 rest.port 配置):
  • GET /connectors:返回所有正在运行的 connector
  • POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,nameconnector 的名字,configjson 格式,必须包含你的 connector 的配置信息。
  • GET /connectors/{name}:获取指定 connetor 的信息
  • GET /connectors/{name}/config:获取指定 connector 的配置信息
  • PUT /connectors/{name}/config:更新指定 connector 的配置信息
  • GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
  • GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task
  • GET /connectors/{name}/tasks/{taskid}/status:获取指定 connectortask 的状态信息
  • PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
  • PUT /connectors/{name}/resume:恢复一个被暂停的 connector
  • POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
  • POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
  • DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。
评论0