当前位置: > > > Flink - Kafka Connector使用详解1(读取kafka数据流)

Flink - Kafka Connector使用详解1(读取kafka数据流)

    Flink 社区提供了丰富的连接器(Connectors)以方便与不同的数据源进行交互,其 Flink-Kafka-ConnectorFlink 提供的一个专门用于与 Kafka 集成的组件。通过这个连接器,用户可以轻松地从 Kafka 中读取数据流 (Source)或将数据流写入到 KafkaSink)。本文首先介绍如何从 Kafka 中读取数据。

一、读取 Kafka 数据(Kafka Source)

1,准备工作

首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>

2,使用样例

(1)我们读取 Kafka 的制定 topic 数据并进行单词统计,然后打印到控制台。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object StreamKafkaSourceScala {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 指定 KafkaSource 相关配置
    val source: KafkaSource[String] = KafkaSource.builder[String]
      .setBootstrapServers("192.168.121.128:9092")
      .setTopics("source_topic")
      .setGroupId("flink_kafka")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build

    // 指定 KafkaSource 相关配置
    val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    // 转换
    val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词
      .map((_, 1)) //每一个单词转换为tuple2的形式(单词,1)
      .keyBy(tup => tup._1) //官方推荐使用keyselector选择器选择数据
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
      .sum(1) // 使用sum或者reduce都可以

    // 输出
    wordCount.print()

    //执行任务
    env.execute()
  }
}

(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamKafkaSourceJava {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指定 KafkaSource 相关配置
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.121.128:9092")
                .setTopics("source_topic")
                .setGroupId("flink_kafka")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 创建 Kafka 数据流
        DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                "Kafka Source");

        // 转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS
                .flatMap(new FlatMapFunction<String, String>() {
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }).map(new MapFunction<String, Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                })
                .keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
                .sum(1); // 使用 sum 或者 reduce 都可以

        // 输出
        wordCount.print();

        // 执行任务
        env.execute();
    }
}

3,运行测试

(1)首先我们创建好 Kafkatopic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic

(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic

(3)发送如下两条测试数据:
hangge com com
1 hangge

(4)可以看到控制台输出内容如下:

附:进阶技巧

1,Topic / Partition 订阅方式

(1)下面样例我们订阅 Topic 列表中所有 Partition 的消息:
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("source_topic1", "source_topic2", "source_topic3")
  .setGroupId("flink_kafka")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build

(2)我们也可也订阅与正则表达式所匹配的 Topic 下的所有 Partition
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("source_topic*")
  .setGroupId("flink_kafka")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build

(3)还可以通过 Partition 列表订阅指定的 Partition
// 指定 KafkaSource 相关配置
val partitionSet = Set(
  new TopicPartition("source_topic1", 0), // Partition 0 of topic "source_topic1"
  new TopicPartition("source_topic2", 3)  // Partition 3 of topic "source_topic2"
)
import scala.collection.JavaConverters._ // 用于转换 Set[TopicPartition] 为 Java Set
val source: KafkaSource[String] = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setPartitions(partitionSet.asJava)  // 使用转换后的 Java Set
  .setGroupId("flink_kafka")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build

2,起始消费位点、自定提交 offset

(1)Kafka source 能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费 。内置的位点初始化器包括:
  • 从最早位点开始消费(默认)
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("source_topic")
  .setGroupId("flink_kafka")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build
  • 从最末尾位点开始消费:
.setStartingOffsets(OffsetsInitializer.latest())
  • 从时间戳大于等于指定时间戳(毫秒)的数据开始消费:
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
  • 从消费组提交的位点开始消费,不指定位点重置策略:
.setStartingOffsets(OffsetsInitializer.committedOffsets())
  • 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

(2)为了避免重复消费问题,确保程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
// 指定 KafkaSource 相关配置
val source = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("source_topic")
  .setGroupId("con")
  .setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能
  .setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒
  // 消费者的起始位移被设置为已提交的最早位移
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build

3,动态分区检查

(1)为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。

(2)分区检查功能默认不开启。要启用动态分区检查,将 partition.discovery.interval.ms 设置为非负值即可。
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("source_topic")
  .setGroupId("flink_kafka")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
  .build
评论0