RocketMQ使用教程相关系列 目录
目录
第一节:项目准备
第二节:普通消息-生产者和消息者步骤说明
普通消息生产者代码实现步骤
普通消息消费者代码实现步骤
第三节:普通消息-同步消息
生产者
效果:
消费者
效果:
第四节:普通消息-异步消息
生产者
效果:
消费者:
效果:
分析:
第五节:普通消息-单向消息
生产者
效果:
消费者
效果:
分析:
第六节:总结
第一节:项目准备
rocketmq-client依赖
第二节:普通消息-生产者和消息者步骤说明
普通消息生产者代码实现步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象 Message,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
普通消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:普通消息-同步消息
同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
生产者
消费者
public class ConsumerSync { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_demo_sync", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 获取主题 String topic = msg.getTopic(); // 获取标签 String tags = msg.getTags(); // 获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); } }1.public class ConsumerSync { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_demo_sync", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 获取主题 String topic = msg.getTopic(); // 获取标签 String tags = msg.getTags(); // 获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); } }
消费者:
1.public class ConsumerASync { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_demo_async", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 获取主题 String topic = msg.getTopic(); // 获取标签 String tags = msg.getTags(); // 获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); } }
分析:
从消费者代码中可以看出:异步消费者的实现方式和同步消费者实现方式并无区别。
从消费者的结果来看:可以看出异步消费者接收的结果是无序的。
第五节:普通消息-单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
生产者
public class Producer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); for (int i = 0; i < 20; i++) { // 4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("Topic_demo", "Tag_demo", ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 5.发送单向消息 producer.sendOneway(msg); System.out.println("发送结果:" + msg); // 线程睡1秒 } // 6.关闭生产者producer producer.shutdown(); System.out.println("生产者关闭"); } }
消费者
1.public class Consumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); // 3.订阅主题Topic和Tag consumer.subscribe("Topic_demo", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 获取主题 String topic = msg.getTopic(); // 获取标签 String tags = msg.getTags(); // 获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); } }
分析:
从消费结果来看,消费的顺序是无序的
第六节:总结
同步消息、异步消息和单向消息的消费者实现方式是一样的。
同步消息、异步消息和单向消息的区别在于消息的发送方。
异步消息生产者没有返回值,需要使用 SendCallback 接收异步返回结果的回调。
异步消息生产者,在关闭实例之前,建议进行休眠。
单向消息也是没有返回值的,并且它的消费者也是无序消费。
单向消息和异步消息的区别是单向消息不需要 SendCallback 来接收异步返回结果的回调。