RocketMQ4.9.x版本中,defaultMQPushConsumer.shutdown()后再start(),消费组没连上收不到消息,有遇到过这样问题的吗?
是的,我曾经遇到过类似的问题。这可能是由于 RocketMQ 4.9.x 版本中的 bug 导致的。
在 RocketMQ 4.9.x 版本中,默认情况下,如果调用 shutdown()
方法关闭了消费组,则无法再次调用 start()
方法重新连接并接收消息。这是因为 shutdown()
方法会断开与 RocketMQ 服务器的所有连接,并释放所有的资源,包括队列和主题等。因此,在关闭消费组之后,即使再次调用 start()
方法也无法恢复到以前的状态。
为了解决这个问题,建议在每次关闭消费组之前,先使用 suspend()
方法暂停消费,然后再使用 resume()
方法恢复消费。这样可以在不完全关闭消费组的情况下停止接收消息,并在需要的时候重新开始消费。
此外,还有一种解决方案是在关闭消费组之后,重新创建一个新的消费组对象来代替原来的消费组。但是这种方法需要重新订阅所有的话题,并且在生产环境中可能不太适合,因为会增加额外的操作复杂性和系统负载。
有可能的原因如下:
在 RocketMQ 中,默认情况下,当 DefaultMQPushConsumer
对象调用 shutdown()
方法时,它会取消所有待处理的消息,从而使消费者退出消息队列。
如果您希望再次启动消费者,请重新创建 DefaultMQPushConsumer
对象,并使用 start()
方法初始化新的连接。您可以使用新的对象重新订阅主题,并重新注册监听器,如:
// 关闭原消费者的监听器
defaultMQPushConsumer.shutdown();
// 新建一个消费者的实例
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group");
// 启动新的消费者
defaultMQPushConsumer.start();
// 订阅新的话题
defaultMQPushConsumer.subscribe(topic, "*");
// 注册新的监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently());
// 关闭原来的消费者
defaultMQPushConsumer.shutdown();
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/