Flink - Table API快速入门教程1(安装配置,文件的读写样例)
Flink 针对标准的流处理和批处理提供了两种关系型 API,Table API 和 SQL。Table API 允许用户以一种很直观的方式进行 select 、filter 和 join 操作。Flink SQL 基于 Apache Calcite 实现标准 SQL。针对批处理和流处理可以提供相同的处理语义和结果。本文首先通过样例演示 Flink Table API 的使用。
(2)由于部分 table 相关的代码是用 Scala 实现的,所以,下面这个依赖也是必须的。
(2)下面是使用 Java 语言实现同样的功能:
(2)运行程序,可以看到控制台输出内容如下:

1,安装配置
(1)如果我们想要使用 Table API 的话,需要在 pom.xml 添加如下的依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.16.0</version> </dependency>
(2)由于部分 table 相关的代码是用 Scala 实现的,所以,下面这个依赖也是必须的。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency>
2,样例代码
(1)下面我们使用 scala 语言编写一个 Table API 使用样例,从文本文件中读取表格数据,然后筛选后将结果输出至本地文件中。
import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.TableEnvironment import org.apache.flink.api.scala._ object TableAPIAndSQLOpScala { def main(args: Array[String]): Unit = { //获取TableEnvironment val sSettings = EnvironmentSettings.newInstance.inStreamingMode().build() val sTableEnv = TableEnvironment.create(sSettings) //创建输入表 /** * connector.type:指定connector的类型 * connector.path:指定文件或者目录地址 * format.type:文件数据格式化类型,现在只支持csv格式 * 注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用添 */ sTableEnv.executeSql("" + "create table myTable(\n" + "id int,\n" + "name string\n" + ") with (\n" + "'connector.type' = 'filesystem',\n" + "'connector.path' = 'D:/temp/input.txt',\n" + "'format.type' = 'csv'\n" + ")") //使用Table API实现数据查询和过滤等操作 /*import org.apache.flink.table.api._ val result = sTableEnv.from("myTable") .select($"id",$"name") .filter($"id" > 1)*/ //使用SQL实现数据查询和过滤等操作 val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1") //输出结果到控制台 result.execute.print() //创建输出表 sTableEnv.executeSql("" + "create table newTable(\n" + "id int,\n" + "name string\n" + ") with (\n" + "'connector.type' = 'filesystem',\n" + "'connector.path' = 'D:/temp/output',\n" + "'format.type' = 'csv'\n" + ")") //输出结果到表newTable中 result.executeInsert("newTable") } }
- 注意:针对 SQL 建表语句的写法还有一种比较清晰的写法。
sTableEnv.executeSql( """ |create table myTable( |id int, |name string |) with ( |'connector.type' = 'filesystem', |'connector.path' = 'D:/temp/input.txt', |'format.type' = 'csv' |) |""".stripMargin)
(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class TableAPIAndSQLOpJava { public static void main(String[] args) { //获取TableEnvironment EnvironmentSettings sSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); TableEnvironment sTableEnv = TableEnvironment.create(sSettings); //创建输入表 sTableEnv.executeSql("create table myTable(\n" + "id int,\n" + "name string\n" + ") with (\n" + "'connector.type' = 'filesystem',\n" + "'connector.path' = 'D:/temp/input.txt',\n" + "'format.type' = 'csv'\n" + ")"); //使用Table API实现数据查询和过滤等操作 /*Table result = sTableEnv.from("myTable") .select($("id"), $("name")) .filter($("id").isGreater(1));*/ //使用SQL实现数据查询和过滤等操作 Table result = sTableEnv.sqlQuery("select id,name from myTable where id > 1"); //输出结果到控制台 result.execute().print(); //创建输出表 sTableEnv.executeSql("" + "create table newTable(\n" + "id int,\n" + "name string\n" + ") with (\n" + "'connector.type' = 'filesystem',\n" + "'connector.path' = 'D:/temp/output',\n" + "'format.type' = 'csv'\n" + ")"); //输出结果到表newTable中 result.executeInsert("newTable"); } }
3,运行测试
(1)首先我们在 D:/temp 目录下创建一个 input.txt 文件,文件内容如下:
1,hangge 2,小刘 3,老王
(2)运行程序,可以看到控制台输出内容如下:

(3)同时可以看到 D:\temp\output 目录下生成了相关的输出文件(16 个文件是因为我的 CPU 是 16 核)。当然,由于输出的内容只有两条数据,因此只有 11、16 文件里面各有一条,其它文件都是空。

(4)打开其中的 11 号文件,可以看到里面内容如下:
