开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):消息消费基本流程】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12379
消息消费基本流程
如何使用 Java 代码实现消息消费
消费消息的基本流程:
1. 创建消费者 Consumerl,制定消费者组名
2. 指定 Nameserver 地址
3. 订阅主题 Topic 和 Tag
4. 设置回调函数,处理消息
5. 启动消费者 consumer
接下来用代码实现:
public static void main(String[]args) throws Exception{
//1.创建消费者 consumer,制定消费者组名;
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer ( consumerGroup ."group
//2.指定 Name server 地址consumer. setNamesrvAddr ("192.168.P5.135:9876;192.168.25.138:9876");
//3.订阅主题 Topic 和 Tag
consumer, subscribe(topic,"base", sub Expression ""Tag 1");
//4.设置回调函数,处理消息consumer. registerMessageListener (newMessageListenerConcurrently ()(
//接受消息内容
public consumer concurrentlystatus consumer research(List(Wests) massiage Ext) mass, consumercientlycontext context)System. out. println(msg); return ConsumeConcurrentlyStatus .CONSUME SUCCESS;
}
});
//5.启动消费者 consumer.
consumer. start();
}
现在消息以及被成功消费,是由同步的消息发送者发送的,也有消息的内容,已经看到消息的消费了。
把消息遍历一下,遍历 msg,每一个 msg 可以获得一个数组,结果就会正常接收到。
负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处
理的消息不同
public static void main(string[]args) throws Exception{
//实例化消息生产者,指定组名
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer ("group 1");
//指定 Namesrv 地址信息.
consumer. setNamesrvAddr ("local host:9876");
//订阅 Topic
consumer, subscribe("Test","*");
//负载均衡模式消费
consumer. setMessageModel ( MessageModel , CLUSTERING );
//注册回调函数,处理消息
consumer. registerMessageListener (new MessageListenerConcurrently (){
@override
public Consumecon currentlyStatus consume Message(List< MessageExt >msgs,
ConsumeConcurrentlycontext context){
system, out, printf("%s Receive New Messages:%s%n",
Thread, currentThread ().getName(), msgs);
return ConsumeConcurrentlystatus .CONSUME SUCCESS;
}
//启动消息者
consumer, start();
system, out, printf("consumer started,%n");
}