生产者-消费者模型在Hudi中的应用

简介: 生产者-消费者模型在Hudi中的应用

介绍

生产者-消费者模型用于解耦生产者与消费者,平衡两者之间的能力不平衡,该模型广泛应用于各个系统中,Hudi也使用了该模型控制对记录的处理,即记录会被生产者生产至队列中,然后由消费者从队列中消费,更具体一点,对于更新操作,生产者会将文件中老的记录放入队列中等待消费者消费,消费后交由HoodieMergeHandle处理;对于插入操作,生产者会将新记录放入队列中等待消费者消费,消费后交由HandleCreateHandle处理。

入口

前面的文章中提到过无论是HoodieCopyOnWriteTable#handleUpdate处理更新时直接生成了一个SparkBoundedInMemoryExecutor对象,还是HoodieCopyOnWriteTable#handleInsert处理插入时生成了一个CopyOnWriteLazyInsertIterable对象,再迭代时调用该对象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor对象。最后两者均会调用SparkBoundedInMemoryExecutor#execute开始记录的处理,该方法核心代码如下

public E execute() {
   try {
     ExecutorCompletionService<Boolean> producerService = startProducers();
     Future<E> future = startConsumer();
     // Wait for consumer to be done
     return future.get();
  } catch (Exception e) {
     throw new HoodieException(e);
  }
}

该方法会启动所有生产者和单个消费者进行处理。

Hudi定义了BoundedInMemoryQueueProducer接口表示生产者,其子类实现如下

  • FunctionBasedQueueProducer,基于Function来生产记录,在合并日志log文件和数据parquet文件时使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基于迭代器来生产记录,在插入更新时使用。

定义了BoundedInMemoryQueueConsumer类表示消费者,其主要子类实现如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要处理CopyOnWrite表类型时的插入。
  • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要处理MergeOnRead

表类型时的插入,其为CopyOnWriteInsertHandler的子类。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要处理CopyOnWrite表类型时的更新。

整个生产消费相关的类继承结构非常清晰。

对于生产者的启动,startProducers方法核心代码如下

public ExecutorCompletionService<Boolean> startProducers() {
   // Latch to control when and which producer thread will close the queue
   final CountDownLatch latch = new CountDownLatch(producers.size());
   final ExecutorCompletionService<Boolean> completionService =
       new ExecutorCompletionService<Boolean>(executorService);
   producers.stream().map(producer -> {
     return completionService.submit(() -> {
       try {
         preExecute();
         producer.produce(queue);
      } catch (Exception e) {
         logger.error("error producing records", e);
         queue.markAsFailed(e);
         throw e;
      } finally {
         synchronized (latch) {
           latch.countDown();
           if (latch.getCount() == 0) {
             // Mark production as done so that consumer will be able to exit
             queue.close();
          }
        }
      }
       return true;
    });
  }).collect(Collectors.toList());
   return completionService;
}

该方法使用CountDownLatch来协调生产者线程与消费者线程的退出动作,然后调用produce方法开始生产,对于插入更新时的IteratorBasedQueueProducer而言,其核心代码如下

public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
  ...
   while (inputIterator.hasNext()) {
     queue.insertRecord(inputIterator.next());
  }
  ...
}

可以看到只要迭代器还有记录(可能为插入时的新记录或者更新时的旧记录),就会往队列中不断写入。

对于消费者的启动,startConsumer方法的核心代码如下

private Future<E> startConsumer() {
   return consumer.map(consumer -> {
     return executorService.submit(() -> {
      ...
       preExecute();
       try {
         E result = consumer.consume(queue);
         return result;
      } catch (Exception e) {
         queue.markAsFailed(e);
         throw e;
      }
    });
  }).orElse(CompletableFuture.completedFuture(null));
}

消费时会先进行执行前的准备,然后开始消费,其中consume方法的核心代码如下

public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
   Iterator<I> iterator = queue.iterator();
   while (iterator.hasNext()) {
     consumeOneRecord(iterator.next());
  }
   // Notifies done
   finish();
   return getResult();
}

可以看到只要队列中还有记录,就可以获取该记录,然后调用不同BoundedInMemoryQueueConsumer子类的consumeOneRecord进行更新插入处理。

值得一提的是Hudi对队列进行了流控,生产者不能无限制地将记录写入队列中,队列缓存的大小由用户配置,队列能放入记录的条数由采样的记录大小和队列缓存大小控制。

