开发者社区> 问答> 正文

C/C++ SDK集群方式订阅消息如何实现?


集群订阅即某个消费者集群只消费指定的Topic,而不是消费所有Topic。

  1. #include "ONSFactory.h"
  2. using namespace ons;
  3. // MyMsgListener:创建消费消息的实例
  4. //pushConsumer拉取到消息后,会主动调用该实例的consume 函数
  5. class MyMsgListener : public MessageListener
  6. {
  7.     public:
  8.         MyMsgListener()
  9.         {
  10.         }
  11.         virtual ~MyMsgListener()
  12.         {
  13.         }
  14.         virtual Action consume(Message &message, ConsumeContext &context)
  15.         {
  16.             //自定义消息处理细节
  17.             return CommitMessage; //CONSUME_SUCCESS;
  18.         }
  19. };
  20. int main(int argc, char* argv[])
  21. {
  22.     //pushConsumer创建和工作需要的参数,必须输入
  23.     ONSFactoryProperty factoryInfo;
  24.     factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//您在MQ控制台申请的consumerId
  25.     factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//您在MQ控制台申请的msg topic
  26.     factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  27.     factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
  28.     //create pushConsumer
  29.     PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
  30.     //指定pushConsumer 订阅的消息topic和tag, 注册消息回调函数
  31.     MyMsgListener  msglistener;
  32.     pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
  33.     //start pushConsumer
  34.     pushConsumer->start();
  35.     //NOTE:直到不再接收消息,才能调用shutdown;调用shutdown之后,consumer退出,不能接收到任何消息
  36.     //销毁pushConsumer, 在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
  37.     pushConsumer->shutdown();
  38.     return 0;
  39. }

展开
收起
猫饭先生 2017-10-27 10:03:31 2041 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
一个跨平台的云服务SDK需要什么 立即下载
使用C++11开发PHP7扩展 立即下载
GPON Class C++ SFP O;T Transce 立即下载