简单的消息发送
在分析之前先看一个简单的消息发送是怎么样的。
以下代码基于 SpringBoot 构建。
首先创建一个 org.apache.kafka.clients.producer.Producer
的 bean。
主要关注 bootstrap.servers
,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094
。
其余几个参数暂时不做讨论,后文会有详细介绍。
接着注入这个 bean 即可调用它的发送函数发送消息。
这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。
但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步
的方式。
同步
那么我想知道消息到底发送成功没有该怎么办呢?
其实 Producer
的 API
已经帮我们考虑到了,发送之后只需要调用它的 get()
方法即可同步获取发送结果。
发送结果:
这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。
异步
为此我们应当采取异步的方式发送,其实 send()
方法默认则是异步的,只要不手动调用 get()
方法。
但这样就没法获知发送结果。
所以查看 send()
的 API 可以发现还有一个参数。
Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);
Callback
是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。
执行之后的结果:
同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程
,这样也能证明是异步回调的。
同时回调的时候会传递两个参数:
RecordMetadata
和上文一致的消息发送成功后的元数据。
Exception
消息发送过程中的异常信息。
但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。
所以正确的写法应当是:
至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。
源码分析
现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。
首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka
并不是简单的把消息通过网络发送到了 broker
中,在 Java 内部还是经过了许多优化和设计。
发送流程
为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。
从上至下依次是:
- 初始化以及真正发送消息的
kafka-producer-network-thread
IO 线程。
- 将消息序列化。
- 得到需要发送的分区。
- 写入内部的一个缓存区中。
- 初始化的 IO 线程不断的消费这个缓存来发送消息。
步骤解析
接下来详解每个步骤。
初始化
调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer
。比较麻烦的是初始化 Sender
线程进行缓冲区消费。
初始化 IO 线程处:
可以看到 Sender 线程有需要成员变量,比如:
acks,retries,requestTimeout
等,这些参数会在后文分析。