在生产时,会调用BoundedInMemoryQueue#insertRecord将记录写入队列,其核心代码如下

public void insertRecord(I t) throws Exception {
  ...
   rateLimiter.acquire();
   // We are retrieving insert value in the record queueing thread to offload computation
   // around schema validation
   // and record creation to it.
   final O payload = transformFunction.apply(t);
   adjustBufferSizeIfNeeded(payload);
   queue.put(Option.of(payload));
}

首先获取一个许可(Semaphore),未成功获取会被阻塞直至成功获取,然后获取记录的负载以便调整队列,然后放入内部队列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代码如下

private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
   if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
     return;
  }
   final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
   final long newAvgRecordSizeInBytes =
       Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
   final int newRateLimit =
      (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
   // If there is any change in number of records to cache then we will either release (if it increased) or acquire
   // (if it decreased) to adjust rate limiting to newly computed value.
   if (newRateLimit > currentRateLimit) {
     rateLimiter.release(newRateLimit - currentRateLimit);
  } else if (newRateLimit < currentRateLimit) {
     rateLimiter.acquire(currentRateLimit - newRateLimit);
  }
   currentRateLimit = newRateLimit;
   avgRecordSizeInBytes = newAvgRecordSizeInBytes;
   numSamples++;
}

首先看是否已经达到采样频率,然后计算新的记录平均大小和限流速率,如果新的限流速率大于当前速率,则可释放一些许可(供阻塞的生产者获取后继续生产),否则需要获取(回收)一些许可(许可变少后生产速率自然就降低了)。该操作可根据采样的记录大小动态调节速率,不至于在记录负载太大和记录负载太小时,放入同等个数,从而起到动态调节作用。

在消费时,会调用BoundedInMemoryQueue#readNextRecord读取记录,其核心代码如下

private Option<O> readNextRecord() {
  ...
   rateLimiter.release();
   Option<O> newRecord = Option.empty();
   while (expectMoreRecords()) {
     try {
       throwExceptionIfFailed();
       newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
       if (newRecord != null) {
         break;
      }
    } catch (InterruptedException e) {
       throw new HoodieException(e);
    }
  }
  ...
   if (newRecord != null && newRecord.isPresent()) {
     return newRecord;
  } else {
     // We are done reading all the records from internal iterator.
     this.isReadDone.set(true);
     return Option.empty();
  }
}

可以看到首先会释放一个许可,然后判断是否还可以读取记录(还在生产或者停止生产但队列不为空都可读取),然后从内部队列获取记录或返回。

上述便是生产者-消费者在Hudi中应用的分析。

总结

Hudi采用了生产者-消费者模型来控制记录的处理,与传统多生产者-多消费者模型不同的是,Hudi现在只支持多生产者-单消费者模型,单消费者意味着Hudi暂时不支持文件的并发写入。而对于生产消费的队列的实现,Hudi并未仅仅只是基于LinkedBlockingQueue,而是采用了更精细化的速率控制,保证速率会随着记录负载大小的变化和配置的队列缓存大小而动态变化,这也降低了系统发生OOM的概率。

目录
相关文章
|
7月前
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
431 0
|
消息中间件 分布式计算 Kafka
SparkStreaming实时任务处理的三种语义
SparkStreaming实时任务处理的三种语义
|
消息中间件 缓存 资源调度
在 Flink 算子中使用多线程如何保证不丢数据?
本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。
在 Flink 算子中使用多线程如何保证不丢数据?
|
4月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
250 0
|
7月前
|
消息中间件 分布式计算 Java
流计算与批处理的区别是什么?请举例说明。
流计算与批处理的区别是什么?请举例说明。
109 0
|
7月前
|
监控 分布式数据库 流计算
Flink 异步IO优化任务
Flink 异步IO优化任务
95 0
|
存储 数据处理 容器
生产者消费者模型(一)
生产者消费者模型
147 0
|
存储
生产者消费者模型(二)
生产者消费者模型
74 0
|
消息中间件 资源调度 Oracle
对Flink流处理模型的抽象
对Flink流处理模型的抽象
对Flink流处理模型的抽象
|
消息中间件 SQL 分布式计算
Sparkstreaming 介绍-流计算和批计算的区别 | 学习笔记
快速学习 Sparkstreaming 介绍-流计算和批计算的区别
Sparkstreaming 介绍-流计算和批计算的区别 | 学习笔记