当前位置: > > > Spark - RDD使用详解10(HBase的读取与写入)

Spark - RDD使用详解10(HBase的读取与写入)

十一、HBase 的读取与写入

1,准备工作

(1)首先我们需要在项目的 pom.xml 文件中添加 HBase 相关依赖项。
<!-- hbase-client 依赖-->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.5.5</version>
</dependency>
<!-- HBase MapReduce 模块的依赖-->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>2.5.5</version>
</dependency>

(2)接着我们编写一段代码测试对 HBase 的数据读写操作(与 Spark 无关):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Get, Admin,
  TableDescriptorBuilder, ColumnFamilyDescriptorBuilder}
import org.apache.hadoop.hbase.util.Bytes

object Hello {
  def main(args: Array[String]): Unit = {
    // 配置 HBase 连接信息
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // 建立 HBase 连接
    val connection: Connection = ConnectionFactory.createConnection(conf)
    val admin: Admin = connection.getAdmin
    val tableName = TableName.valueOf("my_table")
    val table = connection.getTable(tableName)

    try {
      // 自动创建表(如果不存在)
      if (!admin.tableExists(tableName)) {
        val tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
          .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build())
          .build()
        admin.createTable(tableDescriptor)
        println("成功创建表 my_table。")
      }

      // 写入数据到 HBase
      val put = new Put(Bytes.toBytes("row1"))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("hangge.com"))
      table.put(put)
      println("成功写入数据到 HBase。")

      // 从 HBase 读取数据
      val get = new Get(Bytes.toBytes("row1"))
      val result = table.get(get)
      val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")))
      println(s"从 HBase 读取数据: $value")
    } finally {
      // 关闭连接
      table.close()
      connection.close()
    }
  }
}

(3)程序运行后控制台输出的内容如下,说明 HBase 可以正常读写了:

2,将 RDD 的数据写入到 HBase 表中

(1)下面代码我们从一个包含记录的 RDD 构建 Put 对象,然后使用 saveAsNewAPIHadoopDataset 这个 Hadoop API 方法将这些 Put 对象写入到 HBase 表中。
import org.apache.hadoop.hbase.{HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 配置 HBase 连接信息
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // 生成作业
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,  "my_table")
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // 构建记录
    val dataRDD: RDD[String] = sc.makeRDD(Array("1,hangge,M,100", "2,lili,M,27", "3,liuyun,F,35"))

    // 数据进行转换
    val rdd: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(_.split(',')).map {
      arr: Array[String] => {
        //行健的值
        val put = new Put(Bytes.toBytes(arr(0)))
        //name列的值
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        //gender列的值
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
        //age列的值
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt))
        (new ImmutableBytesWritable, put)
      }
    }

    // 使用 saveAsNewAPIHadoopDataset 方法将RDD数据写入到 HBase 表
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    println("成功将RDD数据写入到 HBase。")

    //关闭 Spark
    sc.stop()
  }
}

(2)代码执行完毕后,可以看到数据以及成功添加到 HBase 数据库中:

3,读取 HBase 表中数据转换为RDD

(1)下面代码将前面存放在 HBase 表中的数据读取出来转换为 RDD,并进行转换,统计出各性别的人数。
注意第 12 行代码:自从 Spark 3.2 版本以来,默认情况下 spark.hadoopRDD.ignoreEmptySplits 被设置为 true,这意味着 Spark 将不会为空的输入分片创建空分区。如果想要恢复到 Spark 3.2 之前的行为,可以将 spark.hadoopRDD.ignoreEmptySplits 设置为 false。否则会发现 newAPIHadoopRDD 方法得到的 RDD 里面不包含任何数据(程序也不会报错)
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
      .set("spark.hadoopRDD.ignoreEmptySplits", "false")

    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 配置 HBase 连接信息
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.60.9")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "my_table")

    // 读取 HBase 数据并将其转换为 RDD
    val hbaseRDD = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    // 对 RDD 进行转换和统计
    val genderCountRDD = hbaseRDD.map {
      case (_, result) =>
        val gender = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("gender")))
        (gender, 1)
    }.reduceByKey(_ + _)

    // 打印结果
    genderCountRDD.collect().foreach {
      case (gender, count) => println(s"性别: $gender, 数量: $count")
    }

    //关闭 Spark
    sc.stop()
  }
}

(2)代码执行后控制台输出如下内容:
评论0