开发者社区 > 云原生 > 云消息队列 > 正文

消息队列 MQ 如何订阅消息


消息发送成功后,需要启动订阅方进行消息订阅。本文以 TCP Java SDK 为例,介绍如何通过调用相关协议及开发语言的 SDK/API 来完成消息订阅。
TCP 接入点域名,请 前往查看。

调用 TCP Java SDK 订阅消息


您可以运行以下示例代码来启动订阅端,并测试订阅消息的功能。请按照说明正确设置相关参数。目前控制台提供了 Java,C++, .NET,PHP 的示例代码。

  1. public class ConsumerTest {
  2.     public static void main(String[] args) {
  3.         Properties properties = new Properties();
  4.         // 您在MQ控制台创建的Consumer ID
  5.         properties.put(PropertyKeyConst.ConsumerId, "XXX");
  6.         // 鉴权用AccessKey,在阿里云服务器管理控制台创建
  7.         properties.put(PropertyKeyConst.AccessKey, "XXX");
  8.         // 鉴权用SecretKey,在阿里云服务器管理控制台创建
  9.         properties.put(PropertyKeyConst.SecretKey, "XXX");
  10.         // 设置 TCP 接入域名(此处以公共云公网环境接入为例)
  11.         properties.put(PropertyKeyConst.ONSAddr,
  12.           "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
  13.         Consumer consumer = ONSFactory.createConsumer(properties);
  14.         consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
  15.             public Action consume(Message message, ConsumeContext context) {
  16.                 System.out.println("Receive: " + message);
  17.                 return Action.CommitMessage;
  18.             }
  19.         });
  20.         consumer.start();
  21.         System.out.println("Consumer Started");
  22.     }
  23. }


查看消息订阅是否成功


完成上述步骤后,您可以在 MQ 控制台查看订阅端是否启动成功,即消息订阅是否成功。目前控制台查看消费者状态仅支持 TCP 客户端,暂不支持 HTTP 以及 MQTT 客户端。

  1. 在 MQ 控制台左侧菜单栏依次单击发布订阅管理>订阅管理。

  2. 找到要查看的 Topic,单击右侧操作选项里的消费者状态。 如果是否在线显示为是,则说明订阅端已成功启动。
    如果消费者状态是否在线显示为否,请参考消费者状态问题排查

完成以上所有步骤后,您就成功接入了 MQ 服务,可以用 MQ 进行消息发送和订阅了。

展开
收起
猫饭先生 2017-10-26 13:47:43 2460 0
0 条回答
写回答
取消 提交回答

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    企业互联网架构之消息队列 立即下载
    基于消息队列RocketMQ的大型分布式应用上云最佳实践 立即下载
    云原生消息队列Apache RocketMQ 立即下载