Spark - 使用SparkSQL将数据写入Hive表详解2(使用saveAsTable()方法)
在之前的文章中,我介绍了如何在 SparkSQL 中集成 Hive 并查询 Hive 表中的数据(点击查看)。实际工作中,我们不仅需要查询数据并计算结果,还希望将结果数据写入 Hive 表中。通常来说,向 Hive 表中写入数据有如下 3 种方法。
(2)然后给表中添加一些初始数据:
(2)查看这个表的数据也是都有的:
(2)然后修改代码,将 format 中的值改为 text:
(3)重新执行程序,发现程序报错,提示说 Text 格式不支持 int 数据类型。
(2)修改代码,将 format 中的值改为 json:
(3)执行程序。程序虽然不报错,但是表中的数据是无法查询的。首先我们先看表结构:
(2)然后编写如下代码:
(3)执行程序,发现执行报错,提示这个表是 HiveFileFormat,不支持 TextDataSourceV2 格式。
- 第一种:使用 inserInto() 方法。
- 第二种:使用 saveAsTable() 方法。
- 第三种:使用 SparkSQL 语句。
其中第二种不推荐使用,最常用的是第三种,用起来比较方便。我在前文介绍了第一种方法,本文接着介绍第二种使用 saveAsTable() 方法。
二、使用 saveAsTable() 方法写入 Hive 表数据
1,准备工作
(1)首先我们在 hive 中创建一张 student 表:
create table student ( id int, stu_name string, stu_birthday date, online boolean ) row format delimited fields terminated by '\t' lines terminated by '\n';
(2)然后给表中添加一些初始数据:
load data local inpath '/usr/local/student.txt' into table student;
2,编写代码
(1)首先我们的项目要做好 Hive 的集成配置,具体可以参考我之前写的文章:
(2)接着我们编写如下测试代码,用于通过 SparkSQL 从 Hive 表 student 中查询数据并将其写入到另一个 Hive 表 student_bak。不同于使用 inserInto() 方法时要求写入的 Hive 表是要已经存在的,而 saveAsTable() 方法则不需要,这里又分两种情况:
- 表不存在:
- 则会根据 DataFrame 中的 Schema 自动创建目标表并写入数据
- 表存在:
- 如果 mode=append,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则执行追加操作。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则报错。
- 如果 mode=overwrite,当 DataFrame 中的 Schema 和表中的 Schema 相同(字段顺序可以不同),则直接覆盖。当 DataFrame 中的 Schema 和表中的 Schema 不相同,则会删除之前的表,然后按照 DataFrame 中的 Schema 重新创建表并写入数据。
注意:如果使用 idea 本地运行时需要通过 spark.sql.warehouse.dir 指定一下 hive 对应的 hdfs 路径全称(如果在服务器上运行则不需要该设置),否则获取到的值是 file:/user/hive/warehouse,从而造成查询目标表数据时返回的结果为空。
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SparkSQLWriteHive_2 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("SparkSQLWriteHive_2") .config(conf) //idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称 .config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse") //开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数 .enableHiveSupport() .getOrCreate() import sparkSession.sql //查询数据 val resDf = sql("select * from student") //写入数据 resDf.write //指定数据写入格式append:追加。overwrite:覆盖。 .mode("overwrite") //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。 //不指定的话默认是parquet格式。 //注意:text数据格式在这里不支持int数据类型 //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的, //无法读取生成的普通文件,不要使用这种方式。 //parquet, orc数据格式可以正常使用 .format("parquet") .saveAsTable("student_bak") sparkSession.stop() } }
3,运行测试
(1)上面代码运行后,我们可以到到 Hive 中查看目标表的详细信息:
show create table student_bak;
(2)查看这个表的数据也是都有的:
select * from student_bak;
附:使用 saveAsTable() 方法的注意事项
1,text 数据格式不支持 int 数据类型
(1)我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录
drop table student_bak;
(2)然后修改代码,将 format 中的值改为 text:
//写入数据(表不存在) resDf.write //指定数据写入格式append:追加。overwrite:覆盖。 .mode("overwrite") //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。 //不指定的话默认是parquet格式。 //注意:text数据格式在这里不支持int数据类型 //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的, //无法读取生成的普通文件,不要使用这种方式。 //parquet, orc数据格式可以正常使用 .format("text") .saveAsTable("student_bak")
(3)重新执行程序,发现程序报错,提示说 Text 格式不支持 int 数据类型。
2,普通文本文件数据格式(json、csv)无法正常使用
(1)针对普通文本文件数据格式(json、csv),默认创建的 Hive 表是 SequenceFile 格式的,无法读取生成的普通文件,也无法正常使用。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录drop table student_bak;
(2)修改代码,将 format 中的值改为 json:
//写入数据(表不存在) resDf.write //指定数据写入格式append:追加。overwrite:覆盖。 .mode("overwrite") //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。 //不指定的话默认是parquet格式。 //注意:text数据格式在这里不支持int数据类型 //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的, //无法读取生成的普通文件,不要使用这种方式。 //parquet, orc数据格式可以正常使用 .format("json") .saveAsTable("student_bak")
(3)执行程序。程序虽然不报错,但是表中的数据是无法查询的。首先我们先看表结构:
show create table student_bak;
- 此时发现表中只有一列,并且表的输入和输出格式化类是 SequenceFile。
(4)查询表中的数据,发现无法查询。这是因为产生的文件是 json 文件,不是一个 SequenceFile 文件,所以无法读取。使用 csv 格式和 json 格式都会遇到同样的问题。
select * from student_bak;
3,针对已存在的表,当 mode 为 append 时,format 必须指定为 hive
(1)针对已存在的表,当 mode 为 append 时,format 里面必须指定为 hive,否则会报错。我们这里验证一下,首先到 hive 中删除表同时删除对应的 hdfs 目录。
drop table student_bak;
(2)然后编写如下代码:
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SparkSQLWriteHive_2 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //获取SparkSession,为了操作SparkSQL val sparkSession = SparkSession .builder() .appName("SparkSQLWriteHive_2") .config(conf) //idea本地运行时需要通过spark.sql.warehouse.dir指定一下hive对应的hdfs路径全称 .config("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse") //开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数 .enableHiveSupport() .getOrCreate() import sparkSession.sql //查询数据 val resDf = sql("select * from student") //创建目标表 sql( """ |create table if not exists student_bak( | id int, | stu_name string, | stu_birthday date, | online boolean |)using hive | OPTIONS( | fileFormat 'textfile', | fieldDelim '\t' | ) |""".stripMargin) //写入数据(表存在 ) resDf.write //指定数据写入格式append:追加。overwrite:覆盖。 .mode("append") //这里需要指定数据格式: parquet, orc, avro(需要添加外部依赖包), json, csv, text。 //不指定的话默认是parquet格式。 //注意:text数据格式在这里不支持int数据类型 //针对普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的, //无法读取生成的普通文件,不要使用这种方式。 //parquet, orc数据格式可以正常使用 .format("text") .saveAsTable("student_bak") sparkSession.stop() } }
(3)执行程序,发现执行报错,提示这个表是 HiveFileFormat,不支持 TextDataSourceV2 格式。
提示:要解决这个问题只需将 format 中的参数置指定为 hive 即可。