Kafka如何实现请求队列
- 核心类
Kafka服务端,即Broker,负责消息的持久化,是个不断接收外部请求、处理请求,然后发送处理结果的Java进程。
Broker的高处理性能在于高效保存排队中的请求。
Broker底层请求对象的建模
请求队列的实现原理
Broker请求处理方面的核心监控指标。
Broker与Clients主要基于Request/Response机制交互,所以看看如何建模或定义Request和Response。
请求(Request)
定义了Kafka Broker支持的各类请求。
- RequestChannel#Request
- trait关键字类似于Java的interface。从代码中,我们可以知道,
ShutdownRequest只做标志位。当Broker进程关闭时,RequestHandler会发送ShutdownRequest到专属请求处理线程。该线程接收到此请求后,会主动触发Broker关闭流程。
Request才是真正的定义各类Clients端或Broker端请求的实现类。
属性
processor
Processor线程的序号,即该请求由哪个Processor线程接收处理。
- Broker端参数
num.network.threads
控制Broker每个监听器上创建的Processor线程数 - 假设listeners配置为PLAINTEXT://localhost:9092,SSL://localhost:9093,则默认情况下Broker启动时会创建6个Processor线程,每3个为一组,分别给listeners参数中设置的两个监听器使用,每组的序号分别是0、1、2。
为什么保存Processor线程序号?
当Request被后面的I/O线程处理完成后,还要依靠Processor线程发送Response给请求方,因此,Request必须记录它之前被哪个Processor线程接收。
Processor线程只是网络接收线程,并不会执行真正的I/O线程才负责的Request请求处理逻辑。
context
- 用于标识请求上下文信息,RequestContext类维护Request的所有上下文信息。
RequestContext类
startTimeNanos
维护Request对象被创建的时间,用于计算各种时间统计指标。
请求对象中的很多JMX(Java Management Extensions)指标,特别是时间类统计指标,都需要startTimeNanos字段,纳秒单位的时间戳信息,可实现细粒度时间统计精度。
memoryPool
一个非阻塞式内存缓冲区,用于避免Request对象无限使用内存。
内存缓冲区的接口类MemoryPool,实现类SimpleMemoryPool。可重点关注下SimpleMemoryPool#tryAllocate,怎么为Request对象分配内存。
@Override public ByteBuffer tryAllocate(int sizeBytes) { if (sizeBytes < 1) throw new IllegalArgumentException("requested size " + sizeBytes + "<=0"); if (sizeBytes > maxSingleAllocationSize) throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize); long available; boolean success = false; //in strict mode we will only allocate memory if we have at least the size required. //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize) long threshold = strict ? sizeBytes : 1; while ((available = availableMemory.get()) >= threshold) { success = availableMemory.compareAndSet(available, available - sizeBytes); if (success) break; } if (success) { maybeRecordEndOfDrySpell(); } else { if (oomTimeSensor != null) { startOfNoMemPeriod.compareAndSet(0, System.nanoTime()); } log.trace("refused to allocate buffer of size {}", sizeBytes); return null; } ByteBuffer allocated = ByteBuffer.allocate(sizeBytes); bufferToBeReturned(allocated); return allocated; }
buffer
真正保存Request对象内容的字节缓冲区。Request发送方须按Kafka RPC协议规定格式向该缓冲区写入字节,否则抛InvalidRequestException。
- 该逻辑由RequestContext#parseRequest实现。