Kafka(二)【文件存储机制 & 生产者】(2)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: Kafka(二)【文件存储机制 & 生产者】

Kafka(二)【文件存储机制 & 生产者】(1)https://developer.aliyun.com/article/1532285

3、生产者分区

3.1、分区的好处

分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。

所以,从大数据的存储和计算的角度来看,分区有这么两种好处:

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡。
  2. 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:

接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:

1、指定分区:

for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0

面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?

答:生产者在发送数据时指定数据的分区为 表名 。

2、不指定分区,只指定 key:

kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });

我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:

topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1

3,不指定 partition 也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。

我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:

for (int i = 0; i < 25; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            Thread.sleep(2);
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 1

我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。

3.3、自定义分区器

在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。

1)需求

       实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。

2)实现步骤

1. 实现 Partitioner 接口

2. 重写 partition() 方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String val = value.toString();
        if (val.contains("大傻春"))
            return 0;
        return 1;
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}

使用自定义分区器:

// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
 
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
// 发送数据
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record;
    if (i==3)
        record = new ProducerRecord<>("like", "" + i, "大傻春");
    else
        record = new ProducerRecord<>("like", ""+i,"test" + i);
 
kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> {
    if (e == null){ // 如果异常为空 说明正常执行
           System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());
       }
    });
}

4、生产经验

4.1、生产者如何提高吞吐量

       我们外部的数据是放到 RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置 linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。

       为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms

       当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。

  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小,修改为 64 MB
public class CustomProducerParameters {
    public static void main(String[] args) {
 
        // 0. 配置信息
        Properties properties = new Properties();
        // 连接 kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        // key 和 value 的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // batch.size 单位: KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd
        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB
 
        // 1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
 
        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","value"+i));
        }
 
        // 3. 关闭资源
        kafkaProducer.close();
    }
}

4.2、数据可靠性

       数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。

我们知道,ack 的值有三种:

  1. ack = 0:发送完直接走,不用等响应,可靠性差,但是效率高。(一般不用)
  2. ack = 1:发送完需要等待 leader 节点响应才能继续发送下一个数据,可靠性中等,效率中等。
  3. ack = -1:发送完需要等待所有节点(leader 和 其他 fllower)响应才能继续发送下一个数据。可靠性高,效率低。

       在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。

完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。

代码中配置 ACK 级别:

// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);

4.3、数据去重

4.3.1、数据传递语义

就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:

  • 至少一次(At least once)= ACK级别设置为1 + 分区副本大于等于2 + ISR里应答的最小副本数大于等于2
  • 最多一次(At most once)= ACK级别设置为0
  • 总结:
  • Al least once 可以保证数据不丢失,但是不能保证数据不重复
  • At most once 可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次(Exactly once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失

Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务

4.3.2、幂等性
  • 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。
  • 精确一次(Exactly once)= 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR 最小副本数 >= 2)

       重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。

       幂等性只能保证数据在单分区内不会重复

配置幂等性

enable.idempotence 默认就是 true,flase 关闭。

4.3.3、生产者事务

我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的 PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。

说明:开启事务必须开启幂等性

其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)

Kafka 的事务一共如下 5 个 API:

// 1初始化事务
void initTransactions();
 
// 2开启事务
void beginTransaction() throws ProducerFencedException;
 
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
 
// 4提交事务
void commitTransaction() throws ProducerFencedException;
 
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

测试:

public class CustomProducerTransaction {
 
    public static void main(String[] args) {
 
        Properties properties = new Properties();
 
        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
 
        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");
 
        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        kafkaProducer.initTransactions();
 
        kafkaProducer.beginTransaction();
 
        try {
            // 2. 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("like","test"+i));
            }
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            kafkaProducer.abortTransaction();   // 终止事务
        }finally {
            // 3. 关闭资源
            kafkaProducer.close();
        }
    }
}

注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。

4.4、数据有序

       数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。

       至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。

       单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。

4.5、数据乱序

Kafka 1.x 版本之后保证数据单分区有序的条件:

  • 未开启幂等性
  • max.in.flight.requests.per.connection 设置为 1(相当于 NetworkClient 中只能存在一个请求,那数据肯定是有序的,都不要使用幂等性)
  • 开启幂等性
  • max.in.flight.requests.per.connection 需要设置为 <= 5(启用幂等性之后,Kafka 服务端会缓存最近的  5 个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。而且这个配置的值不能>5,因为Kafka服务端最多缓存5个请求)

解释

       为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。        


相关文章
|
1月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
1月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
115 0
|
15天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
21天前
|
消息中间件 存储 Kafka
Kafka(二)【文件存储机制 & 生产者】(1)
Kafka(二)【文件存储机制 & 生产者】
|
1月前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
69 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
14天前
|
消息中间件 Kafka API
深入解析Kafka消息传递的可靠性保证机制
深入解析Kafka消息传递的可靠性保证机制
17 0
|
1月前
|
消息中间件 缓存 安全
Kafka 的生产者优秀架构设计
Kafka 的生产者优秀架构设计
38 0
|
1月前
|
消息中间件 负载均衡 Java
深入了解Kafka中生产者的神奇力量
深入了解Kafka中生产者的神奇力量
33 0
|
1月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
68 0
|
12天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章