当前位置: > > > Spark - SparkSQL使用详解6(HBase的读取与写入)

Spark - SparkSQL使用详解6(HBase的读取与写入)

    我在之前的文章中介绍了如何读取 HBase 表中数据转换为 RDD,以及如何将 RDD 中的数据保存到 HBase 数据库中(点击查看)。本文接着介绍如何通过 SparkSQL 来实现读写 HBase 中的数据。 

六、HBase 的读取与写入 

1,准备测试数据

(1)首先我们启动 HBaseshell 命令行工具:
./bin/hbase shell

(2)使用 create 命令创建一个新表(这里表名称为 people,列簇名为 cf
create 'people', 'cf'

(3)接着执行如下命令插入 3 条数据(每条数据均有三列):
put 'people', 1, 'cf:name', 'hangge'
put 'people', 1, 'cf:gender', 'M'
put 'people', 1, 'cf:age', 100
put 'people', 2, 'cf:name', 'lili'
put 'people', 2, 'cf:gender', 'F'
put 'people', 2, 'cf:age', 27
put 'people', 3, 'cf:name', 'liuyun'
put 'people', 3, 'cf:gender', 'M'
put 'people', 3, 'cf:age', 35

(4)使用 scan 'people' 查看表数据,确保添加成功:

2,准备 HBase 连接器

(1)由于我使用的 Spark 版本时 3.X 的,而 Maven 上面的 HBase 连接器(Apache Spark HBase connector)只支持 Spark 2.X 版本,因此我们需要下载源码自行编译。首先执行如下命令将 Apache HBase Connectors 项目检出到本地:
git clone https://github.com/apache/hbase-connectors.git

(2)然后执行如下命令进行编译:
注意mvn 命令中的 sparkscalahbasehadoop 版本参数根据实际情况进行修改,必须与实际使用的一致。
mvn -Dspark.version=3.3.1 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -Dhbase.version=2.4.15 -Dhadoop-three.version=3.3.2 -DskipTests clean package

(3)编译完毕后,我们需要的两个 jar 包会保存到项目目录的 spark/hbase-spark/target/ spark/hbase-spark-protocol-shaded/target/ 文件夹下:

(4)打开我们的 Spark 应用项目,在 src 同级的目录下新建一个 lib 目录,然后将这个 jar 包放在 lib 目录下:

3,添加依赖

    编辑我们 Spark 应用项目的 pom.xml 文件,添加 SparkHBaseHadoopScalaHBase Spark 连接器依赖。下面是完整的配置代码:
注意HBase Spark 连接器我们使用的是本地的 jar 包,注意其配置与其它依赖的区别,关于项目中引入本地或第三方 JAR 包的具体说明,可以查看我之前写的文章(点击查看
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- 项目元数据 -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MySpark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 配置属性 -->
    <properties>
        <!-- 指定编译器版本 -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Spark 版本 -->
        <spark.version>3.3.1</spark.version>
        <!-- Scala 版本 -->
        <scala.version>2.12</scala.version>
        <!-- HBase 版本 -->
        <hbase.version>2.5.5</hbase.version>
    </properties>

    <!-- 仓库配置 -->
    <repositories>
        <!-- 配置 Spring 仓库 -->
        <repository>
            <id>spring</id>
            <url>https://maven.aliyun.com/repository/spring</url>
        </repository>
    </repositories>

    <!-- 项目依赖 -->
    <dependencies>
        <!-- Spark 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- HBase 依赖 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Hadoop 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!-- Scala 依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-parser-combinators_2.12</artifactId>
            <version>1.0.4</version>
        </dependency>
        <!-- HBase Spark 连接器依赖 -->
        <dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>1.0.1</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/hbase-spark-1.0.1-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark-protocol-shaded</artifactId>
            <version>1.0.1</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar</systemPath>
        </dependency>
    </dependencies>

    <!-- 构建配置 -->
    <build>
        <plugins>
            <!-- Spring Boot Maven 插件配置 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- 包括系统范围的依赖 -->
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4,读取 HBase 数据

(1)下面代码我们使用 Spark 读取 HBase 表中的数据,并将其转换为 DataFrame,然后就可以通过 Spark SQL 语法或者 DSL 语法对数据进行分析统计了。
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf}
import org.apache.hadoop.hbase.{HBaseConfiguration}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建 HBase 配置对象
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
    // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
    new HBaseContext(spark.sparkContext, conf)

    // 指定要操作的 HBase 表名
    val hbase_table = "people"

    // 定义 HBase 列映射关系
    val hbase_column_mapping = "id STRING :key, " +
      "name STRING cf:name, " +
      "gender STRING cf:gender, " +
      "age STRING cf:age"

    // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
    val hbaseDF = spark.read
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping", hbase_column_mapping)
      .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
      .option("hbase.table", hbase_table)
      .load()

    // 打印 DataFrame 的模式信息
    hbaseDF.printSchema()

    // 显示DataFrame内容
    hbaseDF.show()

    // 使用DSL语法:统计男性和女性的人数
    val genderCountsDF = hbaseDF.groupBy("gender").count()
    println("性别统计:")
    genderCountsDF.show()

    // 使用SQL语法:查询年龄大于30的人数
    hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图
    val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30")
    println("年龄大于30岁的人员:")
    result2.show()

    //关闭 Spark
    spark.stop()
  }
}

