当前位置: > > > Flink SQL - 元数据Catalog使用详解(附:API、DDL两种操作方式)

Flink SQL - 元数据Catalog使用详解(附:API、DDL两种操作方式)

    我们在使用 Hive SQL 的时候感觉很方便,创建一次表之后,可以随时使用,这是因为 HiveMetastore 中对元数据进行了持久化存储。
    但是针对 Flink SQL 的使用就没有 Hive SQL 那么方便了,我们前面通过 Flink SQL 创建的表相当于都是临时表,默认只在当前任务的 Session 会话中有效,无法在其他任务中重复使用,想要重复使用只能重新再创建表,这样就比较麻烦了。
    为了解决这个问题,Flink SQL 提供了 Catalog 这个功能。本文将通过样例详细演示 Catalog 的使用。

一、基本介绍

1,什么是 Catalog?

(1)Catalog 主要为 Flink SQL 提供元数据管理,类似于 Hive 中的 Metastore

(2)Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink SQL 中。
例如Flink SQL 通过 Catalog 可以直接使用 Hive MetaStore 中的元数据,也可以将 Flink SQL 中的元数据信息存储到 HiveMetaStore 中。

(3)Catalog 相当于是在 Database 之上抽取出来的一个概念,它和 DatabaseTableView 之间的关系如下图所示:
  • Catalog 下面可以有多个 DatabaseDatabase 下面可以有多个 TableView
  • 其实我们在使用表的时候,表的全名是由三个部分组成的:Catalog名称.Database名称.表名称。
  • 可以在 Flink SQL 中通过 use 命令进入到对应的 CatalogDatabase

2,Catalog 类型

(1)目前 Flink SQL 提供以下四种 Catalog
  • GenericInMemoryCatalog:他是一个基于内存的 CatalogFlink SQL 的元数据只能在当前任务的 Session 会话中有效,默认使用的其实就是这种。
  • JdbcCatalog:他可以通过 JDBC 连接的方式将 Flink SQL 的元数据信息存储到关系型数据库中,目前只支持 PostgresMySQL 这两种数据库。
  • HiveCatalog:他有两个功能,第一个是作为 Flink SQL 元数据的持久化存储,第二个是作为读写现有 Hive 元数据的接口。
注意:在存储 Flink SQL 元数据信息的时候,HiveCatalog 会转为小写形式存储,而 GenericInMemoryCatalog 会区分大小写。
  • User-Defined Catalog:用户自定义的 Catalog

(2)其实在实际工作中最常使用的就是 GenericInMemoryCatalogHiveCatalog 了。
  • 如果不需要持久化保存元数据,则使用默认的 GenericInMemoryCatalog 即可。
  • 如果需要持久化保存元数据,则建议使用 HiveCatalog,这样既可以持久化 Flink SQL 的元数据信息,还可以读取 Hive 中已有的元数据。

3,准备工作

(1)本文主要来演示一下 HiveCatalog 的使用,因此首先要准备好 Hive 环境,具体可以参考我之前写的文章:

(2)接着,创建一个 hive-site.xml 文件:
vi hive-site.xml
  • 文件内容如下,里面只需要配置 hive.metastore.uris 属性即可:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.121.128:9083</value>
  </property>
</configuration>
  • 然后将 hive-site.xml 这个文件上传到 HDFS 上。
注意:具体的 HDFS 目录需要和后面代码中指定的 hiveConfDir 参数的值保持一致。
 hdfs dfs -mkdir -p /hive-conf
hdfs dfs -put hive-site.xml /hive-conf

(3)然后在上面配置的节点上启动 Hivemetastore 服务:
cd /usr/local/hive/
nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &

(4)最后,在使用 HiveCatalog 的程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-slf4j-impl</artifactId>
            <groupId>org.apache.logging.log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
  • 注意:我这里将 log4j-slf4j-imp 排除掉了,因为 slf4j-log4j12log4j-slf4j-imp 冲突了,实际上使用的是 org.slf4j.impl.Log4jLoggerFactoryslf4j-log4j12)。如果不排除,运行程序时会提示日志冲突(虽然不影响程序正常运行)。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

二、使用 API 方式创建并使用 Catalog

1,样例代码

(1)下面是 Scala 语言代码,代码的逻辑如下:
  • 首先创建 Catalog
  • 接着使用 Catalog
  • 使用指定 Database
  • 创建表
  • 查询表中的数据
  • 删除表
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //使用API创建Catalog
    //catalogName这个参数必须要指定,名称可随意指定,因为最终在Hive的Metastore中不存储catalogName
    val catalogName = "myhivecatalog"
    //defaultDatabase这个参数必须指定Hive中存在的数据库名称
    val defaultDatabase = "default"
    //hiveConfDir这个参数可以不指定,不指定时Flink默认会到本地类路径下(resoures)查找hive-site.xml
    //hiveConfDir这个参数也可以指定具体的目录,建议使用HDFS,在HDFS目录下需要有hive-site.xml
    val hiveConfDir = "hdfs://192.168.121.128:9000/hive-conf" // 指定hive-site.xml存放的目录
    val hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir)
    //创建Catalog
    tEnv.registerCatalog(catalogName, hive)

    //使用Catalog
    tEnv.useCatalog("myhivecatalog") // 使用API

    //使用Database
    tEnv.useDatabase("default") // 使用API

    //创建表
    //注意:此时这个表的元数据信息会存储到HiveCatalog中指定的Hive Metastore中。
    val inTableSql =
    """
      |-- 增加了IF NOT EXISTS之后可以多次执行此SQL语句,只有表不存在时才会真正执行
      |CREATE TABLE IF NOT EXISTS orders (
      |    order_id     BIGINT,
      |    price        DECIMAL(10,2),
      |    order_time   TIMESTAMP
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '1'
      |)
      |""".stripMargin
    tEnv.executeSql(inTableSql)

    //查询表中的数据【此时也可以直接查询Hive中的表】
    tEnv.executeSql("SELECT * FROM orders").print()

    //删除表
    //tEnv.executeSql("DROP TABLE orders")
  }
}

