RocketMQ使用教程相关系列 目录
目录
第一节:介绍
限制:
消费者消费模式
第二节:广播消息-生产者和消息者步骤说明
广播消息生产者代码实现步骤
广播消息消费者代码实现步骤
第三节:广播消息生产者
效果:
第四节:广播消息消费者A
效果:
第五节:广播消息消费者B
效果:
第六节:总结
第一节:介绍
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
限制:
不支持顺序消息
消费者消费模式
消费者的消费模式分为两种
负载均衡模式:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
第二节:广播消息-生产者和消息者步骤说明
广播消息生产者代码实现步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象集合,指定主题Topic、Tag和消息体
5.发送集合消息
6.关闭生产者producer
注:与批量消息的生产者代码一模一样
广播消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.默认均衡轮询消费模式 改为广播模式
4.订阅主题Topic和Tag
5.设置回调函数,处理消息
6.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:广播消息生产者
public class Producer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_broadcasting_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); List<Message> msgs = new ArrayList<Message>(); // 4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ for (int i = 0; i < 20; i++) { Message msg = new Message("Topic_broadcasting_demo", "Tag_broadcasting_demo", ("Hello 虚竹,这是广播消息" + i).getBytes()); msgs.add(msg); } // 5.发送消息 SendResult result = producer.send(msgs); // 发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); // 线程睡1秒 TimeUnit.SECONDS.sleep(1); // 6.关闭生产者producer producer.shutdown(); System.out.println("生产者关闭"); } }
public class ConsumerA { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 默认均衡轮询消费模式 改为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_broadcasting_demo", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println( "A----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); System.out.println("消费者启动"); } }
第五节:广播消息消费者B
public class ConsumerB { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 默认均衡轮询消费模式 改为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_broadcasting_demo", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println( "B----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); System.out.println("消费者启动"); } }
第六节:总结
消费模式默认为负载均衡模式
修改消费模式的方法是: consumer.setMessageModel();
设置为广播模式:consumer.setMessageModel(MessageModel.BROADCASTING);
设置负载均衡模式:consumer.setMessageModel(MessageModel.CLUSTERING);
参考:
顺序消息是否支持集群消费和广播消费?
https://help.aliyun.com/knowledge_detail/54357.html