当前位置: > > > K8s - 使用Strimzi快速搭建Kafka全家桶教程3(Kafka Connect的安装使用)

K8s - 使用Strimzi快速搭建Kafka全家桶教程3(Kafka Connect的安装使用)

    本文通过一个 MQTT Source 的样例,即订阅 MQTT 指定主题消息,并将消息写入到 Kafka 指定主题中,演示如何在 K8s 集群下安装使用 Kafka Connnect

五、Kafka Connect 的安装使用

1,制作自定义的 Kafka Connect 镜像

由于不同项目的业务需求不同,需要的 Connect 也有差异。因此首先我们要根据实际情况制作合适的 Kafka Connect 镜像。

(1)我们可以制作一个包含所有常用 Connect 的镜像。但由于本文演示的是 MQTT -> Kafka,因此只集成 MQTT Connect 即可。这里我使用的是 Confluent 提供的 MQTT Connector,首先访问其官网(点击访问)进行下载。

(2)将压缩包解压后,libs 文件夹里就是我们需要的 jar 包:

(3)在服务器上创建一个 plugins 目录:
mkdir plugins

(4)接着进入 plugins 文件夹创建一个 kafka-connect-mqtt 目录:
cd plugins
mkdir kafka-connect-mqtt

(5)前面解压出来的 libs 文件下的所有 jar 包都上传到 kafka-connect-mqtt 目录下:

(6)退回到与 plugins 文件夹同一级目录位置,执行如下命令创建一个 Dockerfile 文件:
vi Dockerfile

(7)在文件中添加如下内容,然后保存退出。
FROM strimzi/kafka:0.15.0-kafka-2.3.1
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001

(8)最后执行如下命令制作镜像,镜像名为 kafka-connect-mqtt
docker build -t kafka-connect-mqtt .

2,部署 Kafka Connect

(1)首先在服务器上创建一个 my-kafka-connect.yaml 文件,内容如下:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 2.5.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: kafka-connect-mqtt
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status

(2)然后执行如下命令开始创建:
kubectl apply -f my-kafka-connect.yaml -n kafka

(3)执行如下命令可以查看是否创建成功:
kubectl get pods -n kafka
kubectl get service -n kafka

(4)调用 kafka connect 服务的相关接口,查看已安装的 Connectors,可以发现里面确实已经包含了 MqttSinkConnectorMqttSourceConnector,当然本次样例我们只用到了后者:
提示:由于本样例 kafka connect 服务并未暴露端口供 k8s 集群外部访问,所以这里我们随便进入一个 Kafka 服务实例来访问 kafka connect 服务接口。
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connector-plugins

3,添加 Connect

(1)首先我面在服务器上创建一个 mqtt-source.json 文件,内容如下:
提示: 大家根据自己实际情况主要改动高亮部分内容即可(来源 MQTTtopic、目标 Kafkatopic、来源 MQTT 地址、目标 Kafka 地址)。下面配置表示将所有满足 /mqtt/# 主题的消息都写入到 Kafkamy_mqtt 主题中。
{
  "name": "mqtt-source",
  "config": {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max" : "1",
    "mqtt.topics" : "/mqtt/#",
    "kafka.topic" : "my_mqtt",
    "mqtt.server.uri" : "tcp://192.168.60.2:1883",
    "confluent.topic.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter.schemas.enable" : "false",
    "value.converter.schemas.enable" : "false"
  }
}

(2)然后执行如下命令将这个配置文件提交到 Kafka Connect
cat mqtt-source.json | kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-

(3)最后执行如下命令可以看到这个配置已经添加成功:
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connectors

4,开始测试

(1)首先我们使用 MQTTBox 这个客户端工具往 MQTT 服务器发送一条消息:

(2)接着在 Kafka 这边可以看到 my_mqtt 这个主题里已经出现了这条数据,说明刚添加的 Connect 已经生效。
评论0