一文了解Kafka的消息收集器RecordAccumulate

简介: 一文了解Kafka的消息收集器RecordAccumulate

〇、前言

在上一篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》中,我们介绍了Main Thread的工作原理,那么在本篇文章中,我们继续介绍第二部分内容:RecordAccumulator

在介绍原理之前,大家再重温一下Producer端的整体架构,图示如下所示:

这个图看不懂没有关系,我们会在介绍Producer端原理时一一介绍每个部分的含义及其所复杂的功能。

一、RecordAccumulator

在上文中,我们介绍了主线程(Main Thread)的执行流程,当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,那么,本节就来针对其进行介绍。

RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数,提升性能效率。通过参数buffer.memory可以设置缓存大小(默认32M)。

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3210241024);

由于RecordAccumulator的缓存空间有限,如果空间被占满,那么当我们再次调用KafkaProducer的send(...)方法的时候,就会出现阻塞(默认60秒,可以通过参数max.block.ms来配置),如果阻塞超时,则会抛出异常。

properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000);

在RecordAccumulator中,我们通过getOrCreateDeque(...)方法来创建存储消息的数据结构,即:存储ProducerBatch实例对象的双向队列Deque;源码如下所示:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; // 主题分区:双向队列
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>(); // 创建双向队列
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}

其对应关系是通过一个主题分区对应双向队列Deque<ProducerBatch>,维护在batches中的,如下图所示:

这时可能会有同学问?我记得调用KafkaProducer发送消息的时候,我们发送的是ProducerRecord实例对象,怎么在Deque双向队列中存储的是ProducerBatch实例对象,他们两个有啥区别呢?ProducerRecord是我们使用KafkaProducer发送消息时拼装的单条消息,而ProducerBatch可以看做是针对一批消息进行的封装,因为会在RecordAccumulator中执行tryAppend方法将一批消息拼装在一起,可以减少网络请求次数从而提升吞吐量。

Kafka通过ByteBuffer来实现字节形式的网络传输,为了减少频繁创建/释放ByteBuffer所造成的资源消耗,Kafka还提供了缓冲池(BufferPool)来实现ByteBuffer的回收,再其内部维护了Deque<ByteBuffer> free变量来保存空闲ByteBuffer,还提供了Deque<Condition> waiters变量来保存阻塞等待中的线程。

如果待分配的size等于缓冲池中ByteBuffer的大小(可由batch.size参数进行配置,默认为16Kb),则直接从free队列中拿出空余的ByteBuffer供其使用;否则,判断如果缓冲池中空闲ByteBuffer的内存总和加上非缓冲池内存大小是大于待分配size的,则采用非缓冲池加上缓冲池混合释放内存的方式进行内存分配。代码如下所示:

关于batch.size参数,除了可以影响BufferPool中缓存的ByteBuffer是否被立刻复用之外,还与创建ProducerBatch有关。当我们通过KafkaProducer发送一条由ProducerRecord封装的消息,并交由RecordAccumulate处理时,会执行如下步骤:

1】根据主题分区寻找对应的双向队列Deque,从中获取ProducerBatch;

2】如果这个ProducerBatch还有剩余空间,则直接写入;如果无法写入,则继续执行如下逻辑;

3】如果待保存的消息size小于等于batch.size,则创建batch.size大小的ProducerBatch,当使用完毕后,交由BufferPool管理复用;

4】如果待保存的消息size大于batch.size,那么就创建消息size大小的ProducerBatch,这段内存区域不会被复用。

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

相关文章
|
消息中间件 存储 Kubernetes
kafka/pulsar on k8s
kafka/pulsar on k8s
kafka/pulsar on k8s
|
消息中间件 缓存 人工智能
Kafka生产者客户端几种异常Case详解
1生产者UserCallBack异常 异常日志 ERROR Error executing user-provided callback on message for topic-partition &#39;Topic1-0&#39; (org.apache.kafka.clients.producer.internals.ProducerBatch) 通常还会有具体的异常栈信息 异常源码 ProducerBatch#completeFutureAndFireCallbacks
Kafka生产者客户端几种异常Case详解
|
消息中间件 存储 算法
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
931 0
|
3月前
|
消息中间件 Java 开发工具
消息队列 MQ使用问题之如何使用DefaultMQPushConsumer来消费消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 负载均衡 大数据
【夏之以寒-Kafka专栏 01】Kafka的消息是采用Pull模式还是Push模式?
Kafka采用Pull模式为主,消费者主动拉取消息,保证控制和灵活性;同时融合Push模式,如自动Partition再分配和有序消息传递,实现高可用和负载均衡。专栏提供全面资源和面试题,助力Kafka学习。
91 0
|
消息中间件 设计模式 Java
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
423 0
|
消息中间件 Java Kafka
聊聊 Kafka: Consumer 源码解析之 poll 模型
聊聊 Kafka: Consumer 源码解析之 poll 模型
824 0
RabbmitMQ学习笔记-producer的return Listern机制
retuen 主要处理message 不可达的问题,生产中也遇到过,例如exchanger 未建立、或者queue 和exchanger未绑定关系。这些消息应该让生产者知晓并做相应处理。
44 0
|
消息中间件 Kafka
Kafka单线程Consumer及参数详解
Kafka单线程Consumer及参数详解
487 57
|
消息中间件 运维 监控
【kafka原理】 消费者偏移量__consumer_offsets_相关解析
我们在kafka的log文件中发现了还有很多以 __consumer_offsets_的文件夹;总共50个; 由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。 __consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。__consumer_offsets 的每条消息格
【kafka原理】 消费者偏移量__consumer_offsets_相关解析