Flink - Kafka Connector使用详解1(读取kafka数据流)
Flink 社区提供了丰富的连接器(Connectors)以方便与不同的数据源进行交互,其 Flink-Kafka-Connector 是 Flink 提供的一个专门用于与 Kafka 集成的组件。通过这个连接器,用户可以轻松地从 Kafka 中读取数据流 (Source)或将数据流写入到 Kafka(Sink)。本文首先介绍如何从 Kafka 中读取数据。
(2)下面是实现同样功能的 Java 语言代码:
(2)接着创建一个该主题的消息生产者:
(3)发送如下两条测试数据:
(4)可以看到控制台输出内容如下:
(2)我们也可也订阅与正则表达式所匹配的 Topic 下的所有 Partition:
(3)还可以通过 Partition 列表订阅指定的 Partition:
(2)为了避免重复消费问题,确保程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
一、读取 Kafka 数据(Kafka Source)
1,准备工作
首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
2,使用样例
(1)我们读取 Kafka 的制定 topic 数据并进行单词统计,然后打印到控制台。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object StreamKafkaSourceScala { def main(args: Array[String]): Unit = { //获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ // 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build // 指定 KafkaSource 相关配置 val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") // 转换 val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词 .map((_, 1)) //每一个单词转换为tuple2的形式(单词,1) .keyBy(tup => tup._1) //官方推荐使用keyselector选择器选择数据 .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法 .sum(1) // 使用sum或者reduce都可以 // 输出 wordCount.print() //执行任务 env.execute() } }
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class StreamKafkaSourceJava { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定 KafkaSource 相关配置 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 创建 Kafka 数据流 DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); // 转换 SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS .flatMap(new FlatMapFunction<String, String>() { public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(word); } } }).map(new MapFunction<String, Tuple2<String, Integer>>() { public Tuple2<String, Integer> map(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }) .keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据 .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法 .sum(1); // 使用 sum 或者 reduce 都可以 // 输出 wordCount.print(); // 执行任务 env.execute(); } }
3,运行测试
(1)首先我们创建好 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic
(3)发送如下两条测试数据:
hangge com com 1 hangge
(4)可以看到控制台输出内容如下:

附:进阶技巧
1,Topic / Partition 订阅方式
(1)下面样例我们订阅 Topic 列表中所有 Partition 的消息:
// 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic1", "source_topic2", "source_topic3") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build
(2)我们也可也订阅与正则表达式所匹配的 Topic 下的所有 Partition:
// 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic*") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build
(3)还可以通过 Partition 列表订阅指定的 Partition:
// 指定 KafkaSource 相关配置 val partitionSet = Set( new TopicPartition("source_topic1", 0), // Partition 0 of topic "source_topic1" new TopicPartition("source_topic2", 3) // Partition 3 of topic "source_topic2" ) import scala.collection.JavaConverters._ // 用于转换 Set[TopicPartition] 为 Java Set val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setPartitions(partitionSet.asJava) // 使用转换后的 Java Set .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build
2,起始消费位点、自定提交 offset
(1)Kafka source 能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费 。内置的位点初始化器包括:
- 从最早位点开始消费(默认)
// 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build
- 从最末尾位点开始消费:
.setStartingOffsets(OffsetsInitializer.latest())
- 从时间戳大于等于指定时间戳(毫秒)的数据开始消费:
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
- 从消费组提交的位点开始消费,不指定位点重置策略:
.setStartingOffsets(OffsetsInitializer.committedOffsets())
- 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
(2)为了避免重复消费问题,确保程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
// 指定 KafkaSource 相关配置 val source = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("con") .setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能 .setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒 // 消费者的起始位移被设置为已提交的最早位移 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .setValueOnlyDeserializer(new SimpleStringSchema()) .build
3,动态分区检查
(1)为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。
(2)分区检查功能默认不开启。要启用动态分区检查,将 partition.discovery.interval.ms 设置为非负值即可。
// 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区 .build