HBase - 数据批量导入教程2(使用BulkLoad)
我在之前的文章中介绍了如何使用 MapReduce 实现数据批量导入到 HBase(点击查看),本文接着介绍另一种方法:利用 Bulkload。该方法首先使用 MapReduce 直接生成 HFile 文件,然后再通过 Bulkload 将 HFile 文件直接加载到表中。
(2)然后将该文件传到 HDFS 中:
(3)接着我们在 HBase 中创建需要的目标表 batch1:
(2)并且 pom.xml 文件中还需要添加 Maven 的编译打包插件配置:
(3)接着我们编写一个 MapReduce 任务 BatchImportBulkLoad,其代码如下,在 map 阶段,把数据封装成 put 操作,将数据生成 HBase 的底层存储文件 HFile
(7)执行成功之后,可以看到 HDFS 上的 HFile 文件已经成功生成:
(2)执行成功之后,查询 HBase 中 batch2 表中的结果,可以看到数据已经导入成功。
二、使用 BulkLoad 实现数据批量导入 HBase
1,准备工作
(1)首先我们要准备输入的数据文件 import.dat,其内容如下:
注意:字段分隔符为制表符,建议在服务器上使用 vi 命令创建编辑。
a c1 name hangge a c1 age 88 b c1 name xiaoliu b c1 age 19 c c1 name lili c c1 age 33
(2)然后将该文件传到 HDFS 中:
hdfs dfs -put import.dat /
(3)接着我们在 HBase 中创建需要的目标表 batch1:
create 'batch2','c1'
2,生成 HFile 文件
(1)首先我们需要创建一个 MapReduce 任务来生成 HFile 文件,该项目的 pom.xml 文件中需要添加 hadoop-client、hbase-client 和 hbase-mapreduce 的依赖:
注意:hbase-client 和 hbase-mapreduce 不能设置 provided,这两个依赖需要打进 jar 包里面,否则会提示找不到对应的类。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.3.0</version> </dependency>
(2)并且 pom.xml 文件中还需要添加 Maven 的编译打包插件配置:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(3)接着我们编写一个 MapReduce 任务 BatchImportBulkLoad,其代码如下,在 map 阶段,把数据封装成 put 操作,将数据生成 HBase 的底层存储文件 HFile
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 利用 BulkLoad 批量导入 * 在 map 阶段,把数据封装成 put 操作,将数据生成 HBase 的底层存储文件 HFile */ public class BatchImportBulkLoad { public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] strs = value.toString().split("\t"); if(strs.length==4){ String rowkey = strs[0]; String columnFamily = strs[1]; String name = strs[2]; String val = strs[3]; ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey.getBytes()); Put put = new Put(rowkey.getBytes()); put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes()); context.write(rowkeyWritable,put); } } } public static void main(String[] args) throws Exception{ if(args.length!=3){ //如果传递的参数不够,程序直接退出 System.exit(100); } String inPath = args[0]; String outPath = args[1]; String outTableName = args[2]; //设置属性对应参数 Configuration conf = new Configuration(); conf.set("hbase.table.name",outTableName); conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); //封装 Job Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName); job.setJarByClass(BatchImportBulkLoad.class); //指定输入路径 FileInputFormat.setInputPaths(job,new Path(inPath)); //指定输出路径[如果输出路径存在,就将其删除] FileSystem fs = FileSystem.get(conf); Path output = new Path(outPath); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job, output); //指定 map 相关的代码 job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //禁用 Reduce job.setNumReduceTasks(0); Connection connection = ConnectionFactory.createConnection(conf); TableName tableName = TableName.valueOf(outTableName); HFileOutputFormat2.configureIncrementalLoad(job,connection.getTable(tableName), connection.getRegionLocator(tableName)); job.waitForCompletion(true); } }
(4)代码编写完毕后,执行打 Jar 包的操作:
(5)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。
(6)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交任务:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar BatchImportBulkLoad hdfs://node1:9000/import.dat hdfs://node1:9000/hbase_out batch2
(7)执行成功之后,可以看到 HDFS 上的 HFile 文件已经成功生成:
3,加载 HFile 文件
(1)在 HBase 客户端节点上执行下面命令,把 HFile 数据转移到表对应的 region 中。
hbase org.apache.hadoop.hbase.tool.BulkLoadHFilesTool hdfs://node1:9000/hbase_out batch2
(2)执行成功之后,查询 HBase 中 batch2 表中的结果,可以看到数据已经导入成功。
scan 'batch2'