Flink SQL - DDL语句使用详解2(读取Kafka创建表、计算输出至Kafka)
数据源和目的地都是 Kafka 这种情况再实际工作中比较常见,可以实现数据的实时计算。下面通过样例进行演示。
一、Kafka(Source) + Kafka(Sink)案例
1,准备工作
首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 flink-connector-kafka、flink-json 依赖。
提示:Kafka 这个 Connector 需要使用 flink-connector-kafka 这个依赖,在操作 json 格式数据的时候需要用到 flink-json 这个依赖。
<!-- Flink clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <!-- Flink SQL --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.16.0</version> </dependency> <!-- flink-json --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.16.0</version> </dependency> <!-- flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
2,样例代码
(1)下面是 Scala 语言代码,具体逻辑为:
- 使用 SQL 定义了一个名为 kafka_source 的输入表,绑定到 Kafka 的主题 dt001。
- 使用 SQL 定义了一个名为 kafka_sink 的输出表,绑定到 Kafka 的主题 dt002。
- 从 kafka_source 表中读取数据。 过滤条件:仅保留 age > 10 的记录。 将符合条件的记录插入到 kafka_sink 表。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} object KafkaSourceSinkSQL { def main(args: Array[String]): Unit = { //创建执行环境 val settings = EnvironmentSettings .newInstance() //指定执行模式,支持inBatchMode和inStreamingMode .inStreamingMode() .build() val tEnv = TableEnvironment.create(settings) //创建输入表 val inTableSql = """ |CREATE TABLE kafka_source( | name STRING, | age INT |)WITH( | 'connector' = 'kafka', | 'topic' = 'dt001', | 'properties.bootstrap.servers' = '192.168.121.128:9092', | 'properties.group.id' = 'gid-sql-1', | 'scan.startup.mode' = 'group-offsets', | 'properties.auto.offset.reset' = 'latest', | 'format' = 'json', | 'json.fail-on-missing-field' = 'false', | 'json.ignore-parse-errors' = 'true' |) |""".stripMargin tEnv.executeSql(inTableSql) //创建输出表 val outTableSql = """ |CREATE TABLE kafka_sink( | name STRING, | age INT |)WITH( | 'connector' = 'kafka', | 'topic' = 'dt002', | 'properties.bootstrap.servers' = '192.168.121.128:9092', | 'format' = 'json', | 'sink.partitioner' = 'default' |) |""".stripMargin tEnv.executeSql(outTableSql) //业务逻辑 val execSql_append = """ |INSERT INTO kafka_sink |SELECT | name, | age |FROM kafka_source |WHERE age > 10 |""".stripMargin tEnv.executeSql(execSql_append) } }
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class KafkaSourceSinkSQLJava { public static void main(String[] args) { // 创建执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() // 指定执行模式,支持 inBatchMode 和 inStreamingMode .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 创建输入表 String inTableSql = "CREATE TABLE kafka_source (" + " name STRING," + " age INT" + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'dt001'," + " 'properties.bootstrap.servers' = '192.168.121.128:9092'," + " 'properties.group.id' = 'gid-sql-1'," + " 'scan.startup.mode' = 'group-offsets'," + " 'properties.auto.offset.reset' = 'latest'," + " 'format' = 'json'," + " 'json.fail-on-missing-field' = 'false'," + " 'json.ignore-parse-errors' = 'true'" + ")"; tEnv.executeSql(inTableSql); // 创建输出表 String outTableSql = "CREATE TABLE kafka_sink (" + " name STRING," + " age INT" + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'dt002'," + " 'properties.bootstrap.servers' = '192.168.121.128:9092'," + " 'format' = 'json'," + " 'sink.partitioner' = 'default'" + ")"; tEnv.executeSql(outTableSql); // 业务逻辑 String execSqlAppend = "INSERT INTO kafka_sink " + "SELECT " + " name, " + " age " + "FROM kafka_source " + "WHERE age > 10"; tEnv.executeSql(execSqlAppend); } }
3,运行测试
(1)首先我们创建 dt001、dt002 这两个 topic:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt001 kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dt002
(2)然后我们开启一个基于控制台的生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic dt001
(3)接着我们开启一个基于控制台的消费者:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dt002
(4)启动程序,然后我们在生产者中依次模拟产生如下三条数据:
{"name":"hangge","age":19} {"name":"小李","age":10} {"name":"老王","age":99}

(5)然后在消费者这边可以看到消费的数据如下。这样就说明这个基于 Kafka 的 FlinkSQL 任务正常执行了。

附:删除表
1,什么情况下需要删除表?
(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了
2,如何删除表?
如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE kafka_sink")