Bloom Filter在Hudi中的应用

简介: Bloom Filter在Hudi中的应用

介绍

Bloom Filter可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,主要缺点是存在一定的误判率:当其判断元素存在时,实际上元素可能并不存在。而当判定不存在时,则元素一定不存在,Bloom Filter在对精确度要求不太严格的大数据量场景下运用十分广泛。


引入

为何要引入Bloom Filter?这是Hudi为加快数据upsert采用的一种解决方案,即判断record是否已经在文件中存在,若存在,则更新,若不存在,则插入。对于upsert显然无法容忍出现误判,否则可能会出现应该插入和变成了更新的错误,那么Hudi是如何解决误判问题的呢?一种简单办法是当Bloom Filter判断该元素存在时,再去文件里二次确认该元素是否真的存在;而当Bloom Filter判断该元素不存在时,则无需读文件,通过二次确认的方法来规避Bloom Filter的误判问题,实际上这也是Hudi采取的方案,值得一提的是,现在Delta暂时还不支持Bloom Filter,其判断一条记录是否存在是直接通过一次全表join来实现,效率比较低下。接下来我们来分析Bloom Filter在Hudi中的应用。

流程

Hudi从上游系统(Kafka、DFS等)消费一批数据后,会根据用户配置的写入模式(insert、upsert、bulkinsert)写入Hudi数据集。而当配置为upsert时,意味着需要将数据插入更新至Hudi数据集,而第一步是需要标记哪些记录已经存在,哪些记录不存在,然后,对于存在的记录进行更新,不存在记录进行插入。

HoodieWriteClient中提供了对应三种写入模式的方法(#insert、#upsert、#bulkinsert),对于使用了Bloom Filter的#upsert方法而言,其核心源代码如下

public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
  ...
   // perform index loop up to get existing location of records
   JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
  ...
   return upsertRecordsInternal(taggedRecords, commitTime, table, true);
}

可以看到首先利用索引给记录打标签,然后再进行更新,下面主要分析打标签的过程。

对于索引,Hudi提供了四种索引方式的实现:HBaseIndexHoodieBloomIndexHoodieGlobalBloomIndexInMemoryHashIndex,默认使用HoodieBloomIndex。其中HoodieGlobalBloomIndex与HoodieBloomIndex的区别是前者会读取所有分区文件,而后者只读取记录所存在的分区下的文件。下面以HoodieBloomIndex为例进行分析。

HoodieBloomIndex#tagLocation核心代码如下
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
     HoodieTable<T> hoodieTable) {
   // Step 0: cache the input record RDD
   if (config.getBloomIndexUseCaching()) {
     recordRDD.persist(config.getBloomIndexInputStorageLevel());
  }
   // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
   JavaPairRDD<String, String> partitionRecordKeyPairRDD =
       recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
   // Lookup indexes for all the partition/recordkey pair
   JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
       lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
   // Cache the result, for subsequent stages.
   if (config.getBloomIndexUseCaching()) {
     keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
  }
   // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
   // Cost: 4 sec.
   JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);
   if (config.getBloomIndexUseCaching()) {
     recordRDD.unpersist(); // unpersist the input Record RDD
     keyFilenamePairRDD.unpersist();
  }
   return taggedRecordRDD;
}

该过程会缓存记录以便优化数据的加载。首先从记录中解析出对应的分区路径 -> key,接着查看索引,然后将位置信息(存在于哪个文件)回推到记录中。

HoodieBloomIndex#lookup核心代码如下
private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
     JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
     final HoodieTable hoodieTable) {
   // Obtain records per partition, in the incoming records
   Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
   List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
   // Step 2: Load all involved files as <Partition, filename> pairs
   List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
       loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
   final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
       fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
   // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
   // that contains it.
   Map<String, Long> comparisonsPerFileGroup =
       computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
   int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
   int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
   return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
       comparisonsPerFileGroup);
}

该方法首先会计算出每个分区有多少条记录和影响的分区有哪些,然后加载影响的分区的文件,最后计算并行度后,开始找记录真正存在的文件。

对于#loadInvolvedFiles方法而言,其会查询指定分区分区下所有的数据文件(parquet格式),并且如果开启了hoodie.bloom.index.prune.by.ranges,还会读取文件中的最小key和最大key(为加速后续的查找)。

HoodieBloomIndex#findMatchingFilesForRecordKeys核心代码如下
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
     final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
     JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
     Map<String, Long> fileGroupToComparisons) {
   JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
       explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
   if (config.useBloomIndexBucketizedChecking()) {
     Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
         config.getBloomIndexKeysPerBucket());
     fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
        .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
  } else {
     fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
  }
   return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
      .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
      .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
          .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
               new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
          .collect(Collectors.toList()).iterator());
}

该方法首先会查找记录需要进行比对的文件,然后再查询的记录的位置信息。

其中,对于#explodeRecordRDDWithFileComparisons方法而言,其会借助树/链表结构构造的文件过滤器来加速记录对应文件的查找(每个record可能会对应多个文件)。

