开发者社区> 问答> 正文

Java SDK如何订阅消息


本文介绍如何通过 MQ SDK 进行消息订阅。
请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考 订阅关系一致文档。
TCP 接入点域名,请 前往查看。
MQ 支持两种订阅方式。


  • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

  • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
    1. // 广播订阅方式设置
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);


示例代码

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10.     public static void main(String[] args) {
  11.         Properties properties = new Properties();
  12.         // 您在控制台创建的 Consumer ID
  13.         properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14.         // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  15.         properties.put(PropertyKeyConst.AccessKey, "XXX");
  16.         // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  17.         properties.put(PropertyKeyConst.SecretKey, "XXX");
  18.         // 设置 TCP 接入域名(此处以公共云生产环境为例)
  19.         properties.put(PropertyKeyConst.ONSAddr,
  20.           "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  21.           // 集群订阅方式 (默认)
  22.           // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  23.           // 广播订阅方式
  24.           // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  25.         Consumer consumer = ONSFactory.createConsumer(properties);
  26.         consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag
  27.             public Action consume(Message message, ConsumeContext context) {
  28.                 System.out.println("Receive: " + message);
  29.                 return Action.CommitMessage;
  30.             }
  31.         });
  32.         //订阅另外一个Topic
  33.         consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag
  34.             public Action consume(Message message, ConsumeContext context) {
  35.                 System.out.println("Receive: " + message);
  36.                 return Action.CommitMessage;
  37.             }
  38.         });
  39.         consumer.start();
  40.         System.out.println("Consumer Started");
  41.     }
  42. }

注意:广播消费模式下,控制台无法设置消息堆积报警,无法进行消息堆积查询。因此,也可以创建多个 Consumer ID 来达到广播模式的效果。详情请参考文档 多个 Consumer ID 模式

展开
收起
猫饭先生 2017-10-26 14:06:18 2053 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
JAVA开发手册1.5.0 立即下载