生产者-消费者模型在Hudi中的应用

简介: 生产者-消费者模型在Hudi中的应用

介绍

生产者-消费者模型用于解耦生产者与消费者,平衡两者之间的能力不平衡,该模型广泛应用于各个系统中,Hudi也使用了该模型控制对记录的处理,即记录会被生产者生产至队列中,然后由消费者从队列中消费,更具体一点,对于更新操作,生产者会将文件中老的记录放入队列中等待消费者消费,消费后交由HoodieMergeHandle处理;对于插入操作,生产者会将新记录放入队列中等待消费者消费,消费后交由HandleCreateHandle处理。

入口

前面的文章中提到过无论是HoodieCopyOnWriteTable#handleUpdate处理更新时直接生成了一个SparkBoundedInMemoryExecutor对象,还是HoodieCopyOnWriteTable#handleInsert处理插入时生成了一个CopyOnWriteLazyInsertIterable对象,再迭代时调用该对象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor对象。最后两者均会调用SparkBoundedInMemoryExecutor#execute开始记录的处理,该方法核心代码如下

public E execute() {
   try {
     ExecutorCompletionService<Boolean> producerService = startProducers();
     Future<E> future = startConsumer();
     // Wait for consumer to be done
     return future.get();
  } catch (Exception e) {
     throw new HoodieException(e);
  }
}

该方法会启动所有生产者和单个消费者进行处理。

Hudi定义了BoundedInMemoryQueueProducer接口表示生产者,其子类实现如下

  • FunctionBasedQueueProducer,基于Function来生产记录,在合并日志log文件和数据parquet文件时使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基于迭代器来生产记录,在插入更新时使用。

定义了BoundedInMemoryQueueConsumer类表示消费者,其主要子类实现如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要处理CopyOnWrite表类型时的插入。
  • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要处理MergeOnRead

表类型时的插入,其为CopyOnWriteInsertHandler的子类。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要处理CopyOnWrite表类型时的更新。

整个生产消费相关的类继承结构非常清晰。

对于生产者的启动,startProducers方法核心代码如下

public ExecutorCompletionService<Boolean> startProducers() {
   // Latch to control when and which producer thread will close the queue
   final CountDownLatch latch = new CountDownLatch(producers.size());
   final ExecutorCompletionService<Boolean> completionService =
       new ExecutorCompletionService<Boolean>(executorService);
   producers.stream().map(producer -> {
     return completionService.submit(() -> {
       try {
         preExecute();
         producer.produce(queue);
      } catch (Exception e) {
         logger.error("error producing records", e);
         queue.markAsFailed(e);
         throw e;
      } finally {
         synchronized (latch) {
           latch.countDown();
           if (latch.getCount() == 0) {
             // Mark production as done so that consumer will be able to exit
             queue.close();
          }
        }
      }
       return true;
    });
  }).collect(Collectors.toList());
   return completionService;
}

该方法使用CountDownLatch来协调生产者线程与消费者线程的退出动作,然后调用produce方法开始生产,对于插入更新时的IteratorBasedQueueProducer而言,其核心代码如下

public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
  ...
   while (inputIterator.hasNext()) {
     queue.insertRecord(inputIterator.next());
  }
  ...
}

可以看到只要迭代器还有记录(可能为插入时的新记录或者更新时的旧记录),就会往队列中不断写入。

对于消费者的启动,startConsumer方法的核心代码如下

private Future<E> startConsumer() {
   return consumer.map(consumer -> {
     return executorService.submit(() -> {
      ...
       preExecute();
       try {
         E result = consumer.consume(queue);
         return result;
      } catch (Exception e) {
         queue.markAsFailed(e);
         throw e;
      }
    });
  }).orElse(CompletableFuture.completedFuture(null));
}

消费时会先进行执行前的准备,然后开始消费,其中consume方法的核心代码如下

public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
   Iterator<I> iterator = queue.iterator();
   while (iterator.hasNext()) {
     consumeOneRecord(iterator.next());
  }
   // Notifies done
   finish();
   return getResult();
}

可以看到只要队列中还有记录,就可以获取该记录,然后调用不同BoundedInMemoryQueueConsumer子类的consumeOneRecord进行更新插入处理。

值得一提的是Hudi对队列进行了流控,生产者不能无限制地将记录写入队列中,队列缓存的大小由用户配置,队列能放入记录的条数由采样的记录大小和队列缓存大小控制。

在生产时,会调用BoundedInMemoryQueue#insertRecord将记录写入队列,其核心代码如下

public void insertRecord(I t) throws Exception {
  ...
   rateLimiter.acquire();
   // We are retrieving insert value in the record queueing thread to offload computation
   // around schema validation
   // and record creation to it.
   final O payload = transformFunction.apply(t);
   adjustBufferSizeIfNeeded(payload);
   queue.put(Option.of(payload));
}

