1. Introduction
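The previous post analyzed the filter-based indexes. This one analyzes an index backed by an external storage system: HBaseIndex. It is also a useful reference for anyone who wants to implement a custom index.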
2. Analysis
HBaseIndex is also a subclass of HoodieIndex, and it implements the parent class's two core methods.
// Tag the input record RDD with location information
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
    HoodieTable<T> hoodieTable);

// Update the location information
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
    HoodieTable<T> hoodieTable);
During a write, tagLocation is called to tag the input records with their locations. Its core code is as follows:
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
    HoodieTable<T> hoodieTable) {
  return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable.getMetaClient()), true);
}
As shown, this method mainly relies on the Function returned by locationTagFunction to process the raw records. Its core code is as follows:
private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(
    HoodieTableMetaClient metaClient) {
  return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
      hoodieRecordIterator) -> {
    // Number of Gets sent in each batch
    int multiGetBatchSize = config.getHbaseIndexGetBatchSize();

    // Obtain (or re-create) the shared HBase connection
    synchronized (HBaseIndex.class) {
      if (hbaseConnection == null || hbaseConnection.isClosed()) {
        hbaseConnection = getHBaseConnection();
      }
    }
    List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
    HTable hTable = null;
    try {
      // Open the configured index table
      hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
      List<Get> statements = new ArrayList<>();
      List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
      // Iterate over the records in this partition
      while (hoodieRecordIterator.hasNext()) {
        HoodieRecord rec = hoodieRecordIterator.next();
        // Build a Get from the recordKey
        statements.add(generateStatement(rec.getRecordKey()));
        currentBatchOfRecords.add(rec);
        // Fire the batch once it is full or the records are exhausted
        if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
          // Fetch the results
          Result[] results = doGet(hTable, statements);
          // Clear the list so the Gets can be garbage collected
          statements.clear();
          for (Result result : results) {
            // Remove the HoodieRecord that corresponds to this result
            HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
            if (result.getRow() != null) {
              // Read the key, commit time, file ID and partition path
              String keyFromResult = Bytes.toString(result.getRow());
              String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
              String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
              String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));

              // Check whether the commit is valid (present in the timeline, or earlier than the latest commit)
              if (checkIfValidCommit(metaClient, commitTs)) {
                // Rebuild the HoodieRecord with the partition path stored in the index
                currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
                    currentRecord.getData());
                currentRecord.unseal();
                // Set the location information
                currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                currentRecord.seal();
                taggedRecords.add(currentRecord);
                // the key from Result and the key being processed should be same
                assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
              } else {
                // Invalid commit: still add the record to the output, but without a location
                taggedRecords.add(currentRecord);
              }
            } else {
              // Key not found in the index: add the record to the output without a location
              taggedRecords.add(currentRecord);
            }
          }
        }
      }
    } // exception handling and cleanup of hTable are omitted in this excerpt
    return taggedRecords.iterator();
  };
}
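As shown, fetching location information from HBase is straightforward: iterate over all records in the given partition, build Get requests from the record keys in batches, fetch the matching entries from the HBase index table (the table name is configurable), and then construct the location information from the results.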
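The batching pattern described above can be sketched without any HBase dependency. The example below is only an illustration under stated assumptions: BatchedLookupSketch, Location and multiGet are hypothetical names, and an in-memory map stands in for the HBase index table. It is not Hudi's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an in-memory map stands in for the HBase index table.
class BatchedLookupSketch {

  // Mirrors the two index columns used for a location: commit time and file ID.
  static final class Location {
    final String commitTs;
    final String fileId;

    Location(String commitTs, String fileId) {
      this.commitTs = commitTs;
      this.fileId = fileId;
    }
  }

  private final Map<String, Location> indexTable;
  int batchCalls = 0; // number of simulated multi-get round trips

  BatchedLookupSketch(Map<String, Location> indexTable) {
    this.indexTable = indexTable;
  }

  // Simulates one multi-get: one result slot per key, null when the key is absent.
  private List<Location> multiGet(List<String> keys) {
    batchCalls++;
    List<Location> results = new ArrayList<>();
    for (String key : keys) {
      results.add(indexTable.get(key));
    }
    return results;
  }

  // Looks up every key in batches; a null value means "not indexed yet".
  Map<String, Location> tag(Iterator<String> recordKeys, int batchSize) {
    Map<String, Location> tagged = new LinkedHashMap<>();
    List<String> batch = new ArrayList<>();
    while (recordKeys.hasNext()) {
      batch.add(recordKeys.next());
      // Flush when the batch is full or the input is exhausted.
      if (batch.size() >= batchSize || !recordKeys.hasNext()) {
        List<Location> results = multiGet(batch);
        for (int i = 0; i < batch.size(); i++) {
          tagged.put(batch.get(i), results.get(i));
        }
        batch.clear();
      }
    }
    return tagged;
  }
}
```

The point of the batch size is to bound the number of round trips: with three keys and a batch size of two, the sketch issues two lookups instead of three.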
After the data has been written, updateLocation must be called to update the records' location information. Its core code is as follows:
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
    HoodieTable<T> hoodieTable) {
  // Create the allocator from the configuration (hoodie.index.hbase.qps.allocator.class)
  final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
  // Initialize the put batch size using the allocator
  setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
  // Process each partition with the update function
  JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
  // Cache the WriteStatus RDD
  writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
  return writeStatusJavaRDD;
}
The core code of updateLocationFunction is as follows:
private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
  return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {
    List<WriteStatus> writeStatusList = new ArrayList<>();
    // Obtain (or re-create) the shared HBase connection
    synchronized (HBaseIndex.class) {
      if (hbaseConnection == null || hbaseConnection.isClosed()) {
        hbaseConnection = getHBaseConnection();
      }
    }
    try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
      // Iterate over the write statuses
      while (statusIterator.hasNext()) {
        WriteStatus writeStatus = statusIterator.next();
        List<Mutation> mutations = new ArrayList<>();
        try {
          for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
            if (!writeStatus.isErrored(rec.getKey())) {
              // Get the new location
              Option<HoodieRecordLocation> loc = rec.getNewLocation();
              if (loc.isPresent()) { // a new location exists
                if (rec.getCurrentLocation() != null) { // a current location exists
                  // This is an update to an already indexed record, so the index entry stays as is
                  continue;
                }
                // Build a Put from the HoodieRecord
                Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
                put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
                put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
                put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
                mutations.add(put);
              } else { // no new location
                // The record was deleted
                Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
                mutations.add(delete);
              }
            }
            if (mutations.size() < multiPutBatchSize) {
              continue;
            }
            // Batch is full: apply the mutations
            doMutations(mutator, mutations);
          }
          // Apply the remaining mutations
          doMutations(mutator, mutations);
        } // exception handling is omitted in this excerpt
        writeStatusList.add(writeStatus);
      }
    }
    return writeStatusList.iterator();
  };
}
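As shown, the index is updated once the data has been written. The location information on each HoodieRecord in the WriteStatus decides what happens: an update to an existing record needs no index change, a newly inserted record gets a new index entry, and a deleted record has its entry removed from HBase.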
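The three cases can be condensed into a small decision function. The sketch below is a hypothetical illustration, not Hudi code: IndexUpdateSketch, Action, decide and apply are made-up names, locations are plain strings, and a map stands in for the HBase table.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the update decision; a map stands in for the HBase index table.
class IndexUpdateSketch {

  enum Action { SKIP, PUT, DELETE }

  // currentLocation: where the record lived before the write (null if it was not indexed).
  // newLocation: where the record lives after the write (empty if it was deleted).
  static Action decide(String currentLocation, Optional<String> newLocation) {
    if (newLocation.isPresent()) {
      // Already indexed records keep their entry; only new inserts need a Put.
      return currentLocation != null ? Action.SKIP : Action.PUT;
    }
    // No new location means the record was deleted.
    return Action.DELETE;
  }

  // Applies the decision for one record to the stand-in index table.
  static void apply(Map<String, String> indexTable, String recordKey,
      String currentLocation, Optional<String> newLocation) {
    switch (decide(currentLocation, newLocation)) {
      case PUT:
        indexTable.put(recordKey, newLocation.get());
        break;
      case DELETE:
        indexTable.remove(recordKey);
        break;
      default:
        break; // SKIP: leave the existing entry untouched
    }
  }
}
```

Keeping the decision separate from the mutation makes each branch easy to test in isolation, which is harder when the logic is inlined in a partition function.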
3. Summary
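Hudi ships with a built-in index implementation backed by HBase as an external storage system. Users can configure the HBase index directly so that record index information is stored in HBase, and they can also implement other index types of their own.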
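As a rough illustration of what configuring the HBase index might look like, the sketch below collects the relevant settings into a java.util.Properties object. The property keys are the ones I understand Hudi releases of this era to use, and they should be checked against the configuration reference of the version in use. The ZooKeeper host, table name and batch size are placeholder values, and HBaseIndexConfigSketch is a hypothetical helper class.

```java
import java.util.Properties;

// Hypothetical helper; verify the keys against the Hudi version in use.
class HBaseIndexConfigSketch {

  static Properties hbaseIndexProps() {
    Properties props = new Properties();
    // Select the HBase-backed index
    props.setProperty("hoodie.index.type", "HBASE");
    // ZooKeeper quorum and port of the HBase cluster (placeholder values)
    props.setProperty("hoodie.index.hbase.zkquorum", "zk1.example.com");
    props.setProperty("hoodie.index.hbase.zkport", "2181");
    // Name of the HBase table that stores the index (placeholder value)
    props.setProperty("hoodie.index.hbase.table", "hudi_record_index");
    // Number of Gets per batch during tagLocation (placeholder value)
    props.setProperty("hoodie.index.hbase.get.batch.size", "100");
    return props;
  }
}
```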