从源码分析如何优雅的使用 Kafka 生产者(下)

简介: 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。

序列化消息


在调用 send() 函数后其实第一步就是序列化,毕竟我们的消息需要通过网络才能发送到 Kafka。



其中的 valueSerializer.serialize(record.topic(), record.value()); 是一个接口,我们需要在初始化时候指定序列化实现类。



我们也可以自己实现序列化,只需要实现 org.apache.kafka.common.serialization.Serializer 接口即可。


路由分区


接下来就是路由分区,通常我们使用的 Topic 为了实现扩展性以及高性能都会创建多个分区。


如果是一个分区好说,所有消息都往里面写入即可。


但多个分区就不可避免需要知道写入哪个分区。


通常有三种方式。


指定分区


可以在构建 ProducerRecord 为每条消息指定分区。



这样在路由时会判断是否有指定,有就直接使用该分区。



这种一般在特殊场景下会使用。


自定义路由策略



如果没有指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。

而我们也只需要自定义一个类实现

org.apache.kafka.clients.producer.Partitioner 接口,同时在创建 KafkaProducer 实例时配置 partitioner.class 参数。



通常需要自定义分区一般是在想尽量的保证消息的顺序性。


或者是写入某些特有的分区,由特别的消费者来进行处理等。


默认策略


最后一种则是默认的路由策略,如果我们啥都没做就会执行该策略。


该策略也会使得消息分配的比较均匀。


来看看它的实现:



简单的来说分为以下几步:


  • 获取 Topic 分区数。


  • 将内部维护的一个线程安全计数器 +1。


  • 与分区数取模得到分区编号。


其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。


写入内部缓存


send() 方法拿到分区后会调用一个 append() 函数:



该函数中会调用一个 getOrCreateDeque() 写入到一个内部缓存中 batches



消费缓存


在最开始初始化的 IO 线程其实是一个守护线程,它会一直消费这些数据。



通过图中的几个函数会获取到之前写入的数据。这块内容可以不必深究,但其中有个 completeBatch 方法却非常关键。



调用该方法时候肯定已经是消息发送完毕了,所以会调用 batch.done() 来完成之前我们在 send() 方法中定义的回调接口。



从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。


Producer 参数解析


发送流程讲完了再来看看 Producer 中比较重要的几个参数。


acks


acks 是一个影响消息吞吐量的一个关键参数。



主要有 [all、-1, 0, 1] 这几个选项,默认为 1。


由于 Kafka 不是采取的主备模式,而是采用类似于 Zookeeper 的主备模式。


前提是 Topic 配置副本数量 replica > 1


acks = all/-1 时:


意味着会确保所有的 follower 副本都完成数据的写入才会返回。


这样可以保证消息不会丢失!


但同时性能和吞吐量却是最低的。


acks = 0 时:


producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能却是最好的!


acks = 1 时:


这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。


一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。


batch.size


这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大可以提高吞吐量。


但也不能极端,调太大会浪费内存。小了也发挥不了作用,也是一个典型的时间和空间的权衡。



上图是几个使用的体现。


retries


retries 该参数主要是来做重试使用,当发生一些网络抖动都会造成重试。


这个参数也就是限制重试次数。


但也有一些其他问题。


  • 因为是重发所以消息顺序可能不会一致,这也是上文提到就算是一个分区消息也不会是完全顺序的情况。


  • 还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现消息重复。这种只能是消费者进行幂等处理。


高效的发送方式


如果消息量真的非常大,同时又需要尽快的将消息发送到 Kafka。一个 producer 始终会收到缓存大小等影响。


那是否可以创建多个 producer 来进行发送呢?


  • 配置一个最大 producer 个数。


  • 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List 中,保存时做好同步处理防止并发问题。


  • 获取发送者时可以按照默认的分区策略使用轮询的方式获取(保证使用均匀)。


这样在大量、频繁的消息发送场景中可以提高发送效率减轻单个 producer 的压力。


关闭 Producer


最后则是 Producer 的关闭,Producer 在使用过程中消耗了不少资源(线程、内存、网络等)因此需要显式的关闭从而回收这些资源。



默认的 close() 方法和带有超时时间的方法都是在一定的时间后强制关闭。


但在过期之前都会处理完剩余的任务。


所以使用哪一个得视情况而定。


相关文章
|
6月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
6月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
76 0
|
6月前
|
消息中间件 缓存 Kafka
探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
82 0
|
6月前
|
消息中间件 存储 负载均衡
探究Kafka原理-5.Kafka设计原理和生产者原理解析(上)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
104 0
|
6月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
485 4
|
23天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
54 2
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
35 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
3月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
4月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
87 8
下一篇
无影云桌面