Kafka 怎么顺序消费?面试必备。。。

简介: Kafka 怎么顺序消费?面试必备。。。

前言

本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。


如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。


1、问题引入

kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。


如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。


2、解决思路

现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。


两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。


使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。


细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161

PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。


在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。


处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。


3、实现方案

消息发送:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

监听代码示例:

KafkaListenerDemo.java

@Component
@Slf4j
public class KafkaListenerDemo {
    // 消费到的数据缓存
    private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // 数据存储
    private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    private WeakRefHashLock weakRefHashLock;
    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }
    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
        // 模拟顺序异常,也就是insert后消费,这里线程sleep
        Thread.sleep(1000);
        String id = record.value();
        log.info("接收到insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("开始处理 {} 的insert", id);
            // 模拟 insert 业务处理
            Thread.sleep(1000);
            // 从缓存中获取 是否存在有update数据
            if (UPDATE_DATA_MAP.containsKey(id)){
                // 缓存数据存在,执行update
                doUpdate(id);
            }
            log.info("处理 {} 的insert 结束", id);
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }
    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
        String id = record.value();
        log.info("接收到update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            // 测试使用,不做数据库的校验
            if (!DATA_MAP.containsKey(id)){
                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
                log.info("消费顺序异常,将update数据 {} 加入缓存", id);
                UPDATE_DATA_MAP.put(id, id);
            }else {
                doUpdate(id);
            }
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }
    void doUpdate(String id) throws InterruptedException{
        // 模拟 update
        log.info("开始处理update::{}", id);
        Thread.sleep(1000);
        log.info("处理update::{} 结束", id);
    }
}

日志(代码中已模拟必现消费顺序异常的场景):

接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

相关文章
|
1月前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
4月前
|
消息中间件 存储 负载均衡
Kafka面试题及答案
Kafka面试题及答案
|
1月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
2月前
|
消息中间件 存储 缓存
美团面试: Kafka为啥能实现 10Wtps 到100Wtps ?kafka 如何实现零复制 Zero-copy?
40岁老架构师尼恩分享了Kafka如何实现高性能的秘诀,包括零拷贝技术和顺序写。Kafka采用mmap和sendfile两种零拷贝技术,前者用于读写索引文件,后者用于向消费者发送消息,减少数据在用户空间和内核空间间的拷贝次数,提高数据传输效率。此外,Kafka通过顺序写日志文件,避免了磁盘寻道和旋转延迟,进一步提升了写入性能。尼恩还提供了系列技术文章和PDF资料,帮助读者深入理解这些技术,提升面试竞争力。
美团面试: Kafka为啥能实现 10Wtps 到100Wtps ?kafka 如何实现零复制 Zero-copy?
|
4月前
|
消息中间件 算法 Java
面试官:Kafka中的key有什么用?
面试官:Kafka中的key有什么用?
169 3
面试官:Kafka中的key有什么用?
|
2月前
|
消息中间件 存储 Kafka
面试题:Kafka如何保证高可用?有图有真相
面试题:Kafka如何保证高可用?有图有真相
|
5月前
|
消息中间件 Kafka
面试题Kafka问题之Kafka【线上】积压消费如何解决
面试题Kafka问题之Kafka【线上】积压消费如何解决
40 0
|
4月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
1月前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
|
1月前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
下一篇
DataWorks