我的业务场景是,前10天一直有消息堆积,业务方没有消费,第11天业务方开始消费,但希望从当前时间开始消费,以前的消息不处理;
通过看官方提供的例子,找到如下测试代码,但测试时发现,重新定义一个新消费组,1分钟前的消息依然能消费,请高手指点。
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer23");
consumer.setNamesrvAddr("10.7.13.83:9876;10.7.13.84:9876"); consumer.subscribe("TopicTest1", "*"); // 一个新的订阅组第一次启动从指定时间点开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); // 设置时间点 consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3( System.currentTimeMillis() - (1000 * 60 * 1)));
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(<span style="color:#0000ff;">0); <span style="color:#808080;font-style:italic;"> <span style="color:#808080;font-style:italic;">
System.out.println(new String(msg.getBody())); String keys = msg.getKeys(); System.out.println("keys="+keys); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start();
System.out.println("Consumer Started."); }
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。