【Kafka】kafka 如何不消费重复数据?

简介: 【4月更文挑战第7天】【Kafka】kafka 如何不消费重复数据?

image.png

在Kafka中确保不消费重复数据,特别是在像扣款这样的关键业务场景中,是非常重要的。为了实现这一目标,我们可以采取以下几种策略:

1. 消息幂等性

Kafka提供了一种名为消息幂等性的特性,可以确保生产者发送的消息不会被重复处理。当启用消息幂等性时,每条消息都会被赋予一个唯一的ID(Producer ID + Sequence Number),Kafka会使用这个ID来识别和过滤重复的消息。即使生产者发送了重复的消息,Kafka也会确保消费者不会消费到重复的数据。

示例代码:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class IdempotentProducer {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE); // 设置重试次数为最大值
        props.put("enable.idempotence", "true"); // 启用幂等性
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
   
   
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
            producer.send(record, new Callback() {
   
   
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
   
   
                    if (exception != null) {
   
   
                        exception.printStackTrace();
                    } else {
   
   
                        System.out.println("Produced record with offset " + metadata.offset());
                    }
                }
            });
        } finally {
   
   
            producer.close();
        }
    }
}

在上述代码中,我们创建了一个启用了消息幂等性的生产者。即使我们多次发送相同的消息,Kafka也会确保这些消息只会被处理一次。

2. 消费者端去重

除了在生产者端保证消息的幂等性外,还可以在消费者端进行去重操作。消费者可以维护一个已处理消息的ID列表,每次消费消息时,都先检查该列表,以确保不会处理重复的消息。

示例代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class DeduplicationConsumer {
   
   
    private static final Set<String> processedMessages = new HashSet<>();

    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false"); // 禁止自动提交位移
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
   
   
            while (true) {
   
   
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
   
   
                    String messageId = record.key();
                    if (!processedMessages.contains(messageId)) {
   
   
                        // 处理消息
                        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                                record.partition(), record.offset(), record.key(), record.value());

                        // 标记消息为已处理
                        processedMessages.add(messageId);

                        // 手动提交位移
                        consumer.commitSync();
                    } else {
   
   
                        System.out.println("Skipping duplicate message: " + messageId);
                    }
                }
            }
        } finally {
   
   
            consumer.close();
        }
    }
}

在上述代码中,我们创建了一个消费者,维护了一个已处理消息的ID列表 processedMessages。每次消费消息时,都会检查这个列表,以确保不会处理重复的消息。

3. Exactly-Once语义

Kafka还提供了一种名为Exactly-Once语义的特性,可以确保生产者发送的消息不会被重复处理,并且消费者在处理消息时也不会丢失消息或产生重复数据。这是通过Kafka事务API来实现的,允许生产者在发送消息时进行事务性操作,以确保消息的完整性和一致性。

示例代码:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;

import java.util.Properties;

public class TransactionalProducer {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();

        try {
   
   
            producer.beginTransaction();

            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
            producer.send(record, new Callback() {
   
   
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
   
   
                    if (exception != null) {
   
   
                        try {
   
   
                            producer.abortTransaction();
                        } catch (KafkaException e) {
   
   
                            e.printStackTrace();
                        }
                        exception.printStackTrace();
                    } else {
   
   
                        producer.commitTransaction();
                        System.out.println("Produced record with offset " + metadata.offset());
                    }
                }
            });
        } catch (ProducerFencedException | KafkaException e) {
   
   
            e.printStackTrace();
        } finally {
   
   
            producer.close();
        }
    }
}

在上述代码中,我们创建了一个启用了事务性的生产者,并使用 beginTransaction()commitTransaction() 方法来确保消息的Exactly-Once语义。

相关文章
|
1天前
|
消息中间件 JSON druid
Druid:通过 Kafka 加载流数据
Druid:通过 Kafka 加载流数据
42 0
|
1天前
|
消息中间件 关系型数据库 Kafka
Flink CDC可以从Kafka消费数据并写入到Doris中
Flink CDC可以从Kafka消费数据并写入到Doris中
294 2
|
1天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
41 0
|
1天前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
75 2
|
1天前
|
消息中间件 关系型数据库 MySQL
Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql
Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql
32 0
|
1天前
|
消息中间件 存储 Kafka
【Kafka】Kafka 的日志保留期与数据清理策略
【4月更文挑战第13天】【Kafka】Kafka 的日志保留期与数据清理策略
|
1天前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
1天前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
51 1
|
1天前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
1天前
|
机器学习/深度学习 消息中间件 人工智能
机器学习PAI报错问题之读取kafka数据报错如何解决
人工智能平台PAI是是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务;本合集将收录PAI常见的报错信息和解决策略,帮助用户迅速定位问题并采取相应措施,确保机器学习项目的顺利推进。