(2)除了上面那种读取 HBase 数据的方式外,我们还有另一种方式:针对 people 表创建一个目录(catalog),然后将其用于读取数据:
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建 HBase 配置对象
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
    // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
    val hbaseContext = new HBaseContext(spark.sparkContext, conf)

    // 定义目录
    val catalog = s"""{
         |"table":{"namespace":"default", "name":"people"},
         |"rowkey":"key",
         |"columns":{
         |"id":{"col":"key", "type":"string"},
         |"name":{"cf":"cf", "col":"name", "type":"string"},
         |"gender":{"cf":"cf", "col":"gender", "type":"string"},
         |"age":{"cf":"cf", "col":"age", "type":"string"}
         |}
         |}""".stripMargin

    // 使目录读取 HBase 表中的数据,转换为 DataFrame
    val hbaseDF = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
      .load()

    // 打印 DataFrame 的模式信息
    hbaseDF.printSchema()

    // 显示DataFrame内容
    hbaseDF.show()

    // 使用DSL语法:统计男性和女性的人数
    val genderCountsDF = hbaseDF.groupBy("gender").count()
    println("性别统计:")
    genderCountsDF.show()

    // 使用SQL语法:查询年龄大于30的人数
    hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图
    val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30")
    println("年龄大于30岁的人员:")
    result2.show()

    //关闭 Spark
    spark.stop()
  }
}

5,保存数据到 HBase

(1)下面代码我们首先读取 people 表的数据,并将性别值改成中文后保存更新到库中。然后统计出各性别的人数后,保存到 gender_count 表中。
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.sql.functions.when

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建 HBase 配置对象
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
    // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
    new HBaseContext(spark.sparkContext, conf)

    // 指定要操作的 HBase 表名
    val people_table = "people"

    // 定义 HBase 列映射关系
    val people_column_mapping = "id STRING :key, " +
      "name STRING cf:name, " +
      "gender STRING cf:gender, " +
      "age STRING cf:age"

    // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
    val hbaseDF = spark.read
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping", people_column_mapping)
      .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
      .option("hbase.table", people_table)
      .load()

    // 将性别值改成中文并保存到库中
    val hbaseDFWithChineseGender = hbaseDF.withColumn("gender",
      when($"gender" === "M", "男性").otherwise("女性")
    )
    hbaseDFWithChineseGender.write
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping", people_column_mapping)
      .option("hbase.table", people_table)
      .save()

    // 定义性别统计表的列映射关系
    val gender_count_column_mapping = "gender STRING :key, " +
      "count INT cf:count"

    // 统计男性和女性的人数
    val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count()

    // 将性别统计结果存储到另一张表中,使用HBase列映射关系
    genderCountsDF.write
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping", gender_count_column_mapping)
      .option("hbase.table", "gender_count")
      .save()

    //关闭 Spark
    spark.stop()
  }
}

