开发者社区 > 云原生 > 消息队列 > 正文

c#测试消息队列RocketMQ不成功

c#测试消息队列RocketMQ版时,编译到ONSClient4CPPPINVOKE.ONSFactory_getInstance()出错。参数如下所示: Ons_Topic = "anxxxx"; Ons_AccessKey = "LTAIxxxxxxxguE"; Ons_SecretKey = "LwxxxxxxxxxxxxxxxxcIQ"; private static string Ons_GroupId = "GID_xxx"; private static string Ons_NameSrv = "http://xxxxxx.mq-internet-access.mq-internet.aliyuncs.com:80";

应该是什么原因造成的?

展开
收起
mudye 2021-02-18 12:28:32 985 0
1 条回答
写回答
取消 提交回答
  • 下一站是幸福

    一般的,我们在RocketMQ处理消息的时候,可能会在消费者中使用类似下面的代码

    consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

    1
    2
    3
    4
    5
    6
    7
    8
    

    如果消息被成功消费的话,会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态,但是如果消息消费失败的话,又会怎么处理呢?

    其实我们只要找到ConsumeConcurrentlyStatus这个枚举就能知道RocketMQ是如何处理了,代码如下:

    public enum ConsumeConcurrentlyStatus { /** * Success consumption / CONSUME_SUCCESS, /* * Failure consumption,later try to consume */ RECONSUME_LATER; }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    很明显,如果无法返回CONSUME_SUCCESS状态,那么就返回RECONSUME_LATER,过一会再尝试消费即可。那么第二个问题来了,既然这条消息消费失败了,总不能一直卡着后面的消息也等着吧,那么消费失败的消息肯定需要放到另一个Topic中,让它一个人等着被再次消费

    所以这时会有一个重试队列,用于暂时保存因为各种异常而导致Consumer端无法消费的消息,重试队列的名称是在原队列的名称前加上%RETRY%(这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的) RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中

    原文链接:https://blog.csdn.net/LO_YUN/article/details/104301740

    2021-04-02 22:04:10
    赞同 展开评论 打赏

多个子产品线联合打造金融级高可用消息服务以及对物联网的原生支持,覆盖多行业。

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    行业实践:RocketMQ 业务集成典型行业应用和实践 立即下载
    技术揭秘:RocketMQ 5.0 云原生架构升级之路 立即下载
    RocketMQ Summit 2022 开源生态发展 立即下载