一、Kafka 文件存储机制
Kafka中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic 只是逻辑上的概念,而 partition 是物理上的概念。每个partition对应于一个log文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个 partition 分为多个 segment 。每个 segment 对应两个文件 ——“.index”文件和“.log”文件(一个存储当前文件的索引范围,一个存储真正的数据)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,like 这个 topic 有三个分区,则其对应的文件夹为 like-0,like-1,like-2:
00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index 00000000000000239430.log
这种存储规则很像我们 Spark Shuffle 阶段产生的文件的存储规则,顺便回忆一下 Spark 的 Shuffle 阶段:
Shuffle 过程中每个 Map 任务会产生两个文件,即数据文件和索引文件。其中,数据文件是存储当前 Map 任务的输出结果,而索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的 Reduce 任务就是根据索引文件来获取属于自己处理的那个分区的数据。
其实 Spark 常见的有两种 Shuffle 策略:HashShuffle 和 SortShuffle ,但是这属于 Spark 调优的内容了,现在不必深究。
二、Kafka 生产者
1、生产者消息发送流程
我们知道,Kafka 的三大组成部分:Producer、Broker、Consumer。接下来我们要学习的部分就是 Kafka 的生产者是如何把数据发送给 Kafka 集群(Broker)的。
1.1、发送原理
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
main 线程:
- 拦截器是可选的(一般用 flume 的拦截器,配置更简单)
- 序列化器(用的是Kafka自己的序列化器,而不是Java的Serializable,因为Java的序列化器太繁重)
- 分区器(决定数据往哪个分区存储)
- RecordAccumulator 默认 32MB(内存),分区器会把数据发送到这里,一个分区会创建一个队列,方便数据管理(都是在内存中完成的),队列中每份数据的大小(batch.size)默认是16K
sender线程:
- 只有数据积累到 batch.size 之后,sender 才会发送数据,默认16K
- 如果队列中的一份数据迟迟满不了(达不到 batch.size),sender 会等待 linger.ms 后发送这份数据,默认是 0ms
- NetworkClient:我们发送数据的请求会被放到一个队列(InFlightRequests 队列,通过源码可以看到其底层数据结构Map> 以 broker 为 key,队列中放的是一个个发送请求)中去,当第一个请求发出后未得到响应仍然会继续发送第二个请求,但是这个队列的大小默认是 5,也就是说,如果五个请求都发出去后且都没有得到响应,那么就不可以再发出请求,因为默认每个 broker 最多缓存5个请求。
- Sender线程通过Selector把上面的请求和数据一起发送到Kafka集群,当数据发送到Kafka集群后,Kafka集群会进行副本的复制并返回对应的acks信息。acks有三种应答级别,分别是ACK=0、ACK=1和ACK=all或ACK=-1(默认级别)。ACK=0:生产者在消息发送后不会等待来自服务器的任何确认。这意味着生产者无法知道消息是否成功存储在Kafka集群中。这个级别提供了最高的吞吐量,但在可靠性方面是最低的,因为可能会丢失消息。
ACK=1:生产者会等待直到消息的领导者副本(Leader Replica)确认接收到消息。一旦领导者副本存储了消息,生产者会收到一个确认。这个级别在性能和数据可靠性之间提供了一个平衡。但如果领导者副本在确认后发生故障,而消息还未复制到追随者副本(Follower Replicas),则消息可能会丢失。
ACK=all或ACK=-1(默认级别):生产者会等待消息被所有的同步副本(ISR,In-Sync Replicas)确认。这意味着只有当所有的同步副本都已经接收并存储了消息,生产者才会收到一个确认。这个级别提供了最高的数据可靠性,但可能会牺牲一些性能,因为需要等待所有副本的确认。 - 如果数据发送成功,就把 InFlighRequests 队列中的请求缓存给清除掉,并且把对应 RecordAccuulator 中的数据清除掉。
- 如果数据发送失败,会进行重试,重试的次数默认是(Intege.MAX_VALUE),也就是死磕到底,直到发送成功为止。
参数名称 |
描述 |
bootstrap.servers |
生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。 |
key.serializer和value.serializer |
指定发送消息的key和value的序列化类型。一定要写全类名。 |
buffer.memory |
RecordAccumulator缓冲区总大小,默认32m。 |
batch.size |
缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms |
如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。 |
|
max.in.flight.requests.per.connection |
允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。 |
当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
|
retry.backoff.ms |
两次重试之间的时间间隔,默认是100ms。 |
是否开启幂等性,默认true,开启幂等性。 |
|
compression.type |
生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
2、异步发送 API
send 方法有两种传参方式,一种有回调函数,一种不带回调函数。
2.1、普通异步发送
所谓的异步发送指的是外部的数据使用异步的方式吧数据发送到 RecordAccumulator 的内存队列当中去,异步的体现就是外部数据发送到队列中后,并不会等待 RecordAccumulator 把数据传给 Broker 节点并返回成功的消息才继续发送;而是直接把数据扔到 RecordAccumulator 的内存队列后就撒手不管了。
案例演示
导入依赖:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>
注意:使用 kafka 前必须启动 zookeeper,不然报错无法使用。
public class CustomProducer { public static void main(String[] args) { Properties properties = new Properties(); // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 指定对应的 key 和 value 的序列化类型 key.serialize // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer"); // 这两个是等价的 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 1. 创建 Kafka 生产者对象 // 需要指定键值的类型 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 2. 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like","test"+i)); } // 3. 关闭资源 kafkaProducer.close(); } }
我们可以看到,一条消息就是一个 ProducerRecord 对象,不管你的消息多么短,哪怕是一个标点,它也会被包装进一个对象里面去。
运行结果:
我们发现,我们的编程步骤和我们上面Kafka的发送原理中生产者的发送步骤是一致的,只不过我们这里没有设置拦截器和分区器,这是一个最简单的 Kafka 生产者程序了。
2.2、带回调函数的异步发送
现在我们来使用带回调函数的异步发送:
只需要修改send方法即可:
for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition()); } } }); }
运行结果:
这种带回调函数的方法可以让我们知道数据更多的元数据信息(比如主题、分区...)。
2.3、同步发送 API
所谓的同步发送,就是外部数据发送到 RecordAccumulator 的内存队列后,必须等待数据被 Selector 发送到 Broker 并返回响应信息才能继续发送。代码的实现很简单,只需要在send方法之后加一个 get() 即可:
for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like","test"+i)).get(); }
Kafka(二)【文件存储机制 & 生产者】(2)https://developer.aliyun.com/article/1532289