c#测试消息队列RocketMQ版时,编译到ONSClient4CPPPINVOKE.ONSFactory_getInstance()出错。参数如下所示: Ons_Topic = "anxxxx"; Ons_AccessKey = "LTAIxxxxxxxguE"; Ons_SecretKey = "LwxxxxxxxxxxxxxxxxcIQ"; private static string Ons_GroupId = "GID_xxx"; private static string Ons_NameSrv = "http://xxxxxx.mq-internet-access.mq-internet.aliyuncs.com:80";
应该是什么原因造成的?
一般的,我们在RocketMQ处理消息的时候,可能会在消费者中使用类似下面的代码
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
1
2
3
4
5
6
7
8
如果消息被成功消费的话,会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态,但是如果消息消费失败的话,又会怎么处理呢?
其实我们只要找到ConsumeConcurrentlyStatus这个枚举就能知道RocketMQ是如何处理了,代码如下:
public enum ConsumeConcurrentlyStatus { /** * Success consumption / CONSUME_SUCCESS, /* * Failure consumption,later try to consume */ RECONSUME_LATER; }
1
2
3
4
5
6
7
8
9
10
很明显,如果无法返回CONSUME_SUCCESS状态,那么就返回RECONSUME_LATER,过一会再尝试消费即可。那么第二个问题来了,既然这条消息消费失败了,总不能一直卡着后面的消息也等着吧,那么消费失败的消息肯定需要放到另一个Topic中,让它一个人等着被再次消费
所以这时会有一个重试队列,用于暂时保存因为各种异常而导致Consumer端无法消费的消息,重试队列的名称是在原队列的名称前加上%RETRY%(这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的) RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中
原文链接:https://blog.csdn.net/LO_YUN/article/details/104301740
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/