原理介绍
所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
其中at most once 和 at least once在发送端通过是否接收发送的结果来实现。
对于exactly once的情况,目前主要通过幂等性和事务实现。
创建Topic
版本要求:开源版本为2.2.0的专业版实例、Local存储
幂等性 Producer
指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)
// 幂等参数设置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
// 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 设置创建的Kafka
String topic = "Local_Topic_Demo"; //消息所属的Topic,请在控制台申请之后,填写在这里
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
Future future1 = producer.send(record1);
future1.get();//不关心是否发送成功,则不需要这行
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
Future future2 = producer.send(record1);
future2.get();//不关心是否发送成功,则不需要这行
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
Future future3 = producer.send(record1);
future3.get();//不关心是否发送成功,则不需要这行
} catch(Exception e) {
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
当前的幂等性只能保证单分区上,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
事务性Producer
可以通过事务(transaction)或者依赖事务型 Producer,实现多分区以及多会话上的消息无重复,这也是幂等性 Producer 和事务型 Producer 的最大区别!
设置事务型 Producer 的方法:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
// 幂等参数设置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 事务支持设置
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
// 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//消息所属的Topic,请在控制台申请之后,填写在这里
String topic = "Local_Topic_Demo";
// 开启事务
producer.initTransactions();
producer.beginTransaction();
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
producer.send(record1);
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
producer.commitTransaction();
} catch(Exception e) {
producer.abortTransaction();
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
小结
幂等性 Producer 和事务型 Producer 都是 Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。没有免费的午餐,比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。