Spark - SparkSQL使用详解6(HBase的读取与写入)
我在之前的文章中介绍了如何读取 HBase 表中数据转换为 RDD,以及如何将 RDD 中的数据保存到 HBase 数据库中(点击查看)。本文接着介绍如何通过 SparkSQL 来实现读写 HBase 中的数据。
(2)使用 create 命令创建一个新表(这里表名称为 people,列簇名为 cf)
(3)接着执行如下命令插入 3 条数据(每条数据均有三列):
(4)使用 scan 'people' 查看表数据,确保添加成功:
(2)然后执行如下命令进行编译:
(3)编译完毕后,我们需要的两个 jar 包会保存到项目目录的 spark/hbase-spark/target/ 和 spark/hbase-spark-protocol-shaded/target/ 文件夹下:
(2)除了上面那种读取 HBase 数据的方式外,我们还有另一种方式:针对 people 表创建一个目录(catalog),然后将其用于读取数据:
(2)但当我们开启后运行程序有会发现一旦遇到过滤操作时,会发现程序报如下错误。这是由于 HBase 服务器缺少相关的 jar 包。
(4)然后重启 HBase 服务,再次执行任务应用可以发现就能成功过滤并查询到数据,不会报错了。
六、HBase 的读取与写入
1,准备测试数据
(1)首先我们启动 HBase 的 shell 命令行工具:
./bin/hbase shell
(2)使用 create 命令创建一个新表(这里表名称为 people,列簇名为 cf)
create 'people', 'cf'
(3)接着执行如下命令插入 3 条数据(每条数据均有三列):
put 'people', 1, 'cf:name', 'hangge' put 'people', 1, 'cf:gender', 'M' put 'people', 1, 'cf:age', 100 put 'people', 2, 'cf:name', 'lili' put 'people', 2, 'cf:gender', 'F' put 'people', 2, 'cf:age', 27 put 'people', 3, 'cf:name', 'liuyun' put 'people', 3, 'cf:gender', 'M' put 'people', 3, 'cf:age', 35
(4)使用 scan 'people' 查看表数据,确保添加成功:
2,准备 HBase 连接器
(1)由于我使用的 Spark 版本时 3.X 的,而 Maven 上面的 HBase 连接器(Apache Spark HBase connector)只支持 Spark 2.X 版本,因此我们需要下载源码自行编译。首先执行如下命令将 Apache HBase Connectors 项目检出到本地:
git clone https://github.com/apache/hbase-connectors.git
(2)然后执行如下命令进行编译:
注意:mvn 命令中的 spark、scala、hbase、hadoop 版本参数根据实际情况进行修改,必须与实际使用的一致。
mvn -Dspark.version=3.3.1 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -Dhbase.version=2.4.15 -Dhadoop-three.version=3.3.2 -DskipTests clean package
(4)打开我们的 Spark 应用项目,在 src 同级的目录下新建一个 lib 目录,然后将这个 jar 包放在 lib 目录下:
3,添加依赖
编辑我们 Spark 应用项目的 pom.xml 文件,添加 Spark、HBase、Hadoop、Scala、HBase Spark 连接器依赖。下面是完整的配置代码:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <!-- 项目元数据 --> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>MySpark</artifactId> <version>1.0-SNAPSHOT</version> <!-- 配置属性 --> <properties> <!-- 指定编译器版本 --> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <!-- Spark 版本 --> <spark.version>3.3.1</spark.version> <!-- Scala 版本 --> <scala.version>2.12</scala.version> <!-- HBase 版本 --> <hbase.version>2.5.5</hbase.version> </properties> <!-- 仓库配置 --> <repositories> <!-- 配置 Spring 仓库 --> <repository> <id>spring</id> <url>https://maven.aliyun.com/repository/spring</url> </repository> </repositories> <!-- 项目依赖 --> <dependencies> <!-- Spark 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- HBase 依赖 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>${hbase.version}</version> </dependency> <!-- Hadoop 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> </dependency> <!-- Scala 依赖 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.15</version> </dependency> <dependency> <groupId>org.scala-lang.modules</groupId> <artifactId>scala-parser-combinators_2.12</artifactId> <version>1.0.4</version> </dependency> <!-- HBase Spark 连接器依赖 --> <dependency> <groupId>org.apache.hbase.connectors.spark</groupId> <artifactId>hbase-spark</artifactId> <version>1.0.1</version> <scope>system</scope> <systemPath>${project.basedir}/lib/hbase-spark-1.0.1-SNAPSHOT.jar</systemPath> </dependency> <dependency> <groupId>org.apache.hbase.connectors.spark</groupId> <artifactId>hbase-spark-protocol-shaded</artifactId> <version>1.0.1</version> <scope>system</scope> <systemPath>${project.basedir}/lib/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar</systemPath> </dependency> </dependencies> <!-- 构建配置 --> <build> <plugins> <!-- Spring Boot Maven 插件配置 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <!-- 包括系统范围的依赖 --> <includeSystemScope>true</includeSystemScope> </configuration> </plugin> </plugins> </build> </project>
4,读取 HBase 数据
(1)下面代码我们使用 Spark 读取 HBase 表中的数据,并将其转换为 DataFrame,然后就可以通过 Spark SQL 语法或者 DSL 语法对数据进行分析统计了。
import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf} import org.apache.hadoop.hbase.{HBaseConfiguration} object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建 HBase 配置对象 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181") // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase new HBaseContext(spark.sparkContext, conf) // 指定要操作的 HBase 表名 val hbase_table = "people" // 定义 HBase 列映射关系 val hbase_column_mapping = "id STRING :key, " + "name STRING cf:name, " + "gender STRING cf:gender, " + "age STRING cf:age" // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame val hbaseDF = spark.read .format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", hbase_column_mapping) .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化 .option("hbase.table", hbase_table) .load() // 打印 DataFrame 的模式信息 hbaseDF.printSchema() // 显示DataFrame内容 hbaseDF.show() // 使用DSL语法:统计男性和女性的人数 val genderCountsDF = hbaseDF.groupBy("gender").count() println("性别统计:") genderCountsDF.show() // 使用SQL语法:查询年龄大于30的人数 hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图 val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30") println("年龄大于30岁的人员:") result2.show() //关闭 Spark spark.stop() } }
import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建 HBase 配置对象 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181") // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase val hbaseContext = new HBaseContext(spark.sparkContext, conf) // 定义目录 val catalog = s"""{ |"table":{"namespace":"default", "name":"people"}, |"rowkey":"key", |"columns":{ |"id":{"col":"key", "type":"string"}, |"name":{"cf":"cf", "col":"name", "type":"string"}, |"gender":{"cf":"cf", "col":"gender", "type":"string"}, |"age":{"cf":"cf", "col":"age", "type":"string"} |} |}""".stripMargin // 使目录读取 HBase 表中的数据,转换为 DataFrame val hbaseDF = spark.read .options(Map(HBaseTableCatalog.tableCatalog -> catalog)) .format("org.apache.hadoop.hbase.spark") .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化 .load() // 打印 DataFrame 的模式信息 hbaseDF.printSchema() // 显示DataFrame内容 hbaseDF.show() // 使用DSL语法:统计男性和女性的人数 val genderCountsDF = hbaseDF.groupBy("gender").count() println("性别统计:") genderCountsDF.show() // 使用SQL语法:查询年龄大于30的人数 hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图 val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30") println("年龄大于30岁的人员:") result2.show() //关闭 Spark spark.stop() } }
5,保存数据到 HBase
(1)下面代码我们首先读取 people 表的数据,并将性别值改成中文后保存更新到库中。然后统计出各性别的人数后,保存到 gender_count 表中。
import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.spark.sql.functions.when object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建 HBase 配置对象 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181") // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase new HBaseContext(spark.sparkContext, conf) // 指定要操作的 HBase 表名 val people_table = "people" // 定义 HBase 列映射关系 val people_column_mapping = "id STRING :key, " + "name STRING cf:name, " + "gender STRING cf:gender, " + "age STRING cf:age" // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame val hbaseDF = spark.read .format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", people_column_mapping) .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化 .option("hbase.table", people_table) .load() // 将性别值改成中文并保存到库中 val hbaseDFWithChineseGender = hbaseDF.withColumn("gender", when($"gender" === "M", "男性").otherwise("女性") ) hbaseDFWithChineseGender.write .format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", people_column_mapping) .option("hbase.table", people_table) .save() // 定义性别统计表的列映射关系 val gender_count_column_mapping = "gender STRING :key, " + "count INT cf:count" // 统计男性和女性的人数 val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count() // 将性别统计结果存储到另一张表中,使用HBase列映射关系 genderCountsDF.write .format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", gender_count_column_mapping) .option("hbase.table", "gender_count") .save() //关闭 Spark spark.stop() } }
(2)同样的,保存数据到 HBase 中除了上面那种方式外,还有另一种方式:针对 people、gender_count 表分别创建相应的目录(catalog),然后将其用于保存数据:
import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.SparkSession import org.apache.spark.SparkConf import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog import org.apache.spark.sql.functions.when object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建 HBase 配置对象 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181") // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase new HBaseContext(spark.sparkContext, conf) // 定义people表的目录 val people_catalog = s"""{ |"table":{"namespace":"default", "name":"people"}, |"rowkey":"key", |"columns":{ |"id":{"col":"key", "type":"string"}, |"name":{"cf":"cf", "col":"name", "type":"string"}, |"gender":{"cf":"cf", "col":"gender", "type":"string"}, |"age":{"cf":"cf", "col":"age", "type":"string"} |} |}""".stripMargin // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame val hbaseDF = spark.read .options(Map(HBaseTableCatalog.tableCatalog -> people_catalog)) .format("org.apache.hadoop.hbase.spark") .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化 .load() // 将性别值改成中文并保存到库中 val hbaseDFWithChineseGender = hbaseDF.withColumn("gender", when($"gender" === "M", "男性").otherwise("女性") ) hbaseDFWithChineseGender.write .options(Map(HBaseTableCatalog.tableCatalog -> people_catalog)) .format("org.apache.hadoop.hbase.spark") .save() // 定义gender_count表的目录 val gender_count_catalog = s"""{ |"table":{"namespace":"default", "name":"gender_count"}, |"rowkey":"key", |"columns":{ |"gender":{"col":"key", "type":"string"}, |"count":{"cf":"cf", "col":"count", "type":"int"} |} |}""".stripMargin // 统计男性和女性的人数 val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count() genderCountsDF.show() // 将性别统计结果存储到另一张表中,使用HBase列映射关系 genderCountsDF.write .options(Map(HBaseTableCatalog.tableCatalog -> gender_count_catalog)) .format("org.apache.hadoop.hbase.spark") .save() //关闭 Spark spark.stop() } }
附:启用过滤器下推优化
1,什么是下推优化?
Hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。
2,如何开启下推优化
(1)在上面读取 HBase 数据的实例中,我们是禁用下推优化的,也就是说 HBase 数据其实是全部传输到客户端,然后再在客户端上进行过滤。如果需要启用下推优化,只需要将 hbase.spark.pushdown.columnfilter 设置为 true,或者直接取消这个设置(默认即为 true)
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame val hbaseDF = spark.read .format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", hbase_column_mapping) .option("hbase.spark.pushdown.columnfilter", true) // 开启过滤下推优化 .option("hbase.table", hbase_table) .load()
(2)但当我们开启后运行程序有会发现一旦遇到过滤操作时,会发现程序报如下错误。这是由于 HBase 服务器缺少相关的 jar 包。
org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1612)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1157)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.newRegionScanner(RSRpcServices.java:3039)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:3369)
at org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:42278)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:338)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:318)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at org.apache.hadoop.hbase.util.DynamicClassLoader.tryRefreshClass(DynamicClassLoader.java:188)
at org.apache.hadoop.hbase.util.DynamicClassLoader.loadClass(DynamicClassLoader.java:151)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1603)
... 8 more
(3)要解决这个问题,我们只需要将前面编译的 HBase 连接器 jar 包(hbase-spark-xxx.jar、hbase-spark-protocol-shaded-xxx.jar)以及 scala-library-xxx.jar 包,复制到 HBase 安装目录下的 lib 文件夹中。
注意:scala-library 的 jar 包可以从本地 Maven 缓存中提取,比如在我的系统中就是 /Users/hangge/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar
(4)然后重启 HBase 服务,再次执行任务应用可以发现就能成功过滤并查询到数据,不会报错了。