当前位置: > > > Flink - Table API快速入门教程2(DataStream和Table互相转换)

Flink - Table API快速入门教程2(DataStream和Table互相转换)

    Table API&SQL 可以很容易的和 DataStreamDataSet 程序集成到一块。通过 TableEnvironment ,可以把 DataStream 或者 DataSet 注册为 Table,这样就可以使用 Table APISQL 查询了。此外,通过 TableEnvironment 也可以把 Table 对象转换为 DataStream 或者 DataSet,这样就可以使用 DataStream 或者 DataSet 中的相关 API 了。
注意:由于 DataSet APIFlink 1.15 之后已被弃用,并逐步被 DataStream API 取代。因此本文仅演示 DataStreamTable 之间的互相转换。

一、DataStream 转换为 Table

1,基本说明

(1)FlinkDataStream API 支持多样的数据类型。例如 TupleScala 内置,Flink Java tuplePython tuples)、POJO 类型、Scala case class 类型以及 FlinkRow 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。

(2)当 DataStream 转换为 Table 时,原先的数据类型到 tableschema 的映射有两种方式:基于字段位置或基于字段名称。

2,Tuple 类型数据的映射

(1)如果我们需要将包含 Tuple 类型数据的 DataStream 转换为 Table,则需要使用基于字段位置映射方式。下面是 Scala 语言代码:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.Schema

object DataStreamToTableScala {
  def main(args: Array[String]): Unit = {
    // 创建流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // 创建示例 DataStream(Tuple 类型)
    val dataStream = env.fromElements(
      (1, "Alice", 1000.0),
      (2, "Bob", 2000.0),
      (3, "Charlie", 3000.0)
    )
    
    // 将 DataStream 转换为 Table
    import org.apache.flink.table.api._
    val table = tableEnv.fromDataStream(dataStream, $("id"), $("name"), $("salary"))

    // 在 Table 上执行 SQL 查询
    tableEnv.createTemporaryView("Employee", table)
    val resultTable = tableEnv.sqlQuery("SELECT id, name, salary FROM Employee WHERE salary > 1500")

    // 打印结果
    resultTable.execute().print()
  }
}

(2)下面是使用 Java 代码实现同样的功能:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class DataStreamToTableJava {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建示例 DataStream(Tuple 类型)
        DataStream<Tuple3<Integer, String, Double>> dataStream = env.fromElements(
                Tuple3.of(1, "Alice", 1000.0),
                Tuple3.of(2, "Bob", 2000.0),
                Tuple3.of(3, "Charlie", 3000.0)
        );

        // 将 DataStream 转换为 Table
        Table table = tableEnv.fromDataStream(dataStream, $("id"), $("name"), $("salary"));

        // 在 Table 上执行 SQL 查询
        tableEnv.createTemporaryView("Employee", table);
        Table resultTable = tableEnv.sqlQuery(
                "SELECT id, name, salary FROM Employee WHERE salary > 1500");

        // 打印结果
        resultTable.execute().print();
    }
}

3,自定义 POJO 类型数据的映射

(1)如果我们需要将包含自定义 POJO 类型数据的 DataStream 转换为 Table,则需要使用基于字段名称映射方式,或者直接使用默认的自动映射。下面是 Scala 语言代码:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// 定义 POJO 类
case class Employee(id: Int, name: String, salary: Double)

object DataStreamToTableScala {
  def main(args: Array[String]): Unit = {
    // 创建流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // 创建示例 DataStream
    val dataStream = env.fromElements(
      Employee(1, "Alice", 1000.0),
      Employee(2, "Bob", 2000.0),
      Employee(3, "Charlie", 3000.0)
    )

    // 将 DataStream 注册为 Table
    val table = tableEnv.fromDataStream(dataStream)

    // 在 Table 上执行 SQL 查询
    tableEnv.createTemporaryView("Employee", table)
    val resultTable = tableEnv.sqlQuery("SELECT id, name, salary FROM Employee WHERE salary > 1500")

    // 打印结果
    resultTable.execute().print()
  }
}

(2)下面是使用 Java 代码实现同样的功能:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataStreamToTableJava {
    // 定义 POJO 类
    public static class Employee {
        public int id;
        public String name;
        public double salary;

        // 默认构造函数(Flink 必须要求)
        public Employee() {}

        // 参数构造函数
        public Employee(int id, String name, double salary) {
            this.id = id;
            this.name = name;
            this.salary = salary;
        }
    }

    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建示例 DataStream
        DataStream<Employee> dataStream = env.fromElements(
                new Employee(1, "Alice", 1000.0),
                new Employee(2, "Bob", 2000.0),
                new Employee(3, "Charlie", 3000.0)
        );

        // 将 DataStream 注册为 Table
        Table table = tableEnv.fromDataStream(dataStream);

        // 在 Table 上执行 SQL 查询
        tableEnv.createTemporaryView("Employee", table);
        Table resultTable = tableEnv.sqlQuery(
                "SELECT id, name, salary FROM Employee WHERE salary > 1500");

        // 打印结果
        resultTable.execute().print();
    }
}

二、Table 转换为 DataStream

1,基本介绍

(1)Flink 提供了以下两种主要方法将 Table 转换为 DataStream
  • toDataStream:将 Table 转换为 DataStream,输出类型可以是通用的 Row 或自定义类型。
  • toChangelogStream:用于处理动态表(Changelog Table),即增量数据流的场景,例如带有更新、删除的表。

(2)为方便接下来的演示,我们在 D:/temp 目录下创建一个 input.txt 文件,文件内容如下,用于创建 table
1,hangge
2,小刘
3,老王

2,Table 转换为 DataStream[Row]

