Spark - RDD使用详解10(HBase的读取与写入)
十一、HBase 的读取与写入
1,准备工作
(1)首先我们需要在项目的 pom.xml 文件中添加 HBase 相关依赖项。
(2)接着我们编写一段代码测试对 HBase 的数据读写操作(与 Spark 无关):
<!-- hbase-client 依赖--> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.5.5</version> </dependency> <!-- HBase MapReduce 模块的依赖--> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.5.5</version> </dependency>
(2)接着我们编写一段代码测试对 HBase 的数据读写操作(与 Spark 无关):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Get, Admin, TableDescriptorBuilder, ColumnFamilyDescriptorBuilder} import org.apache.hadoop.hbase.util.Bytes object Hello { def main(args: Array[String]): Unit = { // 配置 HBase 连接信息 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9") conf.set("hbase.zookeeper.property.clientPort", "2181") // 建立 HBase 连接 val connection: Connection = ConnectionFactory.createConnection(conf) val admin: Admin = connection.getAdmin val tableName = TableName.valueOf("my_table") val table = connection.getTable(tableName) try { // 自动创建表(如果不存在) if (!admin.tableExists(tableName)) { val tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build()) .build() admin.createTable(tableDescriptor) println("成功创建表 my_table。") } // 写入数据到 HBase val put = new Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("hangge.com")) table.put(put) println("成功写入数据到 HBase。") // 从 HBase 读取数据 val get = new Get(Bytes.toBytes("row1")) val result = table.get(get) val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"))) println(s"从 HBase 读取数据: $value") } finally { // 关闭连接 table.close() connection.close() } } }
(3)程序运行后控制台输出的内容如下,说明 HBase 可以正常读写了:
2,将 RDD 的数据写入到 HBase 表中
(1)下面代码我们从一个包含记录的 RDD 构建 Put 对象,然后使用 saveAsNewAPIHadoopDataset 这个 Hadoop API 方法将这些 Put 对象写入到 HBase 表中。
import org.apache.hadoop.hbase.{HBaseConfiguration} import org.apache.hadoop.hbase.client.{Put} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.rdd.RDD import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 配置 HBase 连接信息 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9") conf.set("hbase.zookeeper.property.clientPort", "2181") // 生成作业 val jobConf = new JobConf(conf, this.getClass) jobConf.set(TableOutputFormat.OUTPUT_TABLE, "my_table") val job = Job.getInstance(jobConf) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Put]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // 构建记录 val dataRDD: RDD[String] = sc.makeRDD(Array("1,hangge,M,100", "2,lili,M,27", "3,liuyun,F,35")) // 数据进行转换 val rdd: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(_.split(',')).map { arr: Array[String] => { //行健的值 val put = new Put(Bytes.toBytes(arr(0))) //name列的值 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1))) //gender列的值 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2))) //age列的值 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt)) (new ImmutableBytesWritable, put) } } // 使用 saveAsNewAPIHadoopDataset 方法将RDD数据写入到 HBase 表 rdd.saveAsNewAPIHadoopDataset(job.getConfiguration) println("成功将RDD数据写入到 HBase。") //关闭 Spark sc.stop() } }
(2)代码执行完毕后,可以看到数据以及成功添加到 HBase 数据库中:
3,读取 HBase 表中数据转换为RDD
(1)下面代码将前面存放在 HBase 表中的数据读取出来转换为 RDD,并进行转换,统计出各性别的人数。
注意第 12 行代码:自从 Spark 3.2 版本以来,默认情况下 spark.hadoopRDD.ignoreEmptySplits 被设置为 true,这意味着 Spark 将不会为空的输入分片创建空分区。如果想要恢复到 Spark 3.2 之前的行为,可以将 spark.hadoopRDD.ignoreEmptySplits 设置为 false。否则会发现 newAPIHadoopRDD 方法得到的 RDD 里面不包含任何数据(程序也不会报错)
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{TableInputFormat} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.{SparkConf, SparkContext} object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") .set("spark.hadoopRDD.ignoreEmptySplits", "false") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 配置 HBase 连接信息 val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "192.168.60.9") conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set(TableInputFormat.INPUT_TABLE, "my_table") // 读取 HBase 数据并将其转换为 RDD val hbaseRDD = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) // 对 RDD 进行转换和统计 val genderCountRDD = hbaseRDD.map { case (_, result) => val gender = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("gender"))) (gender, 1) }.reduceByKey(_ + _) // 打印结果 genderCountRDD.collect().foreach { case (gender, count) => println(s"性别: $gender, 数量: $count") } //关闭 Spark sc.stop() } }
(2)代码执行后控制台输出如下内容: