当前位置: > > > Flink - Kafka Connector使用详解2(将数据流写入Kafka)

Flink - Kafka Connector使用详解2(将数据流写入Kafka)

    前文我演示了如何利用 Flink-Kafka-ConnectorKafka 中读取数据流 (Source),本文接着通过样例演示如何利用 Kafka Sink 将数据流写入一个或多个 Kafka topic

二、将数据流写入 Kafka(Kafka Sink)

1,准备工作

首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>

2,使用样例

(1)我们对上文的样例做个改进,将 WordCount 的结果保存到 Kafka 中。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object StreamKafkaSourceScala {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 指定 KafkaSource 相关配置
    val source: KafkaSource[String] = KafkaSource.builder[String]
      .setBootstrapServers("192.168.121.128:9092")
      .setTopics("source_topic")
      .setGroupId("flink_kafka")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build

    // 指定 KafkaSource 相关配置
    val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    // 转换
    val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词
      .map((_, 1)) // 每一个单词转换为tuple2的形式(单词,1)
      .keyBy(tup => tup._1) // 官方推荐使用 keyselector 选择器选择数据
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
      .sum(1) // 使用sum或者reduce都可以

    // 创建 kafkaSink
    val kafkaSink = KafkaSink.builder[String]
      .setBootstrapServers("192.168.121.128:9092")
      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("sink_topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build
      )
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build

    // 输出到 kafka
    wordCount
      .map(tup => s"${tup._1}:${tup._2}") // 转换为字符串
      .sinkTo(kafkaSink) // 输出到kafka

    //执行任务
    env.execute()
  }
}

(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamKafkaSourceJava {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指定 KafkaSource 相关配置
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.121.128:9092")
                .setTopics("source_topic")
                .setGroupId("flink_kafka")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 创建 Kafka 数据流
        DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                "Kafka Source");

        // 转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS
                .flatMap(new FlatMapFunction<String, String>() {
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }).map(new MapFunction<String, Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                })
                .keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
                .sum(1); // 使用 sum 或者 reduce 都可以

        // 创建 KafkaSink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.121.128:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("sink_topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // 输出到 Kafka
        wordCount.map(tuple -> tuple.f0 + ":" + tuple.f1)
                .sinkTo(kafkaSink);

        // 执行任务
        env.execute();
    }
}

3,运行测试

(1)首先我们创建号 Kafkatopic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic

(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic

(3)发送如下两条测试数据:
hangge com com
1 hangge

(4)可以看到结果数据已经写入到 Kafka 中了。

附:进阶技巧

1,动态指定 topic

(1)setTopicSelectorFlinkKafka Sink API 提供的一个方法,允许动态地为每条消息选择一个目标主题。可以通过实现 KafkaRecordSerializationSchema.TopicSelector 接口(或使用 Lambda 表达式)来动态确定消息的目标主题。

(2)例如我们根据消息内容动态地决定写入的 Kafka 主题,例如:如果消息以 "error" 开头,则写入 error_topic;如果以 "info" 开头,则写入 info_topic。通过将数据流分流到不同的 Kafka 主题中,便于下游消费。
  • 下面是 Scala 语言代码:
// 创建 kafkaSink
val kafkaSink = KafkaSink.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    .setTopicSelector(new TopicSelector[String] {
      override def apply(value: String): String = {
        // 根据消息内容动态选择主题
        if (value.startsWith("error")) "error_topic"
        else if (value.startsWith("info")) "info_topic"
        else "default_topic"
      }
    })
    .setValueSerializationSchema(new SimpleStringSchema())
    .build
  )
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .build
  • 下面是使用 Java 语言实现同样的功能:
// 创建 KafkaSink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("192.168.121.128:9092")
    .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                    .setTopicSelector(new TopicSelector<String>() {
                        @Override
                        public String apply(String value) {
                            // 根据消息内容动态选择主题
                            if (value.startsWith("error")) {
                                return "error_topic";
                            } else if (value.startsWith("info")) {
                                return "info_topic";
                            } else {
                                return "default_topic";
                            }
                        }
                    })
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

2,容错

(1)KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee):
  • DeliveryGuarantee.NONE:不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
  • DeliveryGuarantee.AT_LEAST_ONCEsinkcheckpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于“checkpoint 最大间隔 + 最大重启时间”,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
(2)默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。我们可以通过 setDeliveryGuarantee 方法进行修改。
// 创建 kafkaSink
val kafkaSink = KafkaSink.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    .setTopic("sink_topic")
    .setValueSerializationSchema(new SimpleStringSchema())
    .build
  )
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 设置 kafka 输出的保证
  .build
评论0