我想从最新的消息开始消费。使用的RocketMQ集群模式。这个该怎么做到呢?
在RocketMQ中,你可以通过设置消费者的消费起始位置来实现从最新的消息开始消费。你可以使用DefaultMQPushConsumer的setConsumeFromWhere API来实现这一点。这个API有三个参数选项:CONSUME_FROM_MAX_OFFSET,CONSUME_FROM_FIRST_OFFSET和CONSUME_FROM_TIMESTAMP。
在RocketMQ中,可以通过设置消费者的消息消费模式,来从最新的消息开始消费。在集群模式下,可以通过设置消费者的“消费模式”参数,来实现从最新的消息开始消费。具体可以参考以下步骤:
在RocketMQ中,要做到从最新的消息开始消费,需要将Consumer的消费策略设置为CONSUME_FROM_LAST_OFFSET。
RocketMQ集群消费模式:
一个ConsumerGroup中的Consumer实例根据队列分配策略算法为Consumer分配队列,平均分摊(默认)消费消息。
一个消费队列会分配一个消费者,有且只有一个;一个消费者可能消费0到多个队列。
当某个消费实例不能正常提供服务后,所涉及的消息队列会自动派送给其他消费实例,会按照故障实例的消费进度继续消费。
broker上的消费进度并不会实时更新,消费实例会定时将消费的进度同步到broker上。
重复消息的情况是存在的。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/