当前位置: > > > SpringBoot - 使用hbase-client操作HBase教程3(分页查询)

SpringBoot - 使用hbase-client操作HBase教程3(分页查询)

    我在之前的文章中演示了如何通过集成 hbase-client 来对 HBase 表进行创建、删除、数据插入、查询等操作(点击查看),以及如何使用 Filter 进行过滤操作(点击查看)。本文将在前面的基础上,演示如何实现分页查询。

三、分页查询

1,实现思路

(1)核心思路是使用 PageFilter 过滤器 + 循环动态设置 startRow 实现:
  • 正常情况 PageFilter 返回的结果数量可能大于设定的值,因为服务器集群的 PageFilter 是隔离的,只能保证每个 Region 返回的数据量不会超过 PageFilter 中设置的值。
  • 想要解决返回数据超过设置数量的问题,可以考虑对获取到的数据进行截取,只保留需要的数据条数,下一次读取时根据上次截取的位置开始往后读取。
(2)该方案有如下两个地方要注意:
  • PageFilter 和其它 Filter 一起使用时,需要将 PageFilter 加入到 FilterList 的末尾,否则会出现结果个数小于你期望的数量。
  • 循环动态设置 startRow 时,需要在上一次获取到的 rowkey 后面补 0,表示新的开始,因为默认 startRow 中包含这一行数据

2,实现代码

   为了方便使用,首先对前文的 HBaseUtils.java 工具类稍作修改,增加如下方法,可以通过传入的 FilterpageNumpageSize 进行数据过滤并分页:
/**
 * 获取数据(根据传入的 filter,pageNum,pageSize)
 * @param tableName 表名
 * @param filter 过滤器
 * @param pageNum 页码(从 1 开始)
 * @param pageSize 每页数量
 * @return map
 */
public List<Map<String, String>> getData(String tableName, Filter filter, int pageNum, int pageSize)
        throws IOException {
    List<Map<String, String>> list = new ArrayList<>();
    Table table = hbaseAdmin.getConnection().getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();

    // 组装 Filter 列表
    FilterList filterList = new FilterList();
    filterList.addFilter(filter);

    // PageFilter
    PageFilter pageFilter = new PageFilter(pageSize);
    //注意:如果用到了多个 filter,其中包含 pagefilter,那么 pagefilter 需要放在 fiterlist 的最后一个
    filterList.addFilter(pageFilter);
    scan.setFilter(filterList);

    //记录上一次返回的分页数据中的最大的 Rowkey,最开始为 null
    byte[] lastRowKey = null;

    for (int currentPage = 1; currentPage <= pageNum; currentPage++) {
        if (lastRowKey != null) {
            // 注意:在这里需要在 lastRowkey 后面补 0,否则会把当前这条数据也返回过来,这样就重复了,
            // 补 0 之后可以保证返回的都是新数据,避免重复数据
            scan.withStartRow(Bytes.add(lastRowKey, new byte[]{0}));
        }

        //获取结果
        ResultScanner resultScanner = table.getScanner(scan);
        //记录每次迭代的数据条数
        int rowCount = 0;

        for (Result result : resultScanner) {
            if (currentPage == pageNum) {
                HashMap<String, String> map = new HashMap<>();
                String row = Bytes.toString(result.getRow());
                map.put("row", row);
                for (Cell cell : result.listCells()) {
                    String family = Bytes.toString(cell.getFamilyArray(),
                            cell.getFamilyOffset(), cell.getFamilyLength());
                    String qualifier = Bytes.toString(cell.getQualifierArray(),
                            cell.getQualifierOffset(), cell.getQualifierLength());
                    String data = Bytes.toString(cell.getValueArray(),
                            cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
                list.add(map);
            }

            lastRowKey = result.getRow();
            rowCount++;

            //scan 返回的数据是基于 rowkey 有序的,直接判断数据条数即可。
            //当前页面数据获取完毕,退出循环
            if (rowCount >= pageSize) {
                break;
            }
        }

        resultScanner.close();
    }

    table.close();
    return list;
}

3,使用测试
(1)首先创建一个带有预分区的表,并且向表里面初始化一批测试数据。
create 'user', 'info', SPLITS => ['10', '20', '30', '40']

put 'user','10001','info:name','zs'
put 'user','10001','info:address','bj'

put 'user','10002','info:name','ww'
put 'user','10002','info:address','sh'

put 'user','20001','info:name','ls'
put 'user','20001','info:address','sh'

put 'user','20002','info:name','jack'
put 'user','20002','info:address','sh'

put 'user','20003','info:name','hangge'
put 'user','20003','info:address','sh'

put 'user','20004','info:name','xl'
put 'user','20004','info:address','lz'

put 'user','30001','info:name','ls'
put 'user','30001','info:address','hz'

put 'user','30002','info:name','tom'
put 'user','30002','info:address','sh'

(2)编写测试代码进行分页查询:
@RestController
public class HelloController {

    @Autowired
    private HBaseUtils hbaseUtils;

    @GetMapping("/test")
    public void test() throws IOException {
        //筛选出列族为 info,列为 address,且值为 33 的整行数据
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes(),
                "address".getBytes(),
                CompareOperator.EQUAL.EQUAL,
                new SubstringComparator("sh"));
        System.out.println("--- 查询全部数据 ---");
        List<Map<String, String>> data1 = hbaseUtils.getData("user", singleColumnValueFilter);
        System.out.println(data1);
        System.out.println("--- 分页查询:第一页 ---");
        List<Map<String, String>> data2 = hbaseUtils.getData("user", singleColumnValueFilter, 1, 3);
        System.out.println(data2);
        System.out.println("--- 分页查询:第二页 ---");
        List<Map<String, String>> data3 = hbaseUtils.getData("user", singleColumnValueFilter, 2, 3);
        System.out.println(data3);
    }
}

(3)项目启动后访问/test 接口,可以看到控制台输出内容如下:
评论0