阿里云Kafka幂等生产者与事务生产者

简介: 为了保障消息的精确一致到达,Kafka主要通过幂等 + 事务的方式来保障实现。下面介绍如何在阿里云Kafka中使用幂等和事务的功能。

原理介绍

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

其中at most once 和 at least once在发送端通过是否接收发送的结果来实现。

对于exactly once的情况,目前主要通过幂等性和事务实现。

创建Topic

版本要求:开源版本为2.2.0的专业版实例、Local存储

图片.png

幂等性 Producer

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)

 // 幂等参数设置
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 设置创建的Kafka
        String topic = "Local_Topic_Demo"; //消息所属的Topic,请在控制台申请之后,填写在这里
        
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            Future future1 = producer.send(record1);
            future1.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);
            Future future2 = producer.send(record1);
            future2.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);
            Future future3 = producer.send(record1);
            future3.get();//不关心是否发送成功,则不需要这行

        } catch(Exception e) {
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }

当前的幂等性只能保证单分区上,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务性Producer

可以通过事务(transaction)或者依赖事务型 Producer,实现多分区以及多会话上的消息无重复,这也是幂等性 Producer 和事务型 Producer 的最大区别!

设置事务型 Producer 的方法:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true。
  • 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
        // 幂等参数设置
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 事务支持设置
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //消息所属的Topic,请在控制台申请之后,填写在这里
        String topic = "Local_Topic_Demo"; 

        // 开启事务
        producer.initTransactions();
        producer.beginTransaction();
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            producer.send(record1);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);

            producer.commitTransaction();
        } catch(Exception e) {
            producer.abortTransaction();
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }

小结

幂等性 Producer 和事务型 Producer 都是 Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。没有免费的午餐,比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。

参考链接

Kafka消息发送的三种模式
幂等生产者和事务生产者是一回事吗?

相关文章
|
26天前
|
消息中间件 Java Kafka
掌握Kafka事务,看这篇就够了
先赞后看,南哥助你Java进阶一大半Kafka事务实际上引入了原子多分区写入的概念,播客画了以下流程图,展示了事务在分区级别如何工作。我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
掌握Kafka事务,看这篇就够了
|
10天前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
86 9
|
2月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
3月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
68 8
|
2月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
2月前
|
消息中间件 Java Kafka
Kafka生产者同步和异步的JavaAPI代码演示
Kafka生产者同步和异步的JavaAPI代码演示
27 0
|
3月前
|
消息中间件 缓存 Kafka
Kafka的producer如何实现幂等性
Kafka的producer如何实现幂等性
137 1
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现两个阿里云账号下的Kafka进行数据的互相传输
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Java
查询Kafka生产者是否连接到Kafka服务
查询Kafka生产者是否连接到Kafka服务
136 2
下一篇
无影云桌面