而使用Bloom Filter的核心逻辑承载在HoodieBloomIndexCheckFunction,HoodieBloomIndexCheckFunction$LazyKeyCheckIterator该迭代器完成了记录对应文件的实际查找过程,查询的核心逻辑在computeNext`中,其核心代码如下

protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
     List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
     try {
       // process one file in each go.
       while (inputItr.hasNext()) {
         Tuple2<String, HoodieKey> currentTuple = inputItr.next();
         String fileId = currentTuple._1;
         String partitionPath = currentTuple._2.getPartitionPath();
         String recordKey = currentTuple._2.getRecordKey();
         Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
         // lazily init state
         if (keyLookupHandle == null) {
           keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
        }
         // if continue on current file
         if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
           keyLookupHandle.addKey(recordKey);
        } else {
           // do the actual checking of file & break out
           ret.add(keyLookupHandle.getLookupResult());
           keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
           keyLookupHandle.addKey(recordKey);
           break;
        }
      }
       // handle case, where we ran out of input, close pending work, update return val
       if (!inputItr.hasNext()) {
         ret.add(keyLookupHandle.getLookupResult());
      }
    } catch (Throwable e) {
       if (e instanceof HoodieException) {
         throw e;
      }
       throw new HoodieIndexException("Error checking bloom filter index. ", e);
    }
     return ret;
  }

该方法每次迭代只会处理一个文件,每次处理时都会生成HoodieKeyLookupHandle,然后会添加recordKey,处理完后再获取查询结果。

其中HoodieKeyLookupHandle#addKey方法核心代码如下

public void addKey(String recordKey) {
   // check record key against bloom filter of current file & add to possible keys if needed
   if (bloomFilter.mightContain(recordKey)) {
    ...
     candidateRecordKeys.add(recordKey);
  }
   totalKeysChecked++;
}

可以看到,这里使用到了Bloom Filter来判断该记录是否存在,如果存在,则加入到候选队列中,等待进一步判断;若不存在,则无需额外处理,其中Bloom Filter会在创建HoodieKeyLookupHandle实例时初始化(从指定文件中读取Bloom Filter)。

HoodieKeyLookupHandle#getLookupResult方法核心代码如下
public KeyLookupResult getLookupResult() {
  ...
   HoodieDataFile dataFile = getLatestDataFile();
   List<String> matchingKeys =
       checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
  ...
   return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
       dataFile.getCommitTime(), matchingKeys);
}

该方法首先获取指定分区下的最新数据文件,然后判断数据文件存在哪些recordKey,并将其封装进KeyLookupResult后返回。其中#checkCandidatesAgainstFile会读取文件中所有的recordKey,判断是否存在于candidateRecordKeys,这便完成了进一步确认。

到这里即完成了record存在于哪些文件的所有查找,查找完后会进行进一步处理,后续再给出分析。

总结

Hudi引入Bloom Filter是为了加速upsert过程,并将其存入parquet数据文件中的Footer中,在读取文件时会从Footer中读取该Bloom Filter。在利用Bloom Filter来判断记录是否存在时,会采用二次确认的方式规避Bloom Filter的误判问题。

目录
相关文章
|
7月前
|
数据处理 分布式数据库 Apache
一文聊透Apache Hudi的索引设计与应用
一文聊透Apache Hudi的索引设计与应用
354 3
|
7月前
|
存储 缓存 关系型数据库
海量数据去重的hash,bitmap与布隆过滤器Bloom Filter
海量数据去重的hash,bitmap与布隆过滤器Bloom Filter
180 1
|
28天前
|
存储 缓存 算法
【C++】BitSet和Bloom_Filter
位图(Bitmap)和布隆过滤器(Bloom Filter)是两种高效的数据结构。位图使用每一位二进制数表示数据项的存在状态,适用于精确判断元素存在性,广泛应用于图形图像处理、数据压缩、数据库索引等领域。布隆过滤器通过多个哈希函数将元素映射到位数组,用于快速判断元素是否可能属于集合,特别适合处理大规模数据,尽管存在误判率,但在网页缓存、网络数据包过滤等场景中表现出色。两者在空间效率、查询速度及误判率方面各有优势,适用于不同的应用场景。
34 4
|
5月前
布隆过滤器(Bloom Filter)的原理和实现
布隆过滤器(Bloom Filter)的原理和实现
|
存储 关系型数据库 分布式数据库
"filter"的下推
"filter"的下推
51 1
|
缓存 算法 NoSQL
布隆过滤器(Bloom Filter)从入门到出土
布隆过滤器(Bloom Filter)从入门到出土
|
数据采集 缓存 Serverless
布隆过滤器(Bloom Filter)
布隆过滤器(Bloom Filter)
123 0
|
存储 缓存 NoSQL
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
|
存储 数据采集 缓存
布隆过滤器 Bloom Filter
布隆过滤器 Bloom Filter
布隆过滤器 Bloom Filter
|
消息中间件 JSON 流计算
flink bitmap去重并写入RabitMQ
flink bitmap去重并写入RabitMQ