如何可以重现 生产者重试机制? 将超时时间设置很短,也是发送了一次? 是有开关?
本问题来自阿里云开发者社区的【11大垂直技术领域开发者社群】。 点击链接欢迎加入感兴趣的技术领域群。
以阿里云mq公网环境为例,测试配置如下:
public static final String TOPIC = "topic_yb";
public static final String PRODUCER_ID = "PID_yinbing";
public static final String CONSUMER_ID = "CID_yinbing";
public static final String ACCESS_KEY = "xxxx";
public static final String SECRET_KEY = "xxxx";
public static final String TAG = "mq_test_tag";
public static final String ONSADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
1 测试方案一 1.1 测试用例 1ã生产者发送一条普通消息。
2ã消费者订阅消息,模拟消费失败,并直接返回 Action.ReconsumeLater 。
3ã观察现象:当消费者未能正常消费时,观察消息重试次数及时间间隔。
1.2 测试代码 1ã生产者代码如下:
String msgpre = "mq send exception message test 1";
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, msgbody.getBytes());
try {
SendResult sendResult = producer.send(message);
assert sendResult != null;
System.out.println(new Date() + ", msgid is "+ sendResult.getMessageId());
} catch (ONSClientException e) {
System.out.println("发送失败");
}
代码描述:发送1次普通消息
2ã消费者代码:
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
System.out.println(new Date() + ", msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());
return Action.ReconsumeLater;
}
}
代码描述:消费者订阅该消息,模拟消费失败,直接返回Action.ReconsumeLater
1.3 测试过程 1ã生产者发送消息
代码日志描述:发送1次普通消息
d239e1c74181bca640aaea08ecd8744ec6bc80ee
2ã消费者订阅消息
baa36ed345086c092f20319da1754a64cfb3f7cc
3ã消息重试间隔时间
消费序次
消费时间
消费间隔
官网文档间隔
0
13:44:52
1
13:45:19
27秒
10秒
2
13:45:49
30秒
30秒
3
13:46:49
1分
1分
4
13:48:49
2分
2分
5
13:51:49
3分
3分
6
13:55:49
4分
4分
7
14:00:49
5分
5分
8
14:06:50
6分1秒
6分
9
14:13:59
7分9秒
7分
10
14:21:59
8分
8分
11
14:30:59
9分
9分
12
14:41:00
10分1秒
10分
13
15:01:00
20分
20分
14
15:31:00
30分
30分
15
16:31:00
1小时
1小时
16
18:31:00
2小时
2小时
17
无
4ã消息轨迹查询
查询重试的消息,包括16次重试次数,一共消费了17次
01ea7352be090b6a38c8a0f410377329fa5823b5
1.4 测试结果 经测试,消费者短时间内返回消费失败结果的情况下,得出如下结论:
1ã消费者未能正常消费时,MQ将重新投递消息;
2ã消费者重试时间间隔与阿里云官方文档基本一致;
3ã消费者重试超过16次之后不再进行重试,与阿里云官方文档一致;
2 测试方案二 2.1 测试用例 1、生产者发送一条普通消息。
2、消费者订阅该消息,并睡眠3分钟(模拟复杂业务消费),消息消费超时时间设置为2分钟。
3、观察消费超时情况下的消息重试策略。
2.2 测试代码 1、生产者代码如下:
代码与方案一一致
2、消费者代码:
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
System.out.println(new Date() + ", sleep begin, msgid is " + message.getMsgID());
try {
Thread.sleep(3601000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date() + ", sleep end, msgid is " + message.getMsgID());
System.out.println(new Date() + ", consume commit, msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());
return Action.CommitMessage;
}
});
代码描述:消费者订阅并消费消息,并睡眠3分钟。
2.3 测试过程 1、生产者发送消息
代码日志描述:发送一次普通消息
ca5a6fa2452afc6a367220c1dde392b5e27346bd
2、消费者订阅消息
消费者19:34:23第一次消费消息,sleep 3分钟后,19:37:23 客户端向mq服务端发送消息消费成功的确认。
6dac64d814f31f0e7b10a4d1b410c183222bc3c4
3、查看消息订阅情况
这段时间,2分钟的消费超时机制并没有被触发,此时查看消息的消费记录,发现该消息id只消费一次,处于消费成功的状态。
ad6e61916ebb060b5c8cb92567e62da19e8befe3
2.4 测试结果 经测试,消费者执行长时间后(3分钟),返回MQ消费成功信息情况下,得出如下结论:
1、消费者未能在MQ超时时间窗口内返回消费成功的信息时,MQ未重新投递消息;
2、阿里云官方文档描述” 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费”,与实际验证结果不符。
3 测试方案三 3.1 测试用例 1、在消息重试投递的情况下。
2、消费者订阅该消息,并重启应用。
3、观察现象:应用重启时,是否会出现瞬间同时多次重复消费同一条消息的情况。
3.2 测试代码 测试代码与方案一代码一致
3.3 测试过程 1、生产者代码发送消息
代码日志描述:发送5条普通消息
7dad679f003ee3c70ec622c43df254850e773a10
2、消费者订阅消息,消费失败,导致消息重复投递
8441894b144cea5d43d71dc99e52617583a3eff1 3、重启消费端应用,查看重新投递的消息
935feaf4d73dd53d771ebc39363c395986e4c102
发现,5条消息在第三次重复投递的过程中,投放了15条消息
3.4 测试结果 经测试,消息重复投递的情况下,应用重启后,得出如下结论:
1、应用重启瞬间,同时多次重复消费同一条消息;
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。