当前位置: > > > Flink SQL - DDL语句使用详解2(读取Kafka创建表、计算输出至Kafka)

Flink SQL - DDL语句使用详解2(读取Kafka创建表、计算输出至Kafka)

    数据源和目的地都是 Kafka 这种情况再实际工作中比较常见,可以实现数据的实时计算。下面通过样例进行演示。

一、Kafka(Source) + Kafka(Sink)案例

1,准备工作

首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 flink-connector-kafkaflink-json 依赖。
提示Kafka 这个 Connector 需要使用 flink-connector-kafka 这个依赖,在操作 json 格式数据的时候需要用到 flink-json 这个依赖。
<!-- Flink clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- Flink SQL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- flink-json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>

2,样例代码

(1)下面是 Scala 语言代码,具体逻辑为:
  • 使用 SQL 定义了一个名为 kafka_source 的输入表,绑定到 Kafka 的主题 dt001
  • 使用 SQL 定义了一个名为 kafka_sink 的输出表,绑定到 Kafka 的主题 dt002
  • kafka_source 表中读取数据。 过滤条件:仅保留 age > 10 的记录。 将符合条件的记录插入到 kafka_sink 表。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object KafkaSourceSinkSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE kafka_source(
        |  name STRING,
        |  age INT
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'dt001',
        |  'properties.bootstrap.servers' = '192.168.121.128:9092',
        |  'properties.group.id' = 'gid-sql-1',
        |  'scan.startup.mode' = 'group-offsets',
        |  'properties.auto.offset.reset' = 'latest',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //创建输出表
    val outTableSql =
      """
        |CREATE TABLE kafka_sink(
        |  name STRING,
        |  age INT
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'dt002',
        |  'properties.bootstrap.servers' = '192.168.121.128:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default'
        |)
        |""".stripMargin
    tEnv.executeSql(outTableSql)

    //业务逻辑
    val execSql_append =
      """
        |INSERT INTO kafka_sink
        |SELECT
        |  name,
        |  age
        |FROM kafka_source
        |WHERE age > 10
        |""".stripMargin

    tEnv.executeSql(execSql_append)
  }
}

(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSourceSinkSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 创建输入表
        String inTableSql = "CREATE TABLE kafka_source (" +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dt001'," +
                "  'properties.bootstrap.servers' = '192.168.121.128:9092'," +
                "  'properties.group.id' = 'gid-sql-1'," +
                "  'scan.startup.mode' = 'group-offsets'," +
                "  'properties.auto.offset.reset' = 'latest'," +
                "  'format' = 'json'," +
                "  'json.fail-on-missing-field' = 'false'," +
                "  'json.ignore-parse-errors' = 'true'" +
                ")";
        tEnv.executeSql(inTableSql);

        // 创建输出表
        String outTableSql = "CREATE TABLE kafka_sink (" +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dt002'," +
                "  'properties.bootstrap.servers' = '192.168.121.128:9092'," +
                "  'format' = 'json'," +
                "  'sink.partitioner' = 'default'" +
                ")";
        tEnv.executeSql(outTableSql);

        // 业务逻辑
        String execSqlAppend = "INSERT INTO kafka_sink " +
                "SELECT " +
                "  name, " +
                "  age " +
                "FROM kafka_source " +
                "WHERE age > 10";
        tEnv.executeSql(execSqlAppend);
    }
}

3,运行测试

(1)首先我们创建 dt001dt002 这两个 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt001
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt002

(2)然后我们开启一个基于控制台的生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic dt001

(3)接着我们开启一个基于控制台的消费者:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dt002

(4)启动程序,然后我们在生产者中依次模拟产生如下三条数据:
{"name":"hangge","age":19}
{"name":"小李","age":10}
{"name":"老王","age":99}

(5)然后在消费者这边可以看到消费的数据如下。这样就说明这个基于 KafkaFlinkSQL 任务正常执行了。

附:删除表

1,什么情况下需要删除表?

(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了

2,如何删除表?

如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE kafka_sink")
评论0