当前位置: > > > Flink SQL - 兼容Hive SQL教程详解(调用Hive SQL函数、使用Hive SQL语法)

Flink SQL - 兼容Hive SQL教程详解(调用Hive SQL函数、使用Hive SQL语法)

    由于 Flink SQLHive SQL 底层使用的 SQL 解析引擎不一样,所以目前 Flink SQL 中无法直接使用 Hive SQL 中的函数和语法。但是在工作中大家可能前期使用 Hive 比较多,对 Hive SQL 的各种用法非常熟悉,现在切换到 Flink SQL 之后,一些 Hive SQL 中支持的语法和函数无法在 Flink SQL 中使用,感觉用起来不太顺手。
    基于这些问题,Flink SQL 提供的有解决方案。
  • 通过 HiveModule 实现 Hive SQL 函数的兼容,主要是针对函数的支持。
  • 通过 HiveDialect 实现 Hive SQL 语法的兼容,主要是针对 DDLDML 语句的支持。
    下面通过样例分别演示这两种解决方案。

一、通过 HiveModule 实现 Hive SQL 函数的兼容

1,基本介绍

(1)Flink SQL 中可以支持多种 Module,针对这些 ModuleFlink SQL 提供了一些通用命令:
  • 通过 LOAD 命令:可以加载 Flink 中内置的或者用户自定义的 Module
  • 通过 UNLOAD:可以卸载 Flink 中内置的或者用户自定义的 Module
(2)HiveModule 属于其中一种实现,通过 HiveModule 可以在 Flink SQL 中使用 Hive 中的函数。

2,准备工作

要使用 HiveModule 吗,首先程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>

3,样例代码

(1)该样例通过加载 HiveModule,从而使用 Hive SQL 中支持的 GET_JSON_OBJECT 函数。下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.module.hive.HiveModule

object UseHiveModuleSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //查看目前可以使用的Module
    tEnv.executeSql("SHOW FULL MODULES").print()

    //加载Hive Module 使用API
    tEnv.loadModule("hive",new HiveModule("3.1.2"))
    //加载Hive Module 使用DDL
    //tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')")

    //使用Hive Module,禁用Core Module
    //tEnv.executeSql("USE MODULES hive")

    //使用Core Module,禁用Hive Module
    //tEnv.executeSql("USE MODULES core")

    //使用Hive Module和Core Module,改变加载Module的顺序
    //tEnv.executeSql("USE MODULES hive,core")

    //卸载Hive Module
    //tEnv.executeSql("UNLOAD MODULE hive")

    //GET_JSON_OBJECT是Hive SQL中支持的函数,Flink SQL中是不支持的
    val execSql =
      """
        |SELECT
        |GET_JSON_OBJECT('{"name":"tom","age":20}','$.name') AS name
        |""".stripMargin

    tEnv.executeSql(execSql).print()
  }
}
  • 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class UseHiveModuleSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();

        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 查看目前可以使用的 Module
        tEnv.executeSql("SHOW FULL MODULES").print();

        // 加载 Hive Module 使用 API
        tEnv.loadModule("hive", new HiveModule("3.1.2"));

        // 如果需要使用 DDL 加载 Hive Module,可以使用以下代码
        // tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");

        // 使用 Hive Module,禁用 Core Module
        // tEnv.executeSql("USE MODULES hive");

        // 使用 Core Module,禁用 Hive Module
        // tEnv.executeSql("USE MODULES core");

        // 使用 Hive Module 和 Core Module,改变加载 Module 的顺序
        // tEnv.executeSql("USE MODULES hive,core");

        // 卸载 Hive Module
        // tEnv.executeSql("UNLOAD MODULE hive");

        // GET_JSON_OBJECT 是 Hive SQL 中支持的函数,Flink SQL 中是不支持的
        String execSql = "SELECT " +
                        "GET_JSON_OBJECT('{\"name\":\"tom\",\"age\":20}', '$.name') AS name";

        tEnv.executeSql(execSql).print();
    }
}

(2)运行程序,可以看到控制台输出内容如下,说明我们成功调用了 Hive SQL 中的 GET_JSON_OBJECT 函数。

二、通过 HiveDialect 实现 Hive SQL 语法的兼容

1,基本介绍

(1)如果想要让 Flink SQL 兼容 Hive SQL 的语法,可以将 Flink SQL 的方言改为 hive,默认方言是 default。 如果将 Flink SQL 的方言改为 hive,那么就可以支持 Hive SQL 的各种语法特性了。

(2)注意:想要开启 hive 方言,必须要使用 HiveCatalog。因为开启了 Hive 方言,是可以在 Flink SQL 中使用 Hive SQL 语法去创建 Hive 表的,Hive 表是需要存储在 Hive Metastore 中的,所以需要使用 HiveCatalog

2,准备工作

(1)本文主要来演示一下 HiveCatalog 的使用,因此首先要准备好 Hive 环境,具体可以参考我之前写的文章:

