从源码分析如何优雅的使用 Kafka 生产者(上)

简介: 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢?正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。

简单的消息发送


在分析之前先看一个简单的消息发送是怎么样的。


以下代码基于 SpringBoot 构建。


首先创建一个 org.apache.kafka.clients.producer.Producer 的 bean。



主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094


其余几个参数暂时不做讨论,后文会有详细介绍。


接着注入这个 bean 即可调用它的发送函数发送消息。



这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。


但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步的方式。


同步


那么我想知道消息到底发送成功没有该怎么办呢?


其实 ProducerAPI 已经帮我们考虑到了,发送之后只需要调用它的 get() 方法即可同步获取发送结果。



发送结果:



这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。


异步


为此我们应当采取异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用  get() 方法。


但这样就没法获知发送结果。


所以查看 send() 的 API 可以发现还有一个参数。


Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);


Callback 是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。



执行之后的结果:



同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程,这样也能证明是异步回调的。


同时回调的时候会传递两个参数:


  • RecordMetadata 和上文一致的消息发送成功后的元数据。


  • Exception 消息发送过程中的异常信息。


但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。


所以正确的写法应当是:



至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。


源码分析


现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。


首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka 并不是简单的把消息通过网络发送到了 broker 中,在 Java 内部还是经过了许多优化和设计。


发送流程


为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。



从上至下依次是:


  • 初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。


  • 将消息序列化。


  • 得到需要发送的分区。


  • 写入内部的一个缓存区中。


  • 初始化的 IO 线程不断的消费这个缓存来发送消息。


步骤解析


接下来详解每个步骤。


初始化



调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。


初始化 IO 线程处:



可以看到 Sender 线程有需要成员变量,比如:


acks,retries,requestTimeout


等,这些参数会在后文分析。



相关文章
|
9月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
9月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
95 0
|
9月前
|
消息中间件 缓存 Kafka
探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
99 0
|
9月前
|
消息中间件 存储 负载均衡
探究Kafka原理-5.Kafka设计原理和生产者原理解析(上)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
118 0
|
9月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
523 4
|
3月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
125 2
|
4月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
67 1
|
5月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
6月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
7月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
109 8

热门文章

最新文章