在Kafka中确保不消费重复数据,特别是在像扣款这样的关键业务场景中,是非常重要的。为了实现这一目标,我们可以采取以下几种策略:
1. 消息幂等性:
Kafka提供了一种名为消息幂等性的特性,可以确保生产者发送的消息不会被重复处理。当启用消息幂等性时,每条消息都会被赋予一个唯一的ID(Producer ID + Sequence Number),Kafka会使用这个ID来识别和过滤重复的消息。即使生产者发送了重复的消息,Kafka也会确保消费者不会消费到重复的数据。
示例代码:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class IdempotentProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE); // 设置重试次数为最大值
props.put("enable.idempotence", "true"); // 启用幂等性
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Produced record with offset " + metadata.offset());
}
}
});
} finally {
producer.close();
}
}
}
在上述代码中,我们创建了一个启用了消息幂等性的生产者。即使我们多次发送相同的消息,Kafka也会确保这些消息只会被处理一次。
2. 消费者端去重:
除了在生产者端保证消息的幂等性外,还可以在消费者端进行去重操作。消费者可以维护一个已处理消息的ID列表,每次消费消息时,都先检查该列表,以确保不会处理重复的消息。
示例代码:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class DeduplicationConsumer {
private static final Set<String> processedMessages = new HashSet<>();
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 禁止自动提交位移
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageId = record.key();
if (!processedMessages.contains(messageId)) {
// 处理消息
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
// 标记消息为已处理
processedMessages.add(messageId);
// 手动提交位移
consumer.commitSync();
} else {
System.out.println("Skipping duplicate message: " + messageId);
}
}
}
} finally {
consumer.close();
}
}
}
在上述代码中,我们创建了一个消费者,维护了一个已处理消息的ID列表 processedMessages
。每次消费消息时,都会检查这个列表,以确保不会处理重复的消息。
3. Exactly-Once语义:
Kafka还提供了一种名为Exactly-Once语义的特性,可以确保生产者发送的消息不会被重复处理,并且消费者在处理消息时也不会丢失消息或产生重复数据。这是通过Kafka事务API来实现的,允许生产者在发送消息时进行事务性操作,以确保消息的完整性和一致性。
示例代码:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import java.util.Properties;
public class TransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
try {
producer.abortTransaction();
} catch (KafkaException e) {
e.printStackTrace();
}
exception.printStackTrace();
} else {
producer.commitTransaction();
System.out.println("Produced record with offset " + metadata.offset());
}
}
});
} catch (ProducerFencedException | KafkaException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
在上述代码中,我们创建了一个启用了事务性的生产者,并使用 beginTransaction()
和 commitTransaction()
方法来确保消息的Exactly-Once语义。