阿里云Kafka幂等生产者与事务生产者

简介: 为了保障消息的精确一致到达,Kafka主要通过幂等 + 事务的方式来保障实现。下面介绍如何在阿里云Kafka中使用幂等和事务的功能。

原理介绍

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

其中at most once 和 at least once在发送端通过是否接收发送的结果来实现。

对于exactly once的情况,目前主要通过幂等性和事务实现。

创建Topic

版本要求:开源版本为2.2.0的专业版实例、Local存储

图片.png

幂等性 Producer

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)

 // 幂等参数设置
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 设置创建的Kafka
        String topic = "Local_Topic_Demo"; //消息所属的Topic,请在控制台申请之后,填写在这里
        
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            Future future1 = producer.send(record1);
            future1.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);
            Future future2 = producer.send(record1);
            future2.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);
            Future future3 = producer.send(record1);
            future3.get();//不关心是否发送成功,则不需要这行

        } catch(Exception e) {
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }
AI 代码解读

当前的幂等性只能保证单分区上,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务性Producer

可以通过事务(transaction)或者依赖事务型 Producer,实现多分区以及多会话上的消息无重复,这也是幂等性 Producer 和事务型 Producer 的最大区别!

设置事务型 Producer 的方法:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true。
  • 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
        // 幂等参数设置
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 事务支持设置
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //消息所属的Topic,请在控制台申请之后,填写在这里
        String topic = "Local_Topic_Demo"; 

        // 开启事务
        producer.initTransactions();
        producer.beginTransaction();
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            producer.send(record1);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);

            producer.commitTransaction();
        } catch(Exception e) {
            producer.abortTransaction();
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }
AI 代码解读

小结

幂等性 Producer 和事务型 Producer 都是 Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。没有免费的午餐,比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。

参考链接

Kafka消息发送的三种模式
幂等生产者和事务生产者是一回事吗?

相关文章
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
133 16
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
161 61
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
132 10
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
201 5
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
112 0
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
205 2
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
100 2
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
110 1
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问