Kafka Connect的部署和使用详解1(安装配置、基本用法)
Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为了集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。
Kafka Connect 是到 0.9 版本才开始提供,它极大的简化了其他系统与 Kafka 的集成。Kafka Connect 运用用户快速定义并实现各种 Connector(File、Jdbc、Hdfs 等),这些功能让大批量数据导入/导出 Kafka 很方便。
如下图所示,左侧的 Sources 负责从其他异构系统中读取数据并导入到 Kafka 中;右侧的 Sinks 是把 Kafka 中的数据写入到其他的系统中。
(3)下载执行如下命令进行解压:
(3)接着修改 connect-file-sink.properties 文件,内容如下,表示从 connect-test 这个 topic 中读取数据并写入到 output.txt 文件中。
(2)启动后可以通过 jps 命令检查是否启动成功,如出现图表示启动成功:
(2)接着在 Kafka 这边可以看到 connect-test 这个主题里已经出现了这些数据,说明 file source connector 已生效。
Kafka Connect 是到 0.9 版本才开始提供,它极大的简化了其他系统与 Kafka 的集成。Kafka Connect 运用用户快速定义并实现各种 Connector(File、Jdbc、Hdfs 等),这些功能让大批量数据导入/导出 Kafka 很方便。
如下图所示,左侧的 Sources 负责从其他异构系统中读取数据并导入到 Kafka 中;右侧的 Sinks 是把 Kafka 中的数据写入到其他的系统中。
一、基本介绍
1,什么是 Kafka Connect?
- Kafka Connect 是从 Kafka 0.9+ 版本开始增加了一个新的特性,可以更方便的创建和管理数据流管道。它使得能够快速定义将大量数据集合移入和移出 Kafka 的连接器变得简单。
- Kafka Connect 可以将完整的数据库注入到 Kafka 的 Topic 中,比如将服务器的系统监控指标注入到 Kafka,然后像正常的 Kafka 流处理机制一样进行数据流处理。
- 导出工作则是将数据从 Kafka Topic 中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、Elastic Search、Apache Ignite 等。
2,Kafka Connect 核心概念
Kafka Connect 有两个核心概念:Source 和 Sink。 Source 负责导入数据到 Kafka,Sink 负责从 Kafka 导出数据,它们都被称为 Connector。
3,Kafka Connect 特性
- Kafka connector 通用框架,提供统一的集成 API
- 同时支持分布式模式和单机模式
- REST 接口,用来查看和管理 Kafka connectors
- 自动化的 offset 管理,开发人员不必担心错误处理的影响
- 分布式、可扩展
- 流/批处理集成
4,Kafka Connect 工作模式
- standalone 模式:在该模式中,所有的 worker 都在一个独立的进程中完成
- distributed 模式:该模式具有高扩展性,以及提供自动容错机制。你可以使用一个 group.ip 来启动很多 worker 进程,在有效的 worker 进程中它们会自动的去协调执行 connector 和 task,如果你新加了一个 worker 或者挂了一个 worker,其他的 worker 会检测到然后在重新分配 connector 和 task。
二、部署使用
1,下载程序包
(1)首先我们从官网上下载 Kafka 安装包,地址如下:
(2)假设我这里下载的版本为 2.4.0:
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
(3)下载执行如下命令进行解压:
tar zxvf kafka_2.12-2.4.0.tgz -C ./
2,修改配置
(1)本次我们使用 standalone 模式启动 Kafka Connect,编辑 conf/connect-standalone.properties 文件,修改如下内容并保存。
提示:bootstrap.servers 设置为 Kafka 集群的地址。
(2)Kafka 自带了 FileStreamSinkConnector 和 FileStreamSourceConnector 这两个 Connector。首先修改 conf/connect-file-source.properties 文件,内容如下,表示从 test.txt 中读取并发布到 connect-test 这个 topic 中。
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/usr/local/test.txt topic=connect-test
(3)接着修改 connect-file-sink.properties 文件,内容如下,表示从 connect-test 这个 topic 中读取数据并写入到 output.txt 文件中。
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=/usr/local/output.txt topics=connect-test
3,启动服务
(1)执行如下命令启动 Kafka Connect,注意命令后面要带上相关的配置文件:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- 由于上面启动方式退出终端后会自动结束程序,为了确保 Kafka Connect 能够一直在后台运行,可以添加个 -daemon 参数:
./bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
(2)启动后可以通过 jps 命令检查是否启动成功,如出现图表示启动成功:
如果提示 jps 找不到命令,可以执行如下命令进行安装:
- yum install java-1.8.0-openjdk-devel.x86_64
(3)Kafka Connect 提供了 REST API 方便我们去管理(默认端口 8083),我们通过 /connector-plugins 接口可以看到里面确实已经包含了 FileStreamSinkConnector 和 FileStreamSourceConnector:
(4)通过 /connectors 接口可以看到 source 和 sink 的配置也已经添加成功:
4,开始测试
(1)执行如下命令往 text.txt 中写入一些数据:
echo '12345678' >> /usr/local/test.txt echo 'hangge.com' >> /usr/local/test.txt echo '航歌' >> /usr/local/test.txt
(2)接着在 Kafka 这边可以看到 connect-test 这个主题里已经出现了这些数据,说明 file source connector 已生效。
(3)接着查看 output.txt 文件,可以看到数据也过来了,说明 file sink connector 也已生效。
附:Kafka Connect 的 REST API 接口
由于 Kafka Connect 的意图是以服务的方式去运行,所以它提供了 REST API 去管理 connectors,默认的端口是 8083(我们也可以在启动 Kafka Connect 之前在配置文件中添加 rest.port 配置):
- GET /connectors:返回所有正在运行的 connector 名
- POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
- GET /connectors/{name}:获取指定 connetor 的信息
- GET /connectors/{name}/config:获取指定 connector 的配置信息
- PUT /connectors/{name}/config:更新指定 connector 的配置信息
- GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
- GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
- GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
- PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
- PUT /connectors/{name}/resume:恢复一个被暂停的 connector
- POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
- POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
- DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。