Kafka解惑之Old Producer(3)——Async Analysis

简介: 上接: 1. Kafka解惑之Old Producer(1)—— Beginning 2. Kafka解惑之Old Producer(2)——Sync Analysis讲述完了Sync模式下的结构脉络,下面就来聊一聊Async的,Async会将客户端所要发送的消息暂存在LinkedBlockingQueue中,然后通过特制的ProducerSendThread来根据条件发送。

上接:
1. Kafka解惑之Old Producer(1)—— Beginning
2. Kafka解惑之Old Producer(2)——Sync Analysis

讲述完了Sync模式下的结构脉络,下面就来聊一聊Async的,Async会将客户端所要发送的消息暂存在LinkedBlockingQueue中,然后通过特制的ProducerSendThread来根据条件发送。这个LinkedBlockingQueue的长度大小是通过queue.buffering.max.messages这个参数设置的,默认值为10000。启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。

在讲述Old Producer的开篇,我们展示过sync和async的代码,不妨这里在赘述一下:

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

这里可以看到sync的情况什么都不需要做,而async的情况就需要开启ProducerSendThread, 而在ProducerSendThread构造函数列表中的queue就是指客户端暂存消息的LinkedBlockingQueue。
注意ProducerSendThread构造函数列表中的config.queueBufferingMaxMs和config.batchNumMessages两个参数分表对应Producer端的配置queue.buffering.max.ms和batch.num.messages,默认值分别为5000和200,这两个参数后面会有用途。

注意在Scala语言中是没有break语句的,这点与Java不同,Scala的相当于每个case语句末尾都加上了一个break语句,所以读到相关源码的同学不要误以为是Java中语法的那种情况。

在async模式下,消息发送不是直接调用sync模式下的DefaultEventHandler的handle()方法,而是调用kafka.producer.Producer的asyncSend方法如下(只展示主要内容):

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
  for (message <- messages) {
    val added = config.queueEnqueueTimeoutMs match {
      case 0  =>
        queue.offer(message)
      case _  =>
        try {
          if (config.queueEnqueueTimeoutMs < 0) {
            queue.put(message)
            true
          } else {
            queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
          }
        }
        catch {
          case _: InterruptedException =>
            false
        }
    }}}

可以看到消息入队(存入LinkedBlockingQueue)中受到queue.enqueue.timeout.ms参数的影响,这个参数表示当LinkedBlockingQueue中消息达到上限queue.buffering.max.messages个数时所需要等待的时间。如果queue.enqueue.timeout.ms参数设置为0,表示不需要等待直接丢弃消息;如果设置为-1(默认值)则队列满时会阻塞等待。

消息存入LinkedBlockingQueue中就需要一个异步的线程ProducerSendThread来执行发送消息的操作,这个操作主要是通过kafka.producer.async.ProducerSendThread类中的processEvents方法来执行。processEvents方法的具体细节如下:

private def processEvents() {
  var lastSend = Time.SYSTEM.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command
  Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
                    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        events += currentQueueItem
      }

      // check if the batch size is reached
      full = events.size >= batchSize

      if(full || expired) {
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = Time.SYSTEM.milliseconds
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}

一长串Scala源码会不会看的一头雾水?这里来一步一步的分析一下:

  1. 首先持续的拉取queue(LinkedBlockingQueue)中的消息,注意这里用的是poll(long timeout, TimeUnit unit)方法,这个表示这里会等待一段时间之后再拉取队列中的消息,这个等待的时间由queueTime也就是queue.buffering.max.ms这个参数设置。比如我们设置成1000时(默认为5000),它会大致缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
  2. 如果拉取到了消息,那么就存储缓存events(ArrayBuffer[KeyedMessage[K,V]])中,等到events中的消息大于batchSize大小,也就是batch.num.messages个数时再调用tryToHandle(events)来处理消息。
  3. tryToHandle(events)就是调用DefaultEventHandler类中的handle()方法,接下去的工作就和Sync模式的相同。
  4. 如果在等待的时间内没有获取到相应的消息,那么无需等待 events.size >= batchSize条件的满足就可以发送消息。

在讲述Sync模式的时候笔者画过一份结构图,这里也来画一幅Async结构图来收尾,与Sync模式的类似,具体如下:
这里写图片描述


欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。

目录
相关文章
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
103 4
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
76 0
|
5月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
233 1
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
173 4
|
5月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
155 0
|
5月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
157 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
69 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9

热门文章

最新文章