(2)接着,创建一个 hive-site.xml 文件:
vi hive-site.xml
  • 文件内容如下,里面只需要配置 hive.metastore.uris 属性即可:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.121.128:9083</value>
  </property>
</configuration>
  • 然后将 hive-site.xml 这个文件上传到 HDFS 上。
注意:具体的 HDFS 目录需要和后面代码中指定的 hiveConfDir 参数的值保持一致。
 hdfs dfs -mkdir -p /hive-conf
hdfs dfs -put hive-site.xml /hive-conf

(3)然后在上面配置的节点上启动 Hivemetastore 服务:
cd /usr/local/hive/
nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &

(4)最后,在使用 HiveCatalog 的程序项目中需要先引入对应的依赖:
<!-- hive-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-slf4j-impl</artifactId>
            <groupId>org.apache.logging.log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
  • 注意:我这里将 log4j-slf4j-imp 排除掉,因为 slf4j-log4j12log4j-slf4j-imp 冲突了,实际上使用的是 org.slf4j.impl.Log4jLoggerFactoryslf4j-log4j12)。如果不排除,运行程序时会提示日志冲突(虽然不影响程序正常运行)。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/hangge/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

3,样例代码

(1)该样例我们使用 HiveSQL 建表语句来创建表,下面是 Scala 语言代码:
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}

object UseHiveDialectSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //创建Catalog
    val catalogDDL =
      """
        |CREATE CATALOG myhivecatalog WITH (
        |    'type' = 'hive',
        |    'default-database' = 'default',
        |    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'
        |)
        |""".stripMargin
    tEnv.executeSql(catalogDDL)

    //进入HiveCatalog和default数据库
    tEnv.executeSql("USE CATALOG  myhivecatalog")// 使用DDL
    tEnv.executeSql("USE `default`")// 使用DDL

    //设置使用hive方言【注意:此时必须使用HiveCatalog】
    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    // FlinkSQL建表语句执行会报错
    val flinkSqlDDL =
      """
        |CREATE TABLE IF NOT EXISTS orders_test (
        |    order_id     BIGINT,
        |    price        DECIMAL(10,2),
        |    order_time   TIMESTAMP
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1'
        |)
        |""".stripMargin
    //tEnv.executeSql(flinkSqlDDL)

    //HiveSQL建表语句可以正常执行
    val hiveSqlDDL =
      """
        |CREATE TABLE IF NOT EXISTS flink_stu(
        |id INT,
        |name STRING
        |)
        |""".stripMargin
    tEnv.executeSql(hiveSqlDDL)

    //tEnv.executeSql("SELECT * FROM flink_stu").print()
  }
}
  • 下面是使用 Java 语言实现同样功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class UseHiveDialectSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式,支持 inBatchMode 和 inStreamingMode
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 创建 Catalog
        String catalogDDL =
                "CREATE CATALOG myhivecatalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'default-database' = 'default',\n" +
                        "    'hive-conf-dir' = 'hdfs://192.168.121.128:9000/hive-conf'\n" +
                        ")";
        tEnv.executeSql(catalogDDL);

        // 进入 Hive Catalog 和 default 数据库
        tEnv.executeSql("USE CATALOG myhivecatalog");  // 使用 DDL
        tEnv.executeSql("USE `default`");  // 使用 DDL

        // 设置使用 Hive 方言【注意:此时必须使用 HiveCatalog】
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // FlinkSQL 建表语句执行会报错
        String flinkSqlDDL =
                "CREATE TABLE IF NOT EXISTS orders_test (\n" +
                        "    order_id     BIGINT,\n" +
                        "    price        DECIMAL(10,2),\n" +
                        "    order_time   TIMESTAMP\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1'\n" +
                        ")";
        // tEnv.executeSql(flinkSqlDDL);  // 如果需要执行此语句,取消注释

        // HiveSQL 建表语句可以正常执行
        String hiveSqlDDL =
                "CREATE TABLE IF NOT EXISTS flink_stu(\n" +
                        "id INT,\n" +
                        "name STRING\n" +
                        ")";
        tEnv.executeSql(hiveSqlDDL);

        // tEnv.executeSql("SELECT * FROM flink_stu").print();  // 如果需要打印结果,取消注释
    }
}

(2)上面样例在执行 flinkSqlDDL 语句的时候,程序会报错,提示语法错误,而使用 hiveSqlDDL 语句是可以正常创建表的,并且这个表也会存储在 Hive Metastore 中,到 MySQLtbls 表中是可以看到的。

4,hive 方言创建的表与 default 方言创建的表对比

(1)我们在 Hive 客户端中查看 flink_stu 表的详细信息:
show create table flink_stu;
  • 可以发现这个使用 hive 方言创建的表就是常规的 Hive 表。

(2)而我们之前文章(点击查看)创建的 orders 表就是 Flink SQLdefault 方言创建的,在 Hive 客户端中查看它的表结构:
show create table orders;
  • 这种表相当于就是 Flink SQL 自身的表,只是在 Hive 中存储了元数据信息而已,在 Hive 中是无法查询的。
评论0