以前都是作为消息接收方,接收消息。记得当时做支付的时候,接收第三方支付公司的各种消息,如支付成功、支付失败、退款成功、退款失败。
有的公司在消息推送的准确性与及时性方面做的很好,有的就很差,尤其是几家欧洲公司,如果我方消费失败,该消息会阻塞在队头,导致整个消费队列卡住,无法消费后面消息,引起我们的各种吐槽。
天道好轮回,苍天饶过了谁,终于轮到我当发送方了。虽然这个项目难度不是很高,但这是自己第一个推送相关的项目,所以记录一下。
原因
本来提供了拉单功能,消费方可定时拉单。
但定时拉单有几个问题,一是时效性问题,如消费方每N分钟拉取一次数据,则最大延迟为N分钟,时效性不好;二是拉取到的订单信息可能不全,对于信息不全的订单,消费方需要定时重新获取,增加了复杂性。
推单方案则能解决以上问题,首先只有信息完整才推单,其次是信息完整后立即推单。这既能保证时效性,同时也能降低对接复杂度。
方案调研
以前没做过推单功能,仔细想了一下,需要先调研三方面内容:推送什么内容、如何推送、推送失败如何处理。
推送内容
如何确定给消费方推送的内容呢?
通过与消费方沟通、咨询公司业务方、调查竞品,确定好推送内容。
不过仍有未考虑到的地方,在与接收方联调推单模式时,有的接收方想要批量推送功能。
目前设计为一单一单推送,基于现状也不会做批量推送功能,还是因为时效性问题,有消息就尽快推送给接收方,接收方收到后尽快流转。批量推送会让整体时效降低,而且实现上也会更加复杂。
但这个示例说明调研上有缺漏,今后可多调研几家,有些需求可以选择不做,但要知晓需求。
如何推送
如何将数据推送出去呢?
当然得有个平台!因为推送要考虑很多细节,如数据安全、异常处理、接口管理等,好在有部门已经做了推送的管理平台。提供出两种方案,同步方案和异步方案。
同步方案
同步方式,是消息发送方,直接调用接收方接口,能够立即感知结果
异步方案
异步方式,接收方将消息推送到消息队列,接收方按需进行消费
区别
同步和异步方式有如下区别
- 同步方式可直接知道接收方处理结果;异步方式无法知道处理结果
- 同步方式由发送方保证消息推送成功;异步方式由接收方自行保证
异步方式在设计上,只支持同类型的消息推送给同一个接收方,但商家订单属于不同接收方,使用异步方式会导致信息泄露。而且异步方式也无法知道消息接收情况,所以最终选择同步方案。
异常处理
推送流程比较简单,消费mq消息,给接收者推送订单信息。我们永远不要相信网络和接收方,总是出现各种各样的问题,当消息消费失败后如何处理呢?
当然希望Consumer在消费消息包的时候,如果出现一些异常,希望消息包不被直接丢弃,而是可以过段时间继续消费,同时不产生阻塞。简单来说就是重试。
rabbitmq
以前用的rabbitmq,消费异常后,可以将消息包重新放入主队列的队尾,有两种方案:
死信队列
basic_consume
设置ack模式- 声明死信队列,设置
x-message-ttl=30000
,x-dead-letter-routing-key=主队列名
- 声明主队列,设置
x-dead-letter-routing-key=死信队列名
- 主队列通过
RoutingKey
绑定到Exchange
如果消费逻辑出现异常,消费脚本会调用basic\_reject(),消息包会被RabbitMQ Requeue到死信队列
中。30s超时后,消息包会重新进入主队列的队尾。
重新投递
通过chanel.basicAck(tagId, false)与chanel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), true, MessageProperties.PERSISTENT\_TEXT\_PLAIN, body.getbytes()); 搭配可将消息放回消息队尾,这两个函数一个不再将未被确认的消息发送回队列,一个用于重新投递消息。
rocketmq
现在公司使用的rocketmq,基于我对rocketmq的理解,它也是可以进行无阻塞重试的,因为有重试队列嘛:
重试队列
消费失败后,消息会进入到 RocketMQ 的重试队列中。
- 比如说消费者所属的消息组名称为
AAAConsumerGroup
- 其重试队列名称就叫做
%RETRY%AAAConsumerGroup
- 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列
- 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列
因为公司对rocketmq做了一些更改,所以找同学确认重试机制。问了很多同学,大家都说是阻塞性重试,消息阻塞在队头,直到重试成功或达到重试上限。没办法,只能再找对应的研发同学进行确认。
原来他们设计了两种配置,消息有序和消息无序。在有序情况下,消费失败后会阻塞在队头,直到重试成功;无序情况下,会进入重试队列,根据设置的重试间隔和重试次数进行重试,不会阻塞。如此一来,重试问题也解决了。
之所以关注这一点,是因为阻塞性重试会导致后面的消息无法推送,对功能产生影响。
而且即使是非阻塞性重试,也最好设置重试上限,如果异常太多,容易导致消息生产方压力过大,产生崩溃。需要明白多次投递失败的责任方在接收者。
监控
项目上线后,需要进行监控,否则无法感知运行情况。数据团队同学给力,很快整理好报表,能够实时查看推单成功数量、推单失败数量、订单信息完整时间、推单失败细节。
通过这些数据,能快速发现隐藏问题,也能分析出各接收方的情况,有的接收方确实是一言难尽。
发展
项目上线后,能够确定订单信息完整时间,并能在订单信息完整后,将数据推送给相关方。
后续会优化拉单功能,保证拉到的订单都是完整订单,同时会更改系统中判断信息完整的逻辑,将判断模块做简化和收敛,为今后的系统更新做好准备。
资料
- 消息队列RocketMQ版消费消息失败是否会重新消费
- 消息队列中消息消费失败后的处理机制
- rabbitmq消息重回队列
- rabbitmq重试机制
- RabbitMQ的消息确认机制
- 团队使用RabbitMQ几个场景
- https://www.rabbitmq.com/documentation.html
- rabbitmq死信队列
- RabbitMQ 死信队列 + TTL介绍
- RocketMQ 死信队列 | 消费者出现异常如何处理?
- RocketMQ消费消息失败的处理办法
最后
大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)
我的个人博客为:https://shidawuhen.github.io/
往期文章回顾: