Kafka解惑之Old Producer(3)——Async Analysis

简介: 上接: 1. Kafka解惑之Old Producer(1)—— Beginning 2. Kafka解惑之Old Producer(2)——Sync Analysis讲述完了Sync模式下的结构脉络,下面就来聊一聊Async的,Async会将客户端所要发送的消息暂存在LinkedBlockingQueue中,然后通过特制的ProducerSendThread来根据条件发送。

上接:
1. Kafka解惑之Old Producer(1)—— Beginning
2. Kafka解惑之Old Producer(2)——Sync Analysis

讲述完了Sync模式下的结构脉络,下面就来聊一聊Async的,Async会将客户端所要发送的消息暂存在LinkedBlockingQueue中,然后通过特制的ProducerSendThread来根据条件发送。这个LinkedBlockingQueue的长度大小是通过queue.buffering.max.messages这个参数设置的,默认值为10000。启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。

在讲述Old Producer的开篇,我们展示过sync和async的代码,不妨这里在赘述一下:

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

这里可以看到sync的情况什么都不需要做,而async的情况就需要开启ProducerSendThread, 而在ProducerSendThread构造函数列表中的queue就是指客户端暂存消息的LinkedBlockingQueue。
注意ProducerSendThread构造函数列表中的config.queueBufferingMaxMs和config.batchNumMessages两个参数分表对应Producer端的配置queue.buffering.max.ms和batch.num.messages,默认值分别为5000和200,这两个参数后面会有用途。

注意在Scala语言中是没有break语句的,这点与Java不同,Scala的相当于每个case语句末尾都加上了一个break语句,所以读到相关源码的同学不要误以为是Java中语法的那种情况。

在async模式下,消息发送不是直接调用sync模式下的DefaultEventHandler的handle()方法,而是调用kafka.producer.Producer的asyncSend方法如下(只展示主要内容):

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
  for (message <- messages) {
    val added = config.queueEnqueueTimeoutMs match {
      case 0  =>
        queue.offer(message)
      case _  =>
        try {
          if (config.queueEnqueueTimeoutMs < 0) {
            queue.put(message)
            true
          } else {
            queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
          }
        }
        catch {
          case _: InterruptedException =>
            false
        }
    }}}

可以看到消息入队(存入LinkedBlockingQueue)中受到queue.enqueue.timeout.ms参数的影响,这个参数表示当LinkedBlockingQueue中消息达到上限queue.buffering.max.messages个数时所需要等待的时间。如果queue.enqueue.timeout.ms参数设置为0,表示不需要等待直接丢弃消息;如果设置为-1(默认值)则队列满时会阻塞等待。

消息存入LinkedBlockingQueue中就需要一个异步的线程ProducerSendThread来执行发送消息的操作,这个操作主要是通过kafka.producer.async.ProducerSendThread类中的processEvents方法来执行。processEvents方法的具体细节如下:

private def processEvents() {
  var lastSend = Time.SYSTEM.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command
  Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
                    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        events += currentQueueItem
      }

      // check if the batch size is reached
      full = events.size >= batchSize

      if(full || expired) {
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = Time.SYSTEM.milliseconds
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}

一长串Scala源码会不会看的一头雾水?这里来一步一步的分析一下:

  1. 首先持续的拉取queue(LinkedBlockingQueue)中的消息,注意这里用的是poll(long timeout, TimeUnit unit)方法,这个表示这里会等待一段时间之后再拉取队列中的消息,这个等待的时间由queueTime也就是queue.buffering.max.ms这个参数设置。比如我们设置成1000时(默认为5000),它会大致缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
  2. 如果拉取到了消息,那么就存储缓存events(ArrayBuffer[KeyedMessage[K,V]])中,等到events中的消息大于batchSize大小,也就是batch.num.messages个数时再调用tryToHandle(events)来处理消息。
  3. tryToHandle(events)就是调用DefaultEventHandler类中的handle()方法,接下去的工作就和Sync模式的相同。
  4. 如果在等待的时间内没有获取到相应的消息,那么无需等待 events.size >= batchSize条件的满足就可以发送消息。

在讲述Sync模式的时候笔者画过一份结构图,这里也来画一幅Async结构图来收尾,与Sync模式的类似,具体如下:
这里写图片描述


欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。

目录
相关文章
|
2月前
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
167 0
|
25天前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
38 2
|
2月前
|
消息中间件 缓存 Kafka
原理剖析| 一文搞懂 Kafka Producer(上)
本文介绍了Apache Kafka 3.7的Producer使用及原理,讲解了如何创建和使用Producer,展示了一个发送消息的示例代码,并介绍了ProducerRecord和Callback接口。ProducerRecord包含topic、partition等属性,Callback用于发送消息后的回调处理。接着阐述了send、flush和close方法的功能。文章还探讨了核心组件,包括ProducerMetadata、RecordAccumulator、Sender和TransactionManager,以及消息发送流程。最后,讨论了元数据刷新、分区选择、消息攒批和超时处理等实现细节。
49 0
原理剖析| 一文搞懂 Kafka Producer(上)
|
2月前
|
消息中间件 Kafka
Kafka - 3.x Producer 生产者最佳实践
Kafka - 3.x Producer 生产者最佳实践
78 0
|
2月前
|
消息中间件 缓存 Kafka
kafka源码解析——第一篇:producer
kafka源码解析——第一篇:producer
52 0
|
2月前
|
消息中间件 Kafka Apache
Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)
Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。
|
10月前
|
消息中间件 缓存 Java
聊聊 Kafka: Producer 的网络模型
聊聊 Kafka: Producer 的网络模型
103 0
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0