(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 使用API创建Catalog
        // catalogName这个参数必须要指定,名称可随意指定,因为最终在Hive的Metastore中不存储catalogName
        String catalogName = "myhivecatalog";
        // defaultDatabase这个参数必须指定Hive中存在的数据库名称
        String defaultDatabase = "default";
        // hiveConfDir这个参数可不指定,不指定Flink默认会到本地类路径下(resources)查找hive-site.xml
        // hiveConfDir这个参数也可以指定具体的目录,建议使用HDFS,在HDFS目录下需要有hive-site.xml
        String hiveConfDir = "hdfs://192.168.121.128:9000/hive-conf"; // 指定hive-site.xml存放的目录
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // 创建Catalog
        tEnv.registerCatalog(catalogName, hive);

        // 使用Catalog
        tEnv.useCatalog("myhivecatalog"); // 使用API

        // 使用Database
        tEnv.useDatabase("default"); // 使用API

        // 创建表
        // 注意:此时这个表的元数据信息会存储到HiveCatalog中指定的Hive Metastore中。
        String inTableSql =
                "-- 增加了IF NOT EXISTS之后可以多次执行此SQL语句,只有表不存在时才会真正执行\n" +
                        "CREATE TABLE IF NOT EXISTS orders (\n" +
                        "    order_id     BIGINT,\n" +
                        "    price        DECIMAL(10,2),\n" +
                        "    order_time   TIMESTAMP\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1'\n" +
                        ")";
        tEnv.executeSql(inTableSql);

        // 查询表中的数据【此时也可以直接查询Hive中的表】
        tEnv.executeSql("SELECT * FROM orders").print();

        // 删除表
        // tEnv.executeSql("DROP TABLE orders");
    }
}

2,运行测试

(1)运行程序,可以看到控制台输出内容如下:

(2)我们到 HiveMetastoreMySQL)中查看 tbls 表,可以发现有新增的 orders 表信息,说明表的元数据信息成功存储在 HiveMetastoreMySQL)中。
注意:在 Flink SQL 中创建的表,虽然表的元数据信息存储在了 HiveMetastoreMySQL)中,但是这些表在 Hive 中是无法正常使用的,只能在 Flink SQL 中使用。而 Hive 中的表,在 Flink SQL 中是可以查询的。

三、使用 DDL 方式创建并使用 Catalog

1,样例代码

(1)下面是 Scala 语言代码,代码的逻辑如下:
  • 首先创建 Catalog
  • 接着使用 Catalog
  • 使用指定 Database
  • 创建表
  • 查询表中的数据
  • 删除表
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HiveCatalogSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //使用DDL创建Catalog
    val catalogDDL =
    """
      |CREATE CATALOG myhivecatalog WITH (
      |    'type' = 'hive',
      |    'default-database' = 'default',
      |    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'
      |)
      |""".stripMargin
    tEnv.executeSql(catalogDDL)

    //使用Catalog
    tEnv.executeSql("USE CATALOG  myhivecatalog")// 使用DDL

    //使用Database
    //注意:使用默认的default数据库的时候,需要给default增加反引号转义标识,
    //否则SQL语法解析报错,因为default属于保留关键字
    tEnv.executeSql("USE `default`")// 使用DDLs

    //创建表
    //注意:此时这个表的元数据信息会存储到HiveCatalog中指定的Hive Metastore中。
    val inTableSql =
    """
      |-- 增加了IF NOT EXISTS之后可以多次执行此SQL语句,只有表不存在时才会真正执行
      |CREATE TABLE IF NOT EXISTS orders (
      |    order_id     BIGINT,
      |    price        DECIMAL(10,2),
      |    order_time   TIMESTAMP
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '1'
      |)
      |""".stripMargin
    tEnv.executeSql(inTableSql)

    //查询表中的数据【此时也可以直接查询Hive中的表】
    tEnv.executeSql("SELECT * FROM orders").print()

    //删除表
    //tEnv.executeSql("DROP TABLE orders")
  }
}

