Kafka(二)【文件存储机制 & 生产者】(1)https://developer.aliyun.com/article/1532285
3、生产者分区
3.1、分区的好处
分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。
所以,从大数据的存储和计算的角度来看,分区有这么两种好处:
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡。
- 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。
3.2、生产者发送消息的分区策略
(1)默认的分区器 DefaultPartitioner
我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:
接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:
1、指定分区:
for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition()); } } }); }
运行结果:
topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 0
面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?
答:生产者在发送数据时指定数据的分区为 表名 。
2、不指定分区,只指定 key:
kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition()); } } });
我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:
topic: like,partition: 0 topic: like,partition: 2 topic: like,partition: 2 topic: like,partition: 2 topic: like,partition: 1
3,不指定 partition 也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。
我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:
for (int i = 0; i < 25; i++) { kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition()); } } }); Thread.sleep(2); }
运行结果:
topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 2 topic: like,partition: 2 topic: like,partition: 2 topic: like,partition: 1 topic: like,partition: 1 topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 2 topic: like,partition: 0 topic: like,partition: 1 topic: like,partition: 1 topic: like,partition: 1 topic: like,partition: 1 topic: like,partition: 0 topic: like,partition: 2 topic: like,partition: 0 topic: like,partition: 1 topic: like,partition: 1 topic: like,partition: 0 topic: like,partition: 1 topic: like,partition: 0 topic: like,partition: 0 topic: like,partition: 1
我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。
3.3、自定义分区器
在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。
1)需求
实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。
2)实现步骤
1. 实现 Partitioner 接口
2. 重写 partition() 方法
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String val = value.toString(); if (val.contains("大傻春")) return 0; return 1; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
使用自定义分区器:
// 关联自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName()); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 发送数据 for (int i = 0; i < 5; i++) { ProducerRecord<String, String> record; if (i==3) record = new ProducerRecord<>("like", "" + i, "大傻春"); else record = new ProducerRecord<>("like", ""+i,"test" + i); kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> { if (e == null){ // 如果异常为空 说明正常执行 System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition()); } }); }
4、生产经验
4.1、生产者如何提高吞吐量
我们外部的数据是放到 RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置 linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。
为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:
- batch.size :内存队列中每个批次的大小,默认 16K
- linger.ms:等待时间,修改为 5-100ms
当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。
- compression.type:压缩 snappy
- RecordAccumulator:缓冲区大小,修改为 64 MB
public class CustomProducerParameters { public static void main(String[] args) { // 0. 配置信息 Properties properties = new Properties(); // 连接 kafka properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // key 和 value 的序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // batch.size 单位: KB properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); // linger.ms properties.put(ProducerConfig.LINGER_MS_CONFIG,1); // 压缩 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd // 缓冲区大小 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB // 1. 创建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 2. 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like","value"+i)); } // 3. 关闭资源 kafkaProducer.close(); } }
4.2、数据可靠性
数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。
我们知道,ack 的值有三种:
- ack = 0:发送完直接走,不用等响应,可靠性差,但是效率高。(一般不用)
- ack = 1:发送完需要等待 leader 节点响应才能继续发送下一个数据,可靠性中等,效率中等。
- ack = -1:发送完需要等待所有节点(leader 和 其他 fllower)响应才能继续发送下一个数据。可靠性高,效率低。
在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。
完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。
代码中配置 ACK 级别:
// acks properties.put(ProducerConfig.ACKS_CONFIG,"1"); // 重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,3);
4.3、数据去重
4.3.1、数据传递语义
就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:
- 至少一次(At least once)= ACK级别设置为1 + 分区副本大于等于2 + ISR里应答的最小副本数大于等于2
- 最多一次(At most once)= ACK级别设置为0
- 总结:
- Al least once 可以保证数据不丢失,但是不能保证数据不重复;
- At most once 可以保证数据不重复,但是不能保证数据不丢失。
- 精确一次(Exactly once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失。
Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务。
4.3.2、幂等性
- 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。
- 精确一次(Exactly once)= 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR 最小副本数 >= 2)
重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。
幂等性只能保证数据在单分区内不会重复。
配置幂等性
enable.idempotence 默认就是 true,flase 关闭。
4.3.3、生产者事务
我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的 PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。
说明:开启事务必须开启幂等性。
其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)
Kafka 的事务一共如下 5 个 API:
// 1初始化事务 void initTransactions(); // 2开启事务 void beginTransaction() throws ProducerFencedException; // 3在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4提交事务 void commitTransaction() throws ProducerFencedException; // 5放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
测试:
public class CustomProducerTransaction { public static void main(String[] args) { Properties properties = new Properties(); // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); // 指定对应的 key 和 value 的序列化类型 key.serialize // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer"); // 这两个是等价的 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 指定事务id properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01"); // 1. 创建 Kafka 生产者对象 // 需要指定键值的类型 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); kafkaProducer.initTransactions(); kafkaProducer.beginTransaction(); try { // 2. 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("like","test"+i)); } kafkaProducer.commitTransaction(); }catch (Exception e){ kafkaProducer.abortTransaction(); // 终止事务 }finally { // 3. 关闭资源 kafkaProducer.close(); } } }
注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。
4.4、数据有序
数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。
至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。
单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。
4.5、数据乱序
Kafka 1.x 版本之后保证数据单分区有序的条件:
- 未开启幂等性
- max.in.flight.requests.per.connection 设置为 1(相当于 NetworkClient 中只能存在一个请求,那数据肯定是有序的,都不要使用幂等性)
- 开启幂等性
- max.in.flight.requests.per.connection 需要设置为 <= 5(启用幂等性之后,Kafka 服务端会缓存最近的 5 个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。而且这个配置的值不能>5,因为Kafka服务端最多缓存5个请求)
解释:
为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。