(1)toChangelogStream 方法可以将 Table 转换为通用的 DataStream<Row> 类型,下面样例使用使用的是 Scala 语言:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TableToDataStreamExample {
  def main(args: Array[String]): Unit = {
    //StreamTableEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    //创建输入表
    tableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:/temp/input.txt',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //获取table
    val table = tableEnv.from("myTable")

    // 将 Table 转换为 DataStream[Row]
    val rowStream = tableEnv.toDataStream(table)

    // 使用DataStream API进行数据处理,然后打印过滤后的数据
    rowStream.filter(_.getField(0).asInstanceOf[Int] > 1).print()

    env.execute("Table to DataStream Example")
  }
}
  • 下面是使用 Java 实现同样功能:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableToDataStreamExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建输入表
        tableEnv.executeSql(
                "CREATE TABLE myTable (\n" +
                        "  id INT,\n" +
                        "  name STRING\n" +
                        ") WITH (\n" +
                        "  'connector.type' = 'filesystem',\n" +
                        "  'connector.path' = 'D:/temp/input.txt',\n" +
                        "  'format.type' = 'csv'\n" +
                        ")"
        );

        // 获取 Table
        Table table = tableEnv.from("myTable");

        // 将 Table 转换为 DataStream<Row>
        DataStream<Row> rowStream = tableEnv.toDataStream(table);

        // 使用 DataStream API 进行数据处理
        rowStream
                .filter(row -> (int) row.getField(0) > 1) // 过滤 id > 1 的数据
                .print();

        // 执行任务
        env.execute("Table to DataStream Example");
    }
}

(2)运行结果如下:

3,Table 转换为自定义类型

(1)toChangelogStream 方法还可以可以将 Table 转换为特定的类型,如 Case Class。下面样例使用 scala 语言将其转换为自定义 Employee 类型。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

case class Employee(id: Int, name: String)

object TableToDataStreamExample {
  def main(args: Array[String]): Unit = {
    //StreamTableEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    //创建输入表
    tableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:/temp/input.txt',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //获取table
    val table = tableEnv.from("myTable")

    // 将 Table 转换为 DataStream[Employee]
    val employeeStream = tableEnv.toDataStream(table, classOf[Employee])

    // 使用DataStream API进行数据处理,然后打印过滤后的数据
    employeeStream.filter(_.id > 1).print()

    env.execute("Table to DataStream Example")
  }
}
  • 下面是使用 Java 实现同样功能:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// 定义 POJO 类
public class TableToDataStreamExampleJava {

    public static class Employee {
        public int id;
        public String name;

        // 必须有无参构造方法
        public Employee() {}

        public Employee(int id, String name) {
            this.id = id;
            this.name = name;
        }
        @Override
        public String toString() {
            return "Employee{id=" + id + ", name=" + name + '}';
        }
    }

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建输入表
        tableEnv.executeSql(
                "CREATE TABLE myTable (\n" +
                        "  id INT,\n" +
                        "  name STRING\n" +
                        ") WITH (\n" +
                        "  'connector.type' = 'filesystem',\n" +
                        "  'connector.path' = 'D:/temp/input.txt',\n" +
                        "  'format.type' = 'csv'\n" +
                        ")"
        );

        // 获取 Table
        Table table = tableEnv.from("myTable");

        // 将 Table 转换为 DataStream<Employee>
        DataStream<Employee> employeeStream = tableEnv.toDataStream(table, Employee.class);

        // 使用 DataStream API 进行数据处理,然后打印过滤后的数据
        employeeStream
                .filter(employee -> employee.id > 1) // 过滤 id > 1 的数据
                .print();

        // 执行任务
        env.execute("Table to DataStream Example");
    }
}

(2)运行结果如下:

4,处理动态表(Changelog Table)

(1)如果 Table 是动态表(如增删改数据的结果),可以使用 toChangelogStream 方法。例如:通过 SQL 查询聚合数据后,输出增量更新。
(2)下面是 scala 语言样例:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TableToDataStreamExample {
  def main(args: Array[String]): Unit = {
    //StreamTableEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    //创建输入表
    tableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string,\n" +
      "salary int\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:/temp/input.txt',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //获取table
    val table = tableEnv.from("myTable")

    //使用SQL实现聚合
    val result = tableEnv.sqlQuery("select name,SUM(salary) from myTable GROUP BY name")

    // 将 Table 转换为 DataStream[Row]
    val rowStream = tableEnv.toChangelogStream(result)

    // 打印DataStream数据
    rowStream.print()

    env.execute("Table to DataStream Example")
  }
}
  • 下面是使用 java 语言实现同样的功能:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableToDataStreamExampleJava {
    public static void main(String[] args) throws Exception {
        // StreamTableEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建输入表
        tableEnv.executeSql(
                "CREATE TABLE myTable (\n" +
                        "id INT,\n" +
                        "name STRING,\n" +
                        "salary INT\n" +
                        ") WITH (\n" +
                        "'connector.type' = 'filesystem',\n" +
                        "'connector.path' = 'D:/temp/input.txt',\n" +
                        "'format.type' = 'csv'\n" +
                        ")"
        );

        // 获取 Table
        Table table = tableEnv.from("myTable");

        // 使用 SQL 实现聚合
        Table result = tableEnv.sqlQuery("SELECT name, SUM(salary) FROM myTable GROUP BY name");

        // 将 Table 转换为 DataStream<Row>
        DataStream<Row> rowStream = tableEnv.toChangelogStream(result);

        // 打印DataStream数据
        rowStream.print();

        env.execute("Table to DataStream Example");
    }
}

(3)准备测试数据 input.txt,内容如下:
1,hangge,100
2,小刘,150
3,老王,100
4,hangge,200
5,老王,100

(4)程序启动后运行结果如下:
  • +I 表示插入记录
  • -U 表示更新前的记录
评论0