Pulsar 也会重复消费?

简介: 排查了一个问题:在使用 Pulsar 消费时,发生了同一条消息反复消费的情况。

排查


当他告诉我这个现象的时候我就持怀疑态度,根据之前使用的经验 Pulsar 在官方文档以及 API 中都解释过:


网络异常,图片无法展示
|

网络异常,图片无法展示
|


只有当设置了消费的 ackTimeout 并超时消费时才会重复投递消息,默认情况下是关闭的,查看代码也确实没有开启。


那会不会是调用了 negativeAcknowledge() 方法呢(调用该方法也会触发重新投递),因为我们使了一个第三方库 github.com/majusko/pul… 只有当抛出异常时才会调用该方法。


查阅代码之后也没有地方抛出异常,甚至整个过程中都没看到异常产生;这就有点诡异了。


复现


为了捋清楚整个事情的来龙去脉,详细了解了他的使用流程;


其实也就是业务出现了 bug,他在消息消费时 debug 然后进行单步调试,当走完一次调试后,没多久马上又收到了同样的消息。


但奇怪的是也不是每次 debug 后都能重复消费,我们都说如果一个 bug 能 100% 完全复现,那基本上就解决一大半了。


所以我们排查的第一步就是完全复现这个问题。


为了排除掉是 IDEA 的问题(虽然极大概率不太可能)既然是 debug 的时候产生的问题,那其实转换到代码也就是 sleep 嘛,所以我们打算在消费逻辑里直接 sleep 一段时间看能否复现。


经过测试,sleep 几秒到几十秒都无法复现,最后索性 sleep 一分钟,神奇的事情发生了,每次都成功复现!


既然能成功复现那就好说了,因为我自己的业务代码也有使用到 Pulsar 的地方,为了方便调试就准备在自己的项目里再复现一次。


结果诡异的事情再次发生,我这里又不能复现了。


虽然这才是符合预期的,但这就没法调了呀。


本着相信现代科学的前提,我们俩唯一的区别就是项目不一样了,为此我对比了两边的代码。


@PulsarConsumer(
            topic = xx,
            clazz = Xx.class,
            subscriptionType = SubscriptionType.Shared
    )
    public void consume(Data msg) {
        log.info("consume msg:{}", msg.getOrderId());
        Lock lock = redisLockRegistry.obtain(msg.getOrderId());
        if (lock.tryLock()) {
            try {
                orderService.do(msg.getOrderId());
            } catch (Exception e) {
                log.error("consumer msg:{} err:", msg.toString(), e);
            } finally {
                lock.unlock();
            }
        }
    }


结果不出所料,同事那边的代码加了锁;一个基于 Redis 的分布式锁,这时我一拍大腿不会是解锁的时候超时了导致抛了异常吧。


为了验证这个问题,在能复现的基础上我在框架的 Pulsar 消费处打了断点:


网络异常,图片无法展示
|


网络异常,图片无法展示
|


果然破案了,异常提示已经非常清楚了:加锁已经过了超时时间。


进入异常后直接 negative 消息,同时异常也被吃掉了,所以之前没有发现。


网络异常,图片无法展示
|


查阅了 RedisLockRegistry 的源码,默认超时时间正好是一分钟,所以之前我们 sleep 几十秒也无法复现这个问题。


总结


事后我向同事了解了下为啥这里要加锁,因为我看下来完全没有加锁的必要;结果他是因为从别人那里复制的代码才加上的,压根没想那么多。


所以这事也能得出一些教训:


  • ctrl C/V 虽然方便,但也得充分考虑自己的业务场景。


  • 使用一些第三方 API 时,需要充分了解其作用、参数。


相关文章
|
负载均衡 算法 应用服务中间件
面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
字节跳动面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
420 0
|
文字识别 安全 Java
SpringBoot3.x和OCR构建车牌识别系统
本文介绍了一个基于Java SpringBoot3.x框架的车牌识别系统,详细阐述了系统的设计目标、需求分析及其实现过程。利用Tesseract OCR库和OpenCV库,实现了车牌图片的识别与处理,确保系统的高准确性和稳定性。文中还提供了具体的代码示例,展示了如何构建和优化车牌识别服务,以及如何处理特殊和异常车牌。通过实际应用案例,帮助读者理解和应用这一解决方案。
|
监控 Linux
Zabbix 5.0 LTS的agent服务部署实战篇
文章介绍了如何在CentOS 7.6操作系统上部署Zabbix 5.0 LTS版本的agent服务,包括配置软件源、安装agent、修改配置文件、启动服务,并在Zabbix web界面添加监控。
468 4
Zabbix 5.0 LTS的agent服务部署实战篇
|
人工智能 资源调度 算法
算法金 | 一个强大的算法模型,GPR !!
高斯过程回归(GPR)是基于高斯过程的非参数贝叶斯方法,用于捕捉数据的非线性关系并提供不确定性估计。它利用核函数描述输入数据的潜在函数,如径向基函数(RBF)用于平滑建模。GPR通过最大化对数似然函数选择超参数。代码示例展示了如何使用`sklearn`库进行GPR,生成模拟数据,训练模型,并用RBF核函数进行预测,最后通过绘图展示预测结果及置信区间。
555 3
算法金 | 一个强大的算法模型,GPR !!
|
存储 算法 物联网
海量数据实时计算利器:深入探索Tec(一个假设性技术框架)
总之,Tec作为海量数据实时计算利器,在推动数字化转型、提升业务效率、保障数据安全等方面发挥着重要作用。随着技术的不断进步和应用场景的不断拓展,Tec的未来发展前景将更加广阔。
|
算法 计算机视觉
图像处理之霍夫变换圆检测算法
图像处理之霍夫变换圆检测算法
286 0
|
消息中间件 存储 Kafka
几种 MQ 顺序消息的实现方式
几种 MQ 顺序消息的实现方式
|
数据安全/隐私保护
加密与签名的区别
加密与签名的区别
373 0
|
消息中间件 缓存 监控
mq如何保证消息顺序性
mq如何保证消息顺序性
|
JSON 安全 Java
手把手教你使用Flask框架构建Python接口以及如何请求该接口
手把手教你使用Flask框架构建Python接口以及如何请求该接口