当前位置: > > > Flink - Table API快速入门教程1(安装配置,文件的读写样例)

Flink - Table API快速入门教程1(安装配置,文件的读写样例)

    Flink 针对标准的流处理和批处理提供了两种关系型 APITable APISQLTable API 允许用户以一种很直观的方式进行 selectfilterjoin 操作。Flink SQL 基于 Apache Calcite 实现标准 SQL。针对批处理和流处理可以提供相同的处理语义和结果。本文首先通过样例演示 Flink Table API 的使用。

1,安装配置

(1)如果我们想要使用 Table API 的话,需要在 pom.xml 添加如下的依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.16.0</version>
</dependency>

(2)由于部分 table 相关的代码是用 Scala 实现的,所以,下面这个依赖也是必须的。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>

2,样例代码

(1)下面我们使用 scala 语言编写一个 Table API 使用样例,从文本文件中读取表格数据,然后筛选后将结果输出至本地文件中。
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._

object TableAPIAndSQLOpScala {
  def main(args: Array[String]): Unit = {
    //获取TableEnvironment
    val sSettings = EnvironmentSettings.newInstance.inStreamingMode().build()
    val sTableEnv = TableEnvironment.create(sSettings)
    //创建输入表
    /**
     * connector.type:指定connector的类型
     * connector.path:指定文件或者目录地址
     * format.type:文件数据格式化类型,现在只支持csv格式
     * 注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用添
     */
    sTableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:/temp/input.txt',\n" +
      "'format.type' = 'csv'\n" +
      ")")
    
    //使用Table API实现数据查询和过滤等操作
    /*import org.apache.flink.table.api._
    val result = sTableEnv.from("myTable")
        .select($"id",$"name")
        .filter($"id" > 1)*/

    //使用SQL实现数据查询和过滤等操作
    val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1")

    //输出结果到控制台
    result.execute.print()

    //创建输出表
    sTableEnv.executeSql("" +
      "create table newTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:/temp/output',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //输出结果到表newTable中
    result.executeInsert("newTable")
  }
}
  • 注意:针对 SQL 建表语句的写法还有一种比较清晰的写法。
sTableEnv.executeSql(
  """
    |create table myTable(
    |id int,
    |name string
    |) with (
    |'connector.type' = 'filesystem',
    |'connector.path' = 'D:/temp/input.txt',
    |'format.type' = 'csv'
    |)
    |""".stripMargin)

(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class TableAPIAndSQLOpJava {
    public static void main(String[] args) {
        //获取TableEnvironment
        EnvironmentSettings sSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment sTableEnv = TableEnvironment.create(sSettings);

        //创建输入表
        sTableEnv.executeSql("create table myTable(\n" +
                "id int,\n" +
                "name string\n" +
                ") with (\n" +
                "'connector.type' = 'filesystem',\n" +
                "'connector.path' = 'D:/temp/input.txt',\n" +
                "'format.type' = 'csv'\n" +
                ")");

        //使用Table API实现数据查询和过滤等操作

        /*Table result = sTableEnv.from("myTable")
                .select($("id"), $("name"))
                .filter($("id").isGreater(1));*/

        //使用SQL实现数据查询和过滤等操作
        Table result = sTableEnv.sqlQuery("select id,name from myTable where id > 1");

        //输出结果到控制台
        result.execute().print();

        //创建输出表
        sTableEnv.executeSql("" +
                "create table newTable(\n" +
                "id int,\n" +
                "name string\n" +
                ") with (\n" +
                "'connector.type' = 'filesystem',\n" +
                "'connector.path' = 'D:/temp/output',\n" +
                "'format.type' = 'csv'\n" +
                ")");

        //输出结果到表newTable中
        result.executeInsert("newTable");
    }
}

3,运行测试

(1)首先我们在 D:/temp 目录下创建一个 input.txt 文件,文件内容如下:
1,hangge
2,小刘
3,老王

(2)运行程序,可以看到控制台输出内容如下:

(3)同时可以看到 D:\temp\output 目录下生成了相关的输出文件(16 个文件是因为我的 CPU16 核)。当然,由于输出的内容只有两条数据,因此只有 1116 文件里面各有一条,其它文件都是空。


(4)打开其中的 11 号文件,可以看到里面内容如下:
评论0