Kafka解惑之Old Producer(2)——Sync Analysis

简介: 上接Kafka解惑之Old Producer(1)—— Beginning欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。 一下子扩展的有点多,我们还是先回到DefaultEventHandler上来,当调用producer.send方法发送消息的时候,紧接着就是调用DefaultEventHandler的handle方法。

上接Kafka解惑之Old Producer(1)—— Beginning

欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。

一下子扩展的有点多,我们还是先回到DefaultEventHandler上来,当调用producer.send方法发送消息的时候,紧接着就是调用DefaultEventHandler的handle方法。下面是handle方法的主要内容,虽然行数有点多,但是这是Producer中最最核心的一块,需要反复研磨,方能一探究竟:

def handle(events: Seq[KeyedMessage[K,V]]) {
  val serializedData = serialize(events)
  var outstandingProduceRequests = serializedData
  var remainingRetries = config.messageSendMaxRetries + 1
  val correlationIdStart = correlationId.get()
  while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
    if (topicMetadataRefreshInterval >= 0 &&
        Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
      CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      topicMetadataToRefresh.clear
      lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
    }
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    if (outstandingProduceRequests.nonEmpty) {
      Thread.sleep(config.retryBackoffMs)
      CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      remainingRetries -= 1
      producerStats.resendRate.mark()
    }
  }
}

注意handle方法的参数是个Seq[KeyedMessage]类型的,而不是KeyedMessage。虽然Demo中用的只是单个KeyedMessage,最后调用底层的handle方法都是转换为Seq类型,你可以把Seq看成是java中的List,在Scala中表示序列,指的是一类具有一定长度的可迭代访问的对象,其中每个元素均带有一个从0开始计数的固定索引位置。

这个handle方法中首先是调用serialize(events)方法对消息进行序列化操作,这个容易理解,就是通过serializer.class参数指定的序列化类进行序列化。

其次获取所发送消息对应的元数据信息,然后将一坨消息(也有可能是一条)转换为HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id,value是TopicAndPartition与消息集的Map,对应的方法为dispatchSerializedData()。因为客户端发消息是发到对应的broker上,所以要对每个消息找出对应的leader副本所在的broker的位置,然后将要发送的消息集分类,每个broker对应其各自所要接收的消息。而TopicAndPartition是针对broker上的存储层的,每个TopicAndPartition对应特定的当前的存储文件(Segment文件),将消息写入到存储文件中。

获取元数据信息并不是每次发送消息都要向metadata.broker.list所指定地址中的服务索要拉取,而是向缓存中的元数据进行拉取,拉取失败后才向metadata.broker.list所指定地址中的服务发送元数据更新的请求进行拉取。很多朋友会把metadata.broker.list看成是broker的地址,这个不完全正确,官网解释:

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

因为这个地址只提供给客户端拉取元数据信息之用,而剩下的动作比如发送消息是通过与元数据信息中的broker地址建立连接之后再进行操作,这也就意味着metadata.broker.list可以和broker的真正地址没有任何交集。你完全可以为metadata.broker.list配置一个“伪装”接口地址,这个接口配合kafka的传输格式并提供相应的元数据信息,这样方便集中式的配置管理(可以集成到配置中心中)。为了简化说明,我们姑且可以狭义的认为metadata.broker.list指的就是kafka broker的地址。

缓存中的元数据每隔topic.metadata.refresh.interval.ms才去broker拉取元数据信息,可以参考上面大段源码中的if语句:

 if (topicMetadataRefreshInterval >= 0 &&
        Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval)

topic.metadata.refresh.interval.ms参数的默认值是600*1000ms,也就是10分钟。如果设置为0,则每次发送消息时都要先向broker拉取元数据信息;如果设置为负数,那么只有在元数据获取失败的情况下才会请求元数据信息。由于这个老版的Scala的Producer请求元数据和发送消息是在同一个线程中完成的,所以此处会有延迟的隐患,具体的笔者会在后面的案例分析环节为大家详细介绍。

接下去所要做的工作就是查看是否需要压缩,如果客户端设置了压缩,则根据compression.type参数配置的压缩方式对消息进行压缩处理。0.8.2.x版本支持gzip和snappy的压缩方式,1.0.0版本还支持lz4的压缩方式。compression.type参数的默认值值none,即不需要压缩。

最后根据brokerId分组发送消息。这个分组发送的过程就与ProducerPool有关了,我们前面提到在实例化Producer的时候引入了DefaultEventHandler和ProducerPool。这个ProducerPool保存的是生产者和broker的连接,每个连接对应一个SyncProducer对象。SyncProducer包装了NIO网络层的操作,每个SyncProducer都是一个与对应broker的socket连接,是真正发送消息至broker中的执行者。

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerPool(val config: ProducerConfig) extends Logging {
  private val syncProducers = new HashMap[Int, SyncProducer]

当调用最上层的send方法发送消息的时候,下面的执行顺序为DefaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()。在底层的DefaultEventHandler.send方法定义为:

private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])

这个方法就需要根据brokerId从ProducerPool中的HashMap中找到对应SyncProducer,然后在将“messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]”这个消息发送到SyncProducer对应的broker上。如果在获取缓存中的元数据失败的时候就需要重新向broker拉取元数据,或者定时(topic.metadata.refresh.interval.ms)向broker端请求元数据的数据,都会有可能更新ProducerPool的信息,对应的方法为ProducerPool.updateProducer():

def updateProducer(topicMetadata: Seq[TopicMetadata]) {
  val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
  topicMetadata.foreach(tmd => {
    tmd.partitionsMetadata.foreach(pmd => {
      if(pmd.leader.isDefined) {
        newBrokers += pmd.leader.get
      }
    })
  })
  lock synchronized {
    newBrokers.foreach(b => {
      if(syncProducers.contains(b.id)){
        syncProducers(b.id).close()
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      } else
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
    })
  }
}

会Java的读者看这段代码的时候应该能看出来个90%以上,解释下这段代码:首先是找到更新的元数据中所有的brorker(更具体的来说是broker的id、主机地址host和端口号port三元组信息);之后在查到原有的ProducerPool中是否有相应的SyncProducer,如果有则关闭之后再重新建立;如果没有则新建。SyncProducer底层是阻塞式的NIO,所以关闭再建立会有一定程度上的开销,相关细节如下:

channel = SocketChannel.open()
if(readBufferSize > 0)
  channel.socket.setReceiveBufferSize(readBufferSize)
if(writeBufferSize > 0)
  channel.socket.setSendBufferSize(writeBufferSize)
channel.configureBlocking(true)
channel.socket.setSoTimeout(readTimeoutMs)
channel.socket.setKeepAlive(true)
channel.socket.setTcpNoDelay(true)
channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

玩过NIO的读者对这段代码相比很是熟络,虽然是scala版的。如果没有接触过NIO,那么可以先看看这一篇:攻破JAVA NIO技术壁垒

说道这里我们用一副结构图来说明下Old Producer的大致脉络(注:图中的所有操作都是在一个线程中执行的):
这里写图片描述


欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。

目录
相关文章
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
103 4
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
76 0
|
5月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
233 1
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
173 4
|
5月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
155 0
|
5月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
157 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
69 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9

热门文章

最新文章