Kafka源码分析之Sender

简介:         Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。         首先,我们先看下它的成员变量: /* the state of each nodes connection *...

        Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。

        首先,我们先看下它的成员变量:

    /* the state of each nodes connection */
    // 每个节点连接的状态KafkaClient实例client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // 批量记录的记录累加器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // 客户端元数据Metadata实例metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    // 试图发送到server端的最大请求大小maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // 从server端获得的请求发送的已确认数量acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // 一个失败请求在被放弃之前的重试次数retries
    private final int retries;

    /* the clock instance used for getting the time */
    // 获取时间的时钟Time实例time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender线程运行的标志位,为true表示Sender线程一直在运行
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // 强制关闭的标志位forceClose
    private volatile boolean forceClose;

    /* metrics */
    // 度量指标
    private final SenderMetrics sensors;

    /* param clientId of the client */
    // 客户端的clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    // 等到server端响应请求的超时时间requestTimeout
    private final int requestTimeout;
        既然是一个线程,我们看下它的主要运行逻辑run()方法,代码如下:

    /**
     * The main run loop for the sender thread
     * sender线程的主循环
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // 主循环,一直运行直到close被调用
        while (running) {// 标志位running为true,则一直循环
            try {
            	// 调用待参数的run()方法
                run(time.milliseconds());
            } catch (Exception e) {
            	
            	// 截获异常后记录err级别log信息,输出异常
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	// 调用调用待参数的run()方法继续处理
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        
        // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        
        // 关闭客户端
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }
        Sender线程的主循环在run()方法内,其主要处理逻辑为:

        1、首先进入一个while主循环,当标志位running为true时一直循环,直到close被调用:

              调用带参数的run(long now)方法,处理消息的发送;

        2、当close被调用时,running被设置为false,while主循环退出:

              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;

              2.2、如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求;

              2.3、关闭客户端。



相关文章
|
消息中间件 Kafka
Kafka Producer整体架构概述及源码分析(下)
Kafka Producer整体架构概述及源码分析
104 0
|
消息中间件 缓存 Kafka
Kafka Producer整体架构概述及源码分析(上)
Kafka Producer整体架构概述及源码分析
159 0
Kafka Producer整体架构概述及源码分析(上)
|
消息中间件 存储 缓存
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
|
消息中间件 存储 设计模式
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
|
消息中间件 存储 Java
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
上次跟大家分享的文章「Kafka Producer 异步发送消息居然也会阻塞?」中提到了缓冲池,后面再经过一番阅读源码后,发现了这个缓冲池设计的很棒,被它的设计思想优雅到了,所以忍不住跟大家继续分享一波。
322 0
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
|
消息中间件 缓存 算法
从源码分析如何优雅的使用 Kafka 生产者(下)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 缓存 Java
从源码分析如何优雅的使用 Kafka 生产者(上)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 Kafka Shell
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
|
消息中间件 Kafka
从源码分析如何优雅的使用 Kafka 生产者
前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
752 0

热门文章

最新文章