Kafka源码分析之InFlightRequests

简介:         InFlightRequests是对已经被发送或正在被发送但是均未接收到响应的客户端请求集合的一个封装,在它内部,有两个重要的变量,如下: // 每个连接最大执行中请求数 private final int maxInFlightRequestsPerC...

        InFlightRequests是对已经被发送或正在被发送但是均未接收到响应的客户端请求集合的一个封装,在它内部,有两个重要的变量,如下:

    // 每个连接最大执行中请求数
    private final int maxInFlightRequestsPerConnection;
    
    // 节点node至客户端请求双端队列Deque<ClientRequest>的映射集合
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

        其中,requests是节点node至客户端请求双端队列Deque<ClientRequest>的映射集合,其中key为字符串形式的节点node,value为一个双端队列Deque,其中的元素为客户端请求ClientRequest。当有新请求需要处理时,会在队首入列,而实际被处理的请求,则是从队尾出列,保证入列早的请求先得到处理。maxInFlightRequestsPerConnection不消多说,它是限制每个连接,即每个node对应客户端请求队列大小的一个阈值。

        既然InFlightRequests本质上是对客户端请求按照node区分的一个双端队列映射集合,那么我们来看下它的队列入队及出队操作。

        首先,入队是通过add()方法来完成的,代码如下:

    /**
     * Add the given request to the queue for the connection it was directed to
     * 将给定请求添加至其对于连接的队列
     */
    public void add(ClientRequest request) {
    	
    	// 从requests集合中根据给定请求的目的地node获取Deque<ClientRequest>双端队列reqs
        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
        
        // 如果双端队列reqs为nul
        if (reqs == null) {
        	// 构造一个双端队列ArrayDeque类型的reqs
            reqs = new ArrayDeque<>();
            
            // 将请求目的地node至reqs的对应关系添加到requests集合
            this.requests.put(request.request().destination(), reqs);
        }
        
        // reqs队列首部添加请求request,使用的是addFirst()方法
        reqs.addFirst(request);
    }
        逻辑很简单,大体如下:

        1、从requests集合中根据给定请求的目的地node获取Deque<ClientRequest>双端队列reqs;

        2、如果双端队列reqs为nul:

              2.1、构造一个双端队列ArrayDeque类型的reqs;

              2.2、将请求目的地node至reqs的对应关系添加到requests集合;

        3、reqs队列首部添加请求request,使用的是addFirst()方法。

        这里,最关键的一点是它将请求添加至双端队列的队首,使用的是addFirst()方法。

        出队是通过completeNext()来实现的,代码如下:

    /**
     * Get the oldest request (the one that that will be completed next) for the given node
     * 获取给定节点node的时间最久执行中请求,作为接下来要完成的请求
     */
    public ClientRequest completeNext(String node) {
        
    	// 根据给定节点node获取客户端请求双端队列reqs,并从poll出队尾元素
    	// add时是通过addFirst()方法添加到队首的,所以队尾的元素是时间最久的,也是应该先处理的
    	return requestQueue(node).pollLast();
    }
        completeNext()方法根据给定节点node获取客户端请求双端队列reqs,并从poll出队尾元素。所以,这里我们简单总结下,add时是通过addFirst()方法添加到队首的,所以队尾的元素是时间最久的,也是应该先处理的,故出队应该用pollLast(),将存储时间最久的元素移出进行处理。
 


相关文章
|
消息中间件 缓存 Kafka
Kafka Producer整体架构概述及源码分析(上)
Kafka Producer整体架构概述及源码分析
215 0
Kafka Producer整体架构概述及源码分析(上)
|
消息中间件 Kafka
Kafka Producer整体架构概述及源码分析(下)
Kafka Producer整体架构概述及源码分析
132 0
|
消息中间件 存储 缓存
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
|
消息中间件 存储 设计模式
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
|
消息中间件 存储 Java
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
上次跟大家分享的文章「Kafka Producer 异步发送消息居然也会阻塞?」中提到了缓冲池,后面再经过一番阅读源码后,发现了这个缓冲池设计的很棒,被它的设计思想优雅到了,所以忍不住跟大家继续分享一波。
403 0
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
|
消息中间件 存储 JSON
Kafka 分区重分配源码分析
上一篇跟大家描述了 Kafka 集群扩容的方案与过程,这次就跟大家详细描述 Kafka 分区重分配的实现细节。
189 0
Kafka 分区重分配源码分析
|
消息中间件 缓存 算法
从源码分析如何优雅的使用 Kafka 生产者(下)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 缓存 Java
从源码分析如何优雅的使用 Kafka 生产者(上)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 Kafka Shell
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
|
消息中间件 Kafka
从源码分析如何优雅的使用 Kafka 生产者
前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
775 0
下一篇
无影云桌面