(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持inBatchMode和inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 使用API创建Catalog
        String catalogDDL = "CREATE CATALOG myhivecatalog WITH (\n" +
                "    'type' = 'hive',\n" +
                "    'default-database' = 'default',\n" +
                "    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" +
                ")";
        tEnv.executeSql(catalogDDL);

        // 使用Catalog
        tEnv.executeSql("USE CATALOG myhivecatalog"); // 使用 DDL

        // 使用Database
        // 注意:使用默认的 default 数据库时,需要加反引号转义,
        // 否则 SQL 语法解析会报错,因为 default 是保留关键字
        tEnv.executeSql("USE `default`"); // 使用 DDLs

        // 创建表
        // 注意:此时这个表的元数据信息会存储到HiveCatalog中指定的Hive Metastore中。
        String inTableSql =
                "-- 增加了IF NOT EXISTS之后可以多次执行此SQL语句,只有表不存在时才会真正执行\n" +
                        "CREATE TABLE IF NOT EXISTS orders (\n" +
                        "    order_id     BIGINT,\n" +
                        "    price        DECIMAL(10,2),\n" +
                        "    order_time   TIMESTAMP\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1'\n" +
                        ")";
        tEnv.executeSql(inTableSql);

        // 查询表中的数据【此时也可以直接查询Hive中的表】
        tEnv.executeSql("SELECT * FROM orders").print();

        // 删除表
        // tEnv.executeSql("DROP TABLE orders");
    }
}

2,运行测试

(1)运行程序,可以看到控制台输出内容如下:

(2)我们到 HiveMetastoreMySQL)中查看 tbls 表,可以发现有新增的 orders 表信息,说明表的元数据信息成功存储在 HiveMetastoreMySQL)中。
注意:在 Flink SQL 中创建的表,虽然表的元数据信息存储在了 HiveMetastoreMySQL)中,但是这些表在 Hive 中是无法正常使用的,只能在 Flink SQL 中使用。而 Hive 中的表,在 Flink SQL 中是可以查询的。

附:使用表的完整名称

1,基本介绍

    如果在一个 Flink SQL 任务中需要操作多个 Catalog、多个 Database 中的表,频繁使用 use 语句是非常繁琐的,此时最好使用表的完整名称:Catalog名称.Database名称.表名称

2,样例代码

(1)下面是 Scala 语言实现代码:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HiveCatalogUseFullNameSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //创建Catalog
    val catalogDDL =
      """
        |CREATE CATALOG myhivecatalog WITH (
        |    'type' = 'hive',
        |    'default-database' = 'default',
        |    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'
        |)
        |""".stripMargin
    tEnv.executeSql(catalogDDL)

    //创建Database
    tEnv.executeSql("CREATE DATABASE IF NOT EXISTS myhivecatalog.flinkdb")

    //创建表
    //注意:此时这个表的元数据信息会存储到HiveCatalog中指定的Hive Metastore中。
    val inTableSql =
    """
      |-- 增加了IF NOT EXISTS之后可以多次执行此SQL语句,只有表不存在时才会真正执行
      |CREATE TABLE IF NOT EXISTS myhivecatalog.flinkdb.orders_bak (
      |    order_id     BIGINT,
      |    price        DECIMAL(10,2),
      |    order_time   TIMESTAMP
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '1'
      |)
      |""".stripMargin
    tEnv.executeSql(inTableSql)

    //查询表中的数据
    tEnv.executeSql("SELECT * FROM myhivecatalog.flinkdb.orders_bak").print()
    //注意:使用默认的default数据库的时候,需要给default增加反引号转义标识,
    //否则SQL语法解析报错,因为default属于保留关键字
    //tEnv.executeSql("SELECT * FROM myhivecatalog.`default`.orders").print()

    //删除表
    //tEnv.executeSql("DROP TABLE myhivecatalog.flinkdb.orders_bak")
  }
}

(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveCatalogUseFullNameSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 创建 Catalog
        String catalogDDL = "CREATE CATALOG myhivecatalog WITH (\n" +
                "    'type' = 'hive',\n" +
                "    'default-database' = 'default',\n" +
                "    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" +
                ")";
        tEnv.executeSql(catalogDDL);

        // 创建 Database
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS myhivecatalog.flinkdb");

        // 创建表
        // 注意:此时这个表的元数据信息会存储到 HiveCatalog 中指定的 Hive Metastore 中。
        String inTableSql = "-- 增加了 IF NOT EXISTS 之后此SQL语句只有表不存在时才会真正执行\n" +
                "CREATE TABLE IF NOT EXISTS myhivecatalog.flinkdb.orders_bak (\n" +
                "    order_id     BIGINT,\n" +
                "    price        DECIMAL(10,2),\n" +
                "    order_time   TIMESTAMP\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '1'\n" +
                ")";
        tEnv.executeSql(inTableSql);

        // 查询表中的数据
        tEnv.executeSql("SELECT * FROM myhivecatalog.flinkdb.orders_bak").print();

        // 注意:使用默认的 default 数据库时,需要给 default 增加反引号转义标识,避免 SQL 语法解析报错
        // tEnv.executeSql("SELECT * FROM myhivecatalog.`default`.orders").print();

        // 删除表(如果需要)
        // tEnv.executeSql("DROP TABLE myhivecatalog.flinkdb.orders_bak");
    }
}
评论0