开发者社区> 问答> 正文

如何可以重现 生产者重试机制? 将超时时间设置很短,也是发送了一次? 是有开关?

如何可以重现 生产者重试机制? 将超时时间设置很短,也是发送了一次? 是有开关?

本问题来自阿里云开发者社区的【11大垂直技术领域开发者社群】。 点击链接欢迎加入感兴趣的技术领域群。

展开
收起
游客pklijor6gytpx 2019-10-23 16:41:11 536 0
1 条回答
写回答
取消 提交回答
  • 以阿里云mq公网环境为例,测试配置如下:

    public static final String TOPIC = "topic_yb";

    public static final String PRODUCER_ID = "PID_yinbing";

    public static final String CONSUMER_ID = "CID_yinbing";

    public static final String ACCESS_KEY = "xxxx";

    public static final String SECRET_KEY = "xxxx";

    public static final String TAG = "mq_test_tag";

    public static final String ONSADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";

    1 测试方案一 1.1 测试用例 1、生产者发送一条普通消息。

    2、消费者订阅消息,模拟消费失败,并直接返回 Action.ReconsumeLater 。

    3、观察现象:当消费者未能正常消费时,观察消息重试次数及时间间隔。

    1.2 测试代码 1、生产者代码如下:

    String msgpre = "mq send exception message test 1";

    Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, msgbody.getBytes());

    try {

     SendResult sendResult = producer.send(message);
    
     assert sendResult != null;
    
     System.out.println(new Date() + ", msgid is "+ sendResult.getMessageId());
    

    } catch (ONSClientException e) {

     System.out.println("发送失败");
    

    }

    代码描述:发送1次普通消息

    2、消费者代码:

    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl() {

    @Override

    public Action consume(Message message, ConsumeContext consumeContext) {

           System.out.println(new Date() + ", msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());
    
           return Action.ReconsumeLater;
    
        }
    

    }

    代码描述:消费者订阅该消息,模拟消费失败,直接返回Action.ReconsumeLater

    1.3 测试过程 1、生产者发送消息

    代码日志描述:发送1次普通消息

    d239e1c74181bca640aaea08ecd8744ec6bc80ee

    2、消费者订阅消息

    baa36ed345086c092f20319da1754a64cfb3f7cc

    3、消息重试间隔时间

    消费序次

    消费时间

    消费间隔

    官网文档间隔

    0

    13:44:52

    1

    13:45:19

    27秒

    10秒

    2

    13:45:49

    30秒

    30秒

    3

    13:46:49

    1分

    1分

    4

    13:48:49

    2分

    2分

    5

    13:51:49

    3分

    3分

    6

    13:55:49

    4分

    4分

    7

    14:00:49

    5分

    5分

    8

    14:06:50

    6分1秒

    6分

    9

    14:13:59

    7分9秒

    7分

    10

    14:21:59

    8分

    8分

    11

    14:30:59

    9分

    9分

    12

    14:41:00

    10分1秒

    10分

    13

    15:01:00

    20分

    20分

    14

    15:31:00

    30分

    30分

    15

    16:31:00

    1小时

    1小时

    16

    18:31:00

    2小时

    2小时

    17

    4、消息轨迹查询

    查询重试的消息,包括16次重试次数,一共消费了17次

    01ea7352be090b6a38c8a0f410377329fa5823b5

    1.4 测试结果 经测试,消费者短时间内返回消费失败结果的情况下,得出如下结论:

    1、消费者未能正常消费时,MQ将重新投递消息;

    2、消费者重试时间间隔与阿里云官方文档基本一致;

    3、消费者重试超过16次之后不再进行重试,与阿里云官方文档一致;

    2 测试方案二 2.1 测试用例 1、生产者发送一条普通消息。

    2、消费者订阅该消息,并睡眠3分钟(模拟复杂业务消费),消息消费超时时间设置为2分钟。

    3、观察消费超时情况下的消息重试策略。

    2.2 测试代码 1、生产者代码如下:

    代码与方案一一致

    2、消费者代码:

    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListener() {

    @Override

    public Action consume(Message message, ConsumeContext consumeContext) {

       System.out.println(new Date() + ", sleep begin, msgid is " + message.getMsgID());
    
       try {
    

    Thread.sleep(3601000);

             } catch (InterruptedException e) {
    

    e.printStackTrace();

             }
    
           System.out.println(new Date() + ", sleep end, msgid is " + message.getMsgID());
    
           System.out.println(new Date() + ", consume commit, msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());
    
           return Action.CommitMessage;
    

    }

    });

    代码描述:消费者订阅并消费消息,并睡眠3分钟。

    2.3 测试过程 1、生产者发送消息

     代码日志描述:发送一次普通消息
    

    ca5a6fa2452afc6a367220c1dde392b5e27346bd

    2、消费者订阅消息

    消费者19:34:23第一次消费消息,sleep 3分钟后,19:37:23 客户端向mq服务端发送消息消费成功的确认。

    6dac64d814f31f0e7b10a4d1b410c183222bc3c4

    3、查看消息订阅情况

    这段时间,2分钟的消费超时机制并没有被触发,此时查看消息的消费记录,发现该消息id只消费一次,处于消费成功的状态。

    ad6e61916ebb060b5c8cb92567e62da19e8befe3

    2.4 测试结果 经测试,消费者执行长时间后(3分钟),返回MQ消费成功信息情况下,得出如下结论:

    1、消费者未能在MQ超时时间窗口内返回消费成功的信息时,MQ未重新投递消息;

    2、阿里云官方文档描述” 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费”,与实际验证结果不符。

    3 测试方案三 3.1 测试用例 1、在消息重试投递的情况下。

    2、消费者订阅该消息,并重启应用。

    3、观察现象:应用重启时,是否会出现瞬间同时多次重复消费同一条消息的情况。

    3.2 测试代码 测试代码与方案一代码一致

    3.3 测试过程 1、生产者代码发送消息

    代码日志描述:发送5条普通消息

    7dad679f003ee3c70ec622c43df254850e773a10

    2、消费者订阅消息,消费失败,导致消息重复投递

    8441894b144cea5d43d71dc99e52617583a3eff1 3、重启消费端应用,查看重新投递的消息

    935feaf4d73dd53d771ebc39363c395986e4c102

    发现,5条消息在第三次重复投递的过程中,投放了15条消息

    3.4 测试结果 经测试,消息重复投递的情况下,应用重启后,得出如下结论:

    1、应用重启瞬间,同时多次重复消费同一条消息;

    2019-10-23 18:51:25
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载