(2)同样的,保存数据到 HBase 中除了上面那种方式外,还有另一种方式:针对 peoplegender_count 表分别创建相应的目录(catalog),然后将其用于保存数据:
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.functions.when

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建 HBase 配置对象
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
    // 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
    new HBaseContext(spark.sparkContext, conf)

    // 定义people表的目录
    val people_catalog = s"""{
                       |"table":{"namespace":"default", "name":"people"},
                       |"rowkey":"key",
                       |"columns":{
                       |"id":{"col":"key", "type":"string"},
                       |"name":{"cf":"cf", "col":"name", "type":"string"},
                       |"gender":{"cf":"cf", "col":"gender", "type":"string"},
                       |"age":{"cf":"cf", "col":"age", "type":"string"}
                       |}
                       |}""".stripMargin

    // 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
    val hbaseDF = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> people_catalog))
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
      .load()

    // 将性别值改成中文并保存到库中
    val hbaseDFWithChineseGender = hbaseDF.withColumn("gender",
      when($"gender" === "M", "男性").otherwise("女性")
    )
    hbaseDFWithChineseGender.write
      .options(Map(HBaseTableCatalog.tableCatalog -> people_catalog))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    // 定义gender_count表的目录
    val gender_count_catalog = s"""{
                    |"table":{"namespace":"default", "name":"gender_count"},
                    |"rowkey":"key",
                    |"columns":{
                    |"gender":{"col":"key", "type":"string"},
                    |"count":{"cf":"cf", "col":"count", "type":"int"}
                    |}
                    |}""".stripMargin

    // 统计男性和女性的人数
    val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count()
    genderCountsDF.show()

    // 将性别统计结果存储到另一张表中,使用HBase列映射关系
    genderCountsDF.write
      .options(Map(HBaseTableCatalog.tableCatalog -> gender_count_catalog))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    //关闭 Spark
    spark.stop()
  }
}

附:启用过滤器下推优化

1,什么是下推优化?

    Hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。

2,如何开启下推优化

(1)在上面读取 HBase 数据的实例中,我们是禁用下推优化的,也就是说 HBase 数据其实是全部传输到客户端,然后再在客户端上进行过滤。如果需要启用下推优化,只需要将 hbase.spark.pushdown.columnfilter 设置为 true,或者直接取消这个设置(默认即为 true
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
val hbaseDF = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping)
  .option("hbase.spark.pushdown.columnfilter", true) // 开启过滤下推优化
  .option("hbase.table", hbase_table)
  .load()

(2)但当我们开启后运行程序有会发现一旦遇到过滤操作时,会发现程序报如下错误。这是由于 HBase 服务器缺少相关的 jar 包。
org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1612)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1157)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.newRegionScanner(RSRpcServices.java:3039)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:3369)
at org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:42278)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:338)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:318)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at org.apache.hadoop.hbase.util.DynamicClassLoader.tryRefreshClass(DynamicClassLoader.java:188)
at org.apache.hadoop.hbase.util.DynamicClassLoader.loadClass(DynamicClassLoader.java:151)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1603)
... 8 more

(3)要解决这个问题,我们只需要将前面编译的 HBase 连接器 jar 包(hbase-spark-xxx.jarhbase-spark-protocol-shaded-xxx.jar)以及 scala-library-xxx.jar 包,复制到 HBase 安装目录下的 lib 文件夹中。
注意scala-library jar 包可以从本地 Maven 缓存中提取,比如在我的系统中就是 /Users/hangge/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar

(4)然后重启 HBase 服务,再次执行任务应用可以发现就能成功过滤并查询到数据,不会报错了。
评论0