Kafka请求队列源码实现-RequestChannel请求通道(上)

简介: Kafka请求队列源码实现-RequestChannel请求通道

Kafka如何实现请求队列

  • 核心类
  • image.png

Kafka服务端,即Broker,负责消息的持久化,是个不断接收外部请求、处理请求,然后发送处理结果的Java进程。


Broker的高处理性能在于高效保存排队中的请求。

Broker底层请求对象的建模

请求队列的实现原理

Broker请求处理方面的核心监控指标。


Broker与Clients主要基于Request/Response机制交互,所以看看如何建模或定义Request和Response。

请求(Request)

定义了Kafka Broker支持的各类请求。

  • RequestChannel#Request
  • image.png
  • trait关键字类似于Java的interface。从代码中,我们可以知道,

ShutdownRequest只做标志位。当Broker进程关闭时,RequestHandler会发送ShutdownRequest到专属请求处理线程。该线程接收到此请求后,会主动触发Broker关闭流程。

Request才是真正的定义各类Clients端或Broker端请求的实现类。


属性

processor

Processor线程的序号,即该请求由哪个Processor线程接收处理。

  • Broker端参数num.network.threads控制Broker每个监听器上创建的Processor线程数
  • image.png
  • 假设listeners配置为PLAINTEXT://localhost:9092,SSL://localhost:9093,则默认情况下Broker启动时会创建6个Processor线程,每3个为一组,分别给listeners参数中设置的两个监听器使用,每组的序号分别是0、1、2。


为什么保存Processor线程序号?

当Request被后面的I/O线程处理完成后,还要依靠Processor线程发送Response给请求方,因此,Request必须记录它之前被哪个Processor线程接收。

Processor线程只是网络接收线程,并不会执行真正的I/O线程才负责的Request请求处理逻辑。

context

  • 用于标识请求上下文信息,RequestContext类维护Request的所有上下文信息。
  • image.png

RequestContext类

image.png

startTimeNanos

维护Request对象被创建的时间,用于计算各种时间统计指标。


请求对象中的很多JMX(Java Management Extensions)指标,特别是时间类统计指标,都需要startTimeNanos字段,纳秒单位的时间戳信息,可实现细粒度时间统计精度。

memoryPool

一个非阻塞式内存缓冲区,用于避免Request对象无限使用内存。

内存缓冲区的接口类MemoryPool,实现类SimpleMemoryPool。可重点关注下SimpleMemoryPool#tryAllocate,怎么为Request对象分配内存。

    @Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes < 1)
            throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
        if (sizeBytes > maxSingleAllocationSize)
            throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
        long available;
        boolean success = false;
        //in strict mode we will only allocate memory if we have at least the size required.
        //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
        //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
        long threshold = strict ? sizeBytes : 1;
        while ((available = availableMemory.get()) >= threshold) {
            success = availableMemory.compareAndSet(available, available - sizeBytes);
            if (success)
                break;
        }
        if (success) {
            maybeRecordEndOfDrySpell();
        } else {
            if (oomTimeSensor != null) {
                startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
            }
            log.trace("refused to allocate buffer of size {}", sizeBytes);
            return null;
        }
        ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
        bufferToBeReturned(allocated);
        return allocated;
    }

buffer

真正保存Request对象内容的字节缓冲区。Request发送方须按Kafka RPC协议规定格式向该缓冲区写入字节,否则抛InvalidRequestException

  • 该逻辑由RequestContext#parseRequest实现。
  • image.png
目录
相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
504 4
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
39 2
|
2月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
44 1
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
123 4
|
4月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
6月前
|
消息中间件 监控 Java
Java一分钟之-Kafka:分布式消息队列
【6月更文挑战第11天】Apache Kafka是一款高性能的消息队列,适用于大数据处理和实时流处理,以发布/订阅模型和分布式设计处理大规模数据流。本文介绍了Kafka基础,包括生产者、消费者、主题和代理,以及常见问题:分区选择、偏移量管理和监控不足。通过Java代码示例展示了如何创建生产者和消费者。理解并妥善处理这些问题,结合有效的监控和配置优化,是充分发挥Kafka潜力的关键。
112 0
|
7月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
137 1
|
7月前
|
消息中间件 存储 负载均衡
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解

热门文章

最新文章