集群订阅即某个消费者集群只消费指定的Topic,而不是消费所有Topic。
- #include "ONSFactory.h"
- using namespace ons;
- // MyMsgListener:创建消费消息的实例
- //pushConsumer拉取到消息后,会主动调用该实例的consume 函数
- class MyMsgListener : public MessageListener
- {
- public:
- MyMsgListener()
- {
- }
- virtual ~MyMsgListener()
- {
- }
- virtual Action consume(Message &message, ConsumeContext &context)
- {
- //自定义消息处理细节
- return CommitMessage; //CONSUME_SUCCESS;
- }
- };
- int main(int argc, char* argv[])
- {
- //pushConsumer创建和工作需要的参数,必须输入
- ONSFactoryProperty factoryInfo;
- factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//您在MQ控制台申请的consumerId
- factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//您在MQ控制台申请的msg topic
- factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
- factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");//阿里云身份验证,在阿里云服务器管理控制台创建
- //create pushConsumer
- PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
- //指定pushConsumer 订阅的消息topic和tag, 注册消息回调函数
- MyMsgListener msglistener;
- pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
- //start pushConsumer
- pushConsumer->start();
- //NOTE:直到不再接收消息,才能调用shutdown;调用shutdown之后,consumer退出,不能接收到任何消息
- //销毁pushConsumer, 在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
- pushConsumer->shutdown();
- return 0;
- }