开发者社区> 问答> 正文

RocketMq 时间点消费消息无效怎么办?:报错

我的业务场景是,前10天一直有消息堆积,业务方没有消费,第11天业务方开始消费,但希望从当前时间开始消费,以前的消息不处理;

通过看官方提供的例子,找到如下测试代码,但测试时发现,重新定义一个新消费组,1分钟前的消息依然能消费,请高手指点。

 
 public static void main(String[] args) throws InterruptedException, MQClientException {    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer23");

consumer.setNamesrvAddr("10.7.13.83:9876;10.7.13.84:9876");                  
consumer.subscribe("TopicTest1", "*");                    // 一个新的订阅组第一次启动从指定时间点开始消费         
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 设置时间点
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
           System.currentTimeMillis() - (1000 * 60 * 1)));

consumer.registerMessageListener(new MessageListenerConcurrently() {         
@Override          
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
           ConsumeConcurrentlyContext context) {

    MessageExt msg = msgs.get(0);                   
    System.out.println(new String(msg.getBody()));
    String keys = msg.getKeys();
    System.out.println("keys="+keys);                 
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
});                
/**  * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>  */                  consumer.start();

System.out.println("Consumer Started.");
}

展开
收起
kun坤 2020-06-14 07:59:01 1131 0
1 条回答
写回答
取消 提交回答
  • 这是你业务逻辑做的吧,本来mq就是这样消费的。

    ######
    consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
               System.currentTimeMillis() - (1000 * 60 * 1)));
    你设置的是从一分钟前开始消费啊。。。######回复 @云中飞雪 : 你新的组的group以及topic和现在一样吗?######是的,我期望是,当我用一个新的组去消费的时候,从当前时间开始消费,10天前的消息不消费
    2020-06-14 07:59:07
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载