生产者-消费者模型在Hudi中的应用

简介: 生产者-消费者模型在Hudi中的应用

介绍

生产者-消费者模型用于解耦生产者与消费者,平衡两者之间的能力不平衡,该模型广泛应用于各个系统中,Hudi也使用了该模型控制对记录的处理,即记录会被生产者生产至队列中,然后由消费者从队列中消费,更具体一点,对于更新操作,生产者会将文件中老的记录放入队列中等待消费者消费,消费后交由HoodieMergeHandle处理;对于插入操作,生产者会将新记录放入队列中等待消费者消费,消费后交由HandleCreateHandle处理。

入口

前面的文章中提到过无论是HoodieCopyOnWriteTable#handleUpdate处理更新时直接生成了一个SparkBoundedInMemoryExecutor对象,还是HoodieCopyOnWriteTable#handleInsert处理插入时生成了一个CopyOnWriteLazyInsertIterable对象,再迭代时调用该对象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor对象。最后两者均会调用SparkBoundedInMemoryExecutor#execute开始记录的处理,该方法核心代码如下

public E execute() {
   try {
     ExecutorCompletionService<Boolean> producerService = startProducers();
     Future<E> future = startConsumer();
     // Wait for consumer to be done
     return future.get();
  } catch (Exception e) {
     throw new HoodieException(e);
  }
}

该方法会启动所有生产者和单个消费者进行处理。

Hudi定义了BoundedInMemoryQueueProducer接口表示生产者,其子类实现如下

  • FunctionBasedQueueProducer,基于Function来生产记录,在合并日志log文件和数据parquet文件时使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基于迭代器来生产记录,在插入更新时使用。

定义了BoundedInMemoryQueueConsumer类表示消费者,其主要子类实现如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要处理CopyOnWrite表类型时的插入。
  • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要处理MergeOnRead

表类型时的插入,其为CopyOnWriteInsertHandler的子类。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要处理CopyOnWrite表类型时的更新。

整个生产消费相关的类继承结构非常清晰。

对于生产者的启动,startProducers方法核心代码如下

public ExecutorCompletionService<Boolean> startProducers() {
   // Latch to control when and which producer thread will close the queue
   final CountDownLatch latch = new CountDownLatch(producers.size());
   final ExecutorCompletionService<Boolean> completionService =
       new ExecutorCompletionService<Boolean>(executorService);
   producers.stream().map(producer -> {
     return completionService.submit(() -> {
       try {
         preExecute();
         producer.produce(queue);
      } catch (Exception e) {
         logger.error("error producing records", e);
         queue.markAsFailed(e);
         throw e;
      } finally {
         synchronized (latch) {
           latch.countDown();
           if (latch.getCount() == 0) {
             // Mark production as done so that consumer will be able to exit
             queue.close();
          }
        }
      }
       return true;
    });
  }).collect(Collectors.toList());
   return completionService;
}

该方法使用CountDownLatch来协调生产者线程与消费者线程的退出动作,然后调用produce方法开始生产,对于插入更新时的IteratorBasedQueueProducer而言,其核心代码如下

public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
  ...
   while (inputIterator.hasNext()) {
     queue.insertRecord(inputIterator.next());
  }
  ...
}

可以看到只要迭代器还有记录(可能为插入时的新记录或者更新时的旧记录),就会往队列中不断写入。

对于消费者的启动,startConsumer方法的核心代码如下

private Future<E> startConsumer() {
   return consumer.map(consumer -> {
     return executorService.submit(() -> {
      ...
       preExecute();
       try {
         E result = consumer.consume(queue);
         return result;
      } catch (Exception e) {
         queue.markAsFailed(e);
         throw e;
      }
    });
  }).orElse(CompletableFuture.completedFuture(null));
}

消费时会先进行执行前的准备,然后开始消费,其中consume方法的核心代码如下

public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
   Iterator<I> iterator = queue.iterator();
   while (iterator.hasNext()) {
     consumeOneRecord(iterator.next());
  }
   // Notifies done
   finish();
   return getResult();
}

可以看到只要队列中还有记录,就可以获取该记录,然后调用不同BoundedInMemoryQueueConsumer子类的consumeOneRecord进行更新插入处理。

值得一提的是Hudi对队列进行了流控,生产者不能无限制地将记录写入队列中,队列缓存的大小由用户配置,队列能放入记录的条数由采样的记录大小和队列缓存大小控制。