首先获取一个许可(Semaphore),未成功获取会被阻塞直至成功获取,然后获取记录的负载以便调整队列,然后放入内部队列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代码如下

private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
   if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
     return;
  }
   final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
   final long newAvgRecordSizeInBytes =
       Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
   final int newRateLimit =
      (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
   // If there is any change in number of records to cache then we will either release (if it increased) or acquire
   // (if it decreased) to adjust rate limiting to newly computed value.
   if (newRateLimit > currentRateLimit) {
     rateLimiter.release(newRateLimit - currentRateLimit);
  } else if (newRateLimit < currentRateLimit) {
     rateLimiter.acquire(currentRateLimit - newRateLimit);
  }
   currentRateLimit = newRateLimit;
   avgRecordSizeInBytes = newAvgRecordSizeInBytes;
   numSamples++;
}

首先看是否已经达到采样频率,然后计算新的记录平均大小和限流速率,如果新的限流速率大于当前速率,则可释放一些许可(供阻塞的生产者获取后继续生产),否则需要获取(回收)一些许可(许可变少后生产速率自然就降低了)。该操作可根据采样的记录大小动态调节速率,不至于在记录负载太大和记录负载太小时,放入同等个数,从而起到动态调节作用。

在消费时,会调用BoundedInMemoryQueue#readNextRecord读取记录,其核心代码如下

private Option<O> readNextRecord() {
  ...
   rateLimiter.release();
   Option<O> newRecord = Option.empty();
   while (expectMoreRecords()) {
     try {
       throwExceptionIfFailed();
       newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
       if (newRecord != null) {
         break;
      }
    } catch (InterruptedException e) {
       throw new HoodieException(e);
    }
  }
  ...
   if (newRecord != null && newRecord.isPresent()) {
     return newRecord;
  } else {
     // We are done reading all the records from internal iterator.
     this.isReadDone.set(true);
     return Option.empty();
  }
}

可以看到首先会释放一个许可,然后判断是否还可以读取记录(还在生产或者停止生产但队列不为空都可读取),然后从内部队列获取记录或返回。

上述便是生产者-消费者在Hudi中应用的分析。

总结

Hudi采用了生产者-消费者模型来控制记录的处理,与传统多生产者-多消费者模型不同的是,Hudi现在只支持多生产者-单消费者模型,单消费者意味着Hudi暂时不支持文件的并发写入。而对于生产消费的队列的实现,Hudi并未仅仅只是基于LinkedBlockingQueue,而是采用了更精细化的速率控制,保证速率会随着记录负载大小的变化和配置的队列缓存大小而动态变化,这也降低了系统发生OOM的概率。

目录
相关文章
|
分布式计算 分布式数据库 Spark
17张图带你彻底理解Hudi Upsert原理
17张图带你彻底理解Hudi Upsert原理
852 1
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
227 0
|
SQL Oracle 关系型数据库
hive中将单行拆分成多行总结
hive 中实现拆分字段到多行
8955 0
|
SQL 分布式计算 Java
数据治理之元数据管理的利器——Atlas入门宝典(二)
随着数字化转型的工作推进,数据治理的工作已经被越来越多的公司提上了日程。作为Hadoop生态最紧密的元数据管理与发现工具,Atlas在其中扮演着重要的位置。但是其官方文档不是很丰富,也不够详细。所以整理了这份文档供大家学习使用。
3144 1
数据治理之元数据管理的利器——Atlas入门宝典(二)
心理健康管理系统 【毕业设计系统】
这篇文章介绍了一个心理健康管理系统的毕业设计项目,展示了系统的功能分布、登录注册界面以及不同用户角色的后台管理界面,并提供了技术栈和效果截图。
心理健康管理系统 【毕业设计系统】
|
Shell 网络安全 开发工具
【已解决】SSL certificate problem: self signed certificate
SSL certificate problem: self signed certificate
1978 2
|
弹性计算 算法 应用服务中间件
nginx配置访问密码,实现用户输入用户名密码才能访
如果我们在 nginx 下搭建了一些站点,但是由于站点内容或者流量的关系,我们并不想让所有人都能正常访问,那么我们可以设置访问认证。只有让用户输入正确的用户名和密码才能正常访问。效果如下:
3272 0
|
12月前
|
XML Java Maven
idea配置maven步骤及常见问题
本文介绍了在IDEA中配置Maven的详细步骤,包括Maven的下载、系统环境变量的配置、Maven本地仓库的设置、镜像加速的配置,以及在IDEA中指定Maven路径和配置文件。同时,还提供了解决每次新建项目需要重新手动配置Maven问题的方法。
idea配置maven步骤及常见问题
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Oracle 关系型数据库 MySQL
flink cdc 转换问题之类型转换如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。