发送原理
Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
- ①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
- ②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
Kafka的Producer发送消息采用了异步发送的方式,这个过程确实涉及到多个线程以及共享变量。下面详细展开说明这个过程:
1. 主线程 (main thread):
主线程是生产者应用的线程,它负责创建消息并将这些消息发送给Kafka Producer API。主要的操作包括:
- 创建消息:主线程创建消息,将它们封装成ProducerRecord对象。ProducerRecord通常包括消息的主题(topic)、分区(partition)、键(key)和值(value)等信息。
- 发送消息到RecordAccumulator:主线程将创建的消息发送到一个双端队列(deque)叫做RecordAccumulator。这个队列用于缓冲消息,允许Producer线程将消息异步发送到Kafka集群,而不需要等待每条消息都被立刻发送。
2. Sender 线程:
Sender线程是Kafka Producer内部的一个后台线程,它负责从RecordAccumulator中拉取消息并发送到Kafka broker。Sender线程的主要工作如下:
- 从RecordAccumulator拉取消息:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。这个轮询是异步的,因此主线程不需要等待消息被发送。
- 构建请求:当Sender线程发现有消息需要发送,它会构建一个或多个ProducerRequest,每个请求包含多个消息,以便进行有效的批量发送。
- 发送消息到Kafka broker:Sender线程将构建的请求发送到Kafka broker,等待来自broker的响应。一旦消息被成功接收并记录在Kafka broker中,Sender线程会通知RecordAccumulator,以便它可以更新消息的状态。
3. RecordAccumulator:
RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。主要功能包括:
- 暂存消息:主线程将消息发送到RecordAccumulator中,使其在等待Sender线程处理。
- 管理消息的状态:RecordAccumulator跟踪每条消息的发送状态,以确保消息被成功发送到Kafka broker。一旦消息被成功写入到Kafka broker的日志中,RecordAccumulator会将消息的状态标记为已发送。
- 负责消息批量化:RecordAccumulator也有助于消息的批量发送,以减少网络开销和提高性能。
发送原理小结
总结一下,Kafka的Producer采用异步发送消息的方式,
- 主线程负责创建和发送消息到RecordAccumulator,
- 而Sender线程负责从RecordAccumulator中拉取消息并将其发送到Kafka broker。
- RecordAccumulator充当缓冲区,用于管理消息的状态以及批量发送,以提高性能和降低延迟。
这个架构充分利用了多线程和异步操作,使得Producer能够高效地发送消息到Kafka集群。
重要参数
参数名称 | 描述 |
bootstrap.servers | 生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。 |
key.serializer, value.serializer | 指定发送消息的key和value的序列化类型。要写全类名。(反射获取) |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader数据落盘后应答. -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1 |
max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。 |
Retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
enable.idempotence | 是否开启幂等性,默认true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是none,不压缩。支持压缩类型:none、gzip、snappy、lz4和zstd。 |