开发者社区 > 云原生 > 云消息队列 > 正文

c#测试消息队列RocketMQ不成功

c#测试消息队列RocketMQ版时,编译到ONSClient4CPPPINVOKE.ONSFactory_getInstance()出错。参数如下所示: Ons_Topic = "anxxxx"; Ons_AccessKey = "LTAIxxxxxxxguE"; Ons_SecretKey = "LwxxxxxxxxxxxxxxxxcIQ"; private static string Ons_GroupId = "GID_xxx"; private static string Ons_NameSrv = "http://xxxxxx.mq-internet-access.mq-internet.aliyuncs.com:80";

应该是什么原因造成的?

展开
收起
mudye 2021-02-18 12:28:32 1038 0
1 条回答
写回答
取消 提交回答
  • 下一站是幸福

    一般的,我们在RocketMQ处理消息的时候,可能会在消费者中使用类似下面的代码

    consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

    1
    2
    3
    4
    5
    6
    7
    8
    

    如果消息被成功消费的话,会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态,但是如果消息消费失败的话,又会怎么处理呢?

    其实我们只要找到ConsumeConcurrentlyStatus这个枚举就能知道RocketMQ是如何处理了,代码如下:

    public enum ConsumeConcurrentlyStatus { /** * Success consumption / CONSUME_SUCCESS, /* * Failure consumption,later try to consume */ RECONSUME_LATER; }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    很明显,如果无法返回CONSUME_SUCCESS状态,那么就返回RECONSUME_LATER,过一会再尝试消费即可。那么第二个问题来了,既然这条消息消费失败了,总不能一直卡着后面的消息也等着吧,那么消费失败的消息肯定需要放到另一个Topic中,让它一个人等着被再次消费

    所以这时会有一个重试队列,用于暂时保存因为各种异常而导致Consumer端无法消费的消息,重试队列的名称是在原队列的名称前加上%RETRY%(这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的) RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中

    原文链接:https://blog.csdn.net/LO_YUN/article/details/104301740

    2021-04-02 22:04:10
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    移动互联网测试到质量的转变 立即下载
    给ITer的技术实战进阶课-阿里CIO学院独家教材(四) 立即下载
    F2etest — 多浏览器兼容性测试整体解决方案 立即下载