
简介:         Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。         首先,我们先看下它的成员变量: /* the state of each nodes connection *...



    /* the state of each nodes connection */
    // 每个节点连接的状态KafkaClient实例client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // 批量记录的记录累加器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // 客户端元数据Metadata实例metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    // 试图发送到server端的最大请求大小maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // 从server端获得的请求发送的已确认数量acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // 一个失败请求在被放弃之前的重试次数retries
    private final int retries;

    /* the clock instance used for getting the time */
    // 获取时间的时钟Time实例time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender线程运行的标志位,为true表示Sender线程一直在运行
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // 强制关闭的标志位forceClose
    private volatile boolean forceClose;

    /* metrics */
    // 度量指标
    private final SenderMetrics sensors;

    /* param clientId of the client */
    // 客户端的clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    // 等到server端响应请求的超时时间requestTimeout
    private final int requestTimeout;

     * The main run loop for the sender thread
     * sender线程的主循环
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // 主循环,一直运行直到close被调用
        while (running) {// 标志位running为true,则一直循环
            try {
            	// 调用待参数的run()方法
            } catch (Exception e) {
            	// 截获异常后记录err级别log信息,输出异常
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	// 调用调用待参数的run()方法继续处理
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
        // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
        // 关闭客户端
        try {
        } catch (Exception e) {
            log.error("Failed to close network client", e);

        log.debug("Shutdown of Kafka producer I/O thread has completed.");


              调用带参数的run(long now)方法,处理消息的发送;


              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;



消息中间件 Kafka
Kafka Producer整体架构概述及源码分析(下)
Kafka Producer整体架构概述及源码分析
104 0
消息中间件 缓存 Kafka
Kafka Producer整体架构概述及源码分析(上)
Kafka Producer整体架构概述及源码分析
159 0
Kafka Producer整体架构概述及源码分析(上)
消息中间件 存储 缓存
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
消息中间件 存储 设计模式
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
消息中间件 存储 Java
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
上次跟大家分享的文章「Kafka Producer 异步发送消息居然也会阻塞?」中提到了缓冲池,后面再经过一番阅读源码后,发现了这个缓冲池设计的很棒,被它的设计思想优雅到了,所以忍不住跟大家继续分享一波。
322 0
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
消息中间件 缓存 算法
从源码分析如何优雅的使用 Kafka 生产者(下)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
消息中间件 缓存 Java
从源码分析如何优雅的使用 Kafka 生产者(上)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
消息中间件 Kafka Shell
消息中间件 Kafka
从源码分析如何优雅的使用 Kafka 生产者
前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
752 0

