开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):消费者广播模式和负载均衡模式】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12380
消费者广播模式和负载均衡模式
主要内容
1. 负载均衡模式
2. 广播模式
现在消费消息一共有两种模式,分别是广播模式和负载均衡模式,那么这两个的区别就用图来表示一下
负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处
理的消息不同
public static void main(string[]args) throws Exception{
//实例化消息生产者,指定组名
DefaultMQPushConsumer consumer=newDefaultMQPushConsumer ("group 1");
//指定Namesrv地址信息.
consumer. setNamesrvAddr ("local host:9876");
//订阅Topic
consumer, subscribe("Test","*");
//负载均衡模式消费
consumer. setMessageModel ( MessageModel , CLUSTERING );
//注册回调函数,处理消息
consumer. registerMessageListener (new MessageListenerConcurrently (){
@override
public Consumecon currentlyStatus consume Message(List< MessageExt >msgs,
ConsumeConcurrentlycontext context){
system, out, printf("%s Receive New Messages:%s%n",
Thread, currentThread ().getName(), msgs);
return ConsumeConcurrentlystatus .CONSUME SUCCESS;
}
//启动消息者
consumer, start();
system, out, printf("consumer started,%n");
}
广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
public static void main(string[]args) throws Exception{
//实例化消息生产者,指定组名
DefaultMQPushConsumer consumer=newDefaultMqPushConsumer ("group 1"):
//指定 Namesrv 地址信息.
consumer, setNamesrvAddr ("local host:9876");
//订阅Topic
consumer. subscribe("Test","*");
//广播模式消费
consumer. setMessageMode ]( MessageMode ]. BROADCASTING );
//注册回调函数,处理消息
consumer. registerMessageListener (new Message Listenerconcurrently (){
@override
public ConsumeConcurrentlyStatus consume Message(List< MessageExt >msgs,
ConsumeConcurrentlyContext context){
system, out. printf("%s Receive New Messages:%s%n",
Thread currentThread (),getName(), msgs);
return Consume ConcurrentlyStatus ,CONSUME SUCCESS;
}
}) ;
//启动消息者
consumer, start();
system, out, printf("Consumer started.%n");