Flink - Kafka Connector使用详解2(将数据流写入Kafka)
前文我演示了如何利用 Flink-Kafka-Connector 从 Kafka 中读取数据流 (Source),本文接着通过样例演示如何利用 Kafka Sink 将数据流写入一个或多个 Kafka topic。
(2)下面是实现同样功能的 Java 语言代码:
(2)接着创建一个该主题的消息生产者:
(3)发送如下两条测试数据:
(4)可以看到结果数据已经写入到 Kafka 中了。
二、将数据流写入 Kafka(Kafka Sink)
1,准备工作
首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
2,使用样例
(1)我们对上文的样例做个改进,将 WordCount 的结果保存到 Kafka 中。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.base.DeliveryGuarantee import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object StreamKafkaSourceScala { def main(args: Array[String]): Unit = { //获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ // 指定 KafkaSource 相关配置 val source: KafkaSource[String] = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build // 指定 KafkaSource 相关配置 val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") // 转换 val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词 .map((_, 1)) // 每一个单词转换为tuple2的形式(单词,1) .keyBy(tup => tup._1) // 官方推荐使用 keyselector 选择器选择数据 .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法 .sum(1) // 使用sum或者reduce都可以 // 创建 kafkaSink val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.121.128:9092") .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("sink_topic") .setValueSerializationSchema(new SimpleStringSchema()) .build ) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build // 输出到 kafka wordCount .map(tup => s"${tup._1}:${tup._2}") // 转换为字符串 .sinkTo(kafkaSink) // 输出到kafka //执行任务 env.execute() } }
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class StreamKafkaSourceJava { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定 KafkaSource 相关配置 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("192.168.121.128:9092") .setTopics("source_topic") .setGroupId("flink_kafka") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 创建 Kafka 数据流 DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); // 转换 SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS .flatMap(new FlatMapFunction<String, String>() { public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(word); } } }).map(new MapFunction<String, Tuple2<String, Integer>>() { public Tuple2<String, Integer> map(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }) .keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据 .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法 .sum(1); // 使用 sum 或者 reduce 都可以 // 创建 KafkaSink KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setBootstrapServers("192.168.121.128:9092") .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("sink_topic") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); // 输出到 Kafka wordCount.map(tuple -> tuple.f0 + ":" + tuple.f1) .sinkTo(kafkaSink); // 执行任务 env.execute(); } }
3,运行测试
(1)首先我们创建号 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic
(3)发送如下两条测试数据:
hangge com com 1 hangge
(4)可以看到结果数据已经写入到 Kafka 中了。

附:进阶技巧
1,动态指定 topic
(1)setTopicSelector 是 Flink 的 Kafka Sink API 提供的一个方法,允许动态地为每条消息选择一个目标主题。可以通过实现 KafkaRecordSerializationSchema.TopicSelector 接口(或使用 Lambda 表达式)来动态确定消息的目标主题。
(2)例如我们根据消息内容动态地决定写入的 Kafka 主题,例如:如果消息以 "error" 开头,则写入 error_topic;如果以 "info" 开头,则写入 info_topic。通过将数据流分流到不同的 Kafka 主题中,便于下游消费。
- 下面是 Scala 语言代码:
// 创建 kafkaSink val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.121.128:9092") .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopicSelector(new TopicSelector[String] { override def apply(value: String): String = { // 根据消息内容动态选择主题 if (value.startsWith("error")) "error_topic" else if (value.startsWith("info")) "info_topic" else "default_topic" } }) .setValueSerializationSchema(new SimpleStringSchema()) .build ) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build
- 下面是使用 Java 语言实现同样的功能:
// 创建 KafkaSink KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setBootstrapServers("192.168.121.128:9092") .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopicSelector(new TopicSelector<String>() { @Override public String apply(String value) { // 根据消息内容动态选择主题 if (value.startsWith("error")) { return "error_topic"; } else if (value.startsWith("info")) { return "info_topic"; } else { return "default_topic"; } } }) .setValueSerializationSchema(new SimpleStringSchema()) .build() ) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build();
2,容错
(1)KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee):
- DeliveryGuarantee.NONE:不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
- DeliveryGuarantee.AT_LEAST_ONCE:sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
- DeliveryGuarantee.EXACTLY_ONCE:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于“checkpoint 最大间隔 + 最大重启时间”,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
(2)默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。我们可以通过 setDeliveryGuarantee 方法进行修改。
// 创建 kafkaSink val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.121.128:9092") .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("sink_topic") .setValueSerializationSchema(new SimpleStringSchema()) .build ) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 设置 kafka 输出的保证 .build