阿里云MQ消息重试机制验证

简介:   1 测试概述 1.1 测试目的 阿里云mq消息逻辑消费失败之后的处理方式如官方文档https://help.aliyun.com/document_detail/43490.html 本文的目的是确认阿里公共云MQ产品的消息消费重试机制是否如文档描述。


阿里云mq消息在业务逻辑处理失败之后的处理方式,如官方文档

本文的目的是结合实际业务场景验证阿里公共云MQ产品的消息消费重试机制。

以下共采用以下三种测试场景,重试失败之后的重试次数和间隔时间,消息消费超时情况下的重试策略,应用不稳定情况下的消息重试情况。

以阿里云mq公网环境为例,测试配置如下:

public static final String TOPIC = "topic_yb";    

public static final String PRODUCER_ID = "PID_yinbing";    

public static final String CONSUMER_ID = "CID_yinbing";      

public static final String ACCESS_KEY = "xxxx";    

public static final String SECRET_KEY = "xxxx";    

public static final String TAG = "mq_test_tag";

public static final String ONSADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";

 

测试方案一

1.1 测试用例

1、 生产者发送一条普通消息。

2、 消费者订阅消息,模拟消费失败,并直接返回 Action.ReconsumeLater 。

3、 观察现象:当消费者未能正常消费时,观察消息重试次数及时间间隔。

1.2 测试代码

1、 生产者代码如下:

String msgpre = "mq send exception message test 1";

Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, msgbody.getBytes());

 try {

     SendResult sendResult = producer.send(message);

     assert sendResult != null;

     System.out.println(new Date() + ", msgid is "+ sendResult.getMessageId());

 } catch (ONSClientException e) {

     System.out.println("发送失败");

 }

代码描述:发送1次普通消息

2、 消费者代码:

consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl() {

   @Override

   public Action consume(Message message, ConsumeContext consumeContext) {

           System.out.println(new Date() + ", msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());

           return Action.ReconsumeLater;

        }

   }

代码描述:消费者订阅该消息,模拟消费失败,直接返回Action.ReconsumeLater

 

 

1.3 测试过程

1、 生产者发送消息

代码日志描述:发送1次普通消息

d239e1c74181bca640aaea08ecd8744ec6bc80ee

2、 消费者订阅消息

baa36ed345086c092f20319da1754a64cfb3f7cc

3、 消息重试间隔时间

消费序次

消费时间

消费间隔

官网文档间隔

0

13:44:52

 

 

1

13:45:19

27

10

2

13:45:49

30

30

3

13:46:49

1

1

4

13:48:49

2

2

5

13:51:49

3

3

6

13:55:49

4

4

7

14:00:49

5

5

8

14:06:50

6分1秒

6

9

14:13:59

7分9秒

7

10

14:21:59

8

8

11

14:30:59

9

9

12

14:41:00

10分1秒

10

13

15:01:00

20

20

14

15:31:00

30

30

15

16:31:00

1小时

1小时

16

18:31:00

2小时

2小时

17

 

 

4、 消息轨迹查询

查询重试的消息,包括16次重试次数,一共消费了17次

01ea7352be090b6a38c8a0f410377329fa5823b5

 

1.4 测试结果

经测试,消费者短时间内返回消费失败结果的情况下,得出如下结论:

1、 消费者未能正常消费时,MQ将重新投递消息;

2、 消费者重试时间间隔与阿里云官方文档基本一致;

3、 消费者重试超过16次之后不再进行重试,与阿里云官方文档一致;

测试方案二

2.1 测试用例

1、生产者发送一条普通消息。

2、消费者订阅该消息,并睡眠3分钟(模拟复杂业务消费),消息消费超时时间设置为2分钟。

3、观察消费超时情况下的消息重试策略。

2.2 测试代码

1、生产者代码如下:

 

代码与方案一一致

2、消费者代码:

consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListener() {

@Override

   public Action consume(Message message, ConsumeContext consumeContext) {

       System.out.println(new Date() + ", sleep begin, msgid is " + message.getMsgID());

       try {

Thread.sleep(3*60*1000);

             } catch (InterruptedException e) {

e.printStackTrace();

             }

           System.out.println(new Date() + ", sleep end, msgid is " + message.getMsgID());

           System.out.println(new Date() + ", consume commit, msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());

           return Action.CommitMessage;

   }

});

 

代码描述:消费者订阅并消费消息,并睡眠3分钟。

2.3 测试过程

1、生产者发送消息

     代码日志描述:发送一次普通消息

 ca5a6fa2452afc6a367220c1dde392b5e27346bd


2、消费者订阅消息

   消费者19:34:23第一次消费消息,sleep 3分钟后,19:37:23 客户端向mq服务端发送消息消费成功的确认。

6dac64d814f31f0e7b10a4d1b410c183222bc3c4

3、查看消息订阅情况

这段时间,2分钟的消费超时机制并没有被触发,此时查看消息的消费记录,发现该消息id只消费一次,处于消费成功的状态。

ad6e61916ebb060b5c8cb92567e62da19e8befe3

 

2.4 测试结果

经测试,消费者执行长时间后(3分钟),返回MQ消费成功信息情况下,得出如下结论:

1、消费者未能在MQ超时时间窗口内返回消费成功的信息时,MQ未重新投递消息;

2、阿里云官方文档描述” 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费”,与实际验证结果不符。

测试方案三

3.1 测试用例

1、在消息重试投递的情况下。

2、消费者订阅该消息,并重启应用。

3、观察现象:应用重启时,是否会出现瞬间同时多次重复消费同一条消息的情况。

3.2 测试代码

测试代码与方案一代码一致

3.3 测试过程

1、生产者代码发送消息

代码日志描述:发送5条普通消息

7dad679f003ee3c70ec622c43df254850e773a10

2、消费者订阅消息,消费失败,导致消息重复投递

8441894b144cea5d43d71dc99e52617583a3eff1

3、重启消费端应用,查看重新投递的消息

935feaf4d73dd53d771ebc39363c395986e4c102

发现,5条消息在第三次重复投递的过程中,投放了15条消息

 

3.4 测试结果

经测试,消息重复投递的情况下,应用重启后,得出如下结论:

1、应用重启瞬间,同时多次重复消费同一条消息;

 

 

 

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 Serverless
【实践】快速学会使用阿里云消息队列RabbitMQ版
云消息队列 RabbitMQ 版是一款基于高可用分布式存储架构实现的 AMQP 0-9-1协议的消息产品。云消息队列 RabbitMQ 版兼容开源 RabbitMQ 客户端,解决开源各种稳定性痛点(例如消息堆积、脑裂等问题),同时具备高并发、分布式、灵活扩缩容等云消息服务优势。
126 2
|
7月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
317 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
7月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
195 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
|
7月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
268 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
4月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
84 9
|
7月前
|
消息中间件 Cloud Native Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(7)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
132 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(7)
|
7月前
|
消息中间件 Cloud Native Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
130 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
|
7月前
|
消息中间件 Apache 数据安全/隐私保护
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(3)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
131 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(3)
|
7月前
|
消息中间件 Apache RocketMQ
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(5)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
139 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(5)
|
7月前
|
消息中间件 Apache RocketMQ
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(8)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
135 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(8)