在生产时,会调用BoundedInMemoryQueue#insertRecord将记录写入队列,其核心代码如下

public void insertRecord(I t) throws Exception {
  ...
   rateLimiter.acquire();
   // We are retrieving insert value in the record queueing thread to offload computation
   // around schema validation
   // and record creation to it.
   final O payload = transformFunction.apply(t);
   adjustBufferSizeIfNeeded(payload);
   queue.put(Option.of(payload));
}

首先获取一个许可(Semaphore),未成功获取会被阻塞直至成功获取,然后获取记录的负载以便调整队列,然后放入内部队列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代码如下

private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
   if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
     return;
  }
   final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
   final long newAvgRecordSizeInBytes =
       Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
   final int newRateLimit =
      (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
   // If there is any change in number of records to cache then we will either release (if it increased) or acquire
   // (if it decreased) to adjust rate limiting to newly computed value.
   if (newRateLimit > currentRateLimit) {
     rateLimiter.release(newRateLimit - currentRateLimit);
  } else if (newRateLimit < currentRateLimit) {
     rateLimiter.acquire(currentRateLimit - newRateLimit);
  }
   currentRateLimit = newRateLimit;
   avgRecordSizeInBytes = newAvgRecordSizeInBytes;
   numSamples++;
}

首先看是否已经达到采样频率,然后计算新的记录平均大小和限流速率,如果新的限流速率大于当前速率,则可释放一些许可(供阻塞的生产者获取后继续生产),否则需要获取(回收)一些许可(许可变少后生产速率自然就降低了)。该操作可根据采样的记录大小动态调节速率,不至于在记录负载太大和记录负载太小时,放入同等个数,从而起到动态调节作用。

在消费时,会调用BoundedInMemoryQueue#readNextRecord读取记录,其核心代码如下

private Option<O> readNextRecord() {
  ...
   rateLimiter.release();
   Option<O> newRecord = Option.empty();
   while (expectMoreRecords()) {
     try {
       throwExceptionIfFailed();
       newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
       if (newRecord != null) {
         break;
      }
    } catch (InterruptedException e) {
       throw new HoodieException(e);
    }
  }
  ...
   if (newRecord != null && newRecord.isPresent()) {
     return newRecord;
  } else {
     // We are done reading all the records from internal iterator.
     this.isReadDone.set(true);
     return Option.empty();
  }
}

可以看到首先会释放一个许可,然后判断是否还可以读取记录(还在生产或者停止生产但队列不为空都可读取),然后从内部队列获取记录或返回。

上述便是生产者-消费者在Hudi中应用的分析。

总结

Hudi采用了生产者-消费者模型来控制记录的处理,与传统多生产者-多消费者模型不同的是,Hudi现在只支持多生产者-单消费者模型,单消费者意味着Hudi暂时不支持文件的并发写入。而对于生产消费的队列的实现,Hudi并未仅仅只是基于LinkedBlockingQueue,而是采用了更精细化的速率控制,保证速率会随着记录负载大小的变化和配置的队列缓存大小而动态变化,这也降低了系统发生OOM的概率。

目录
相关文章
|
4月前
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
319 0
|
存储 SQL 缓存
hudi概念讲解
hudi概念讲解
hudi概念讲解
|
消息中间件 分布式计算 Kafka
SparkStreaming实时任务处理的三种语义
SparkStreaming实时任务处理的三种语义
|
消息中间件 缓存 资源调度
在 Flink 算子中使用多线程如何保证不丢数据?
本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。
在 Flink 算子中使用多线程如何保证不丢数据?
|
30天前
|
消息中间件 分布式计算 Kafka
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
流计算引擎数据问题之MillWheel 和 Flink 实现数据流的同步处理如何解决
26 0
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在Flink算子内部使用异步IO可以通过什么办法实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
监控 分布式数据库 流计算
Flink 异步IO优化任务
Flink 异步IO优化任务
77 0
|
存储 关系型数据库 MySQL
如何实现基于Flink的高吞吐、精确一致性数据入湖
APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。
|
消息中间件 资源调度 Oracle
对Flink流处理模型的抽象
对Flink流处理模型的抽象
对Flink流处理模型的抽象
|
消息中间件 SQL 缓存
Exactly Once语义在Flink中的实现
Exactly Once语义在Flink中的实现
193 0
Exactly Once语义在Flink中的实现