RocketMQ使用教程相关系列 目录
目录
第一节:介绍
顺序消息含义介绍
原理解析
第二节:顺序消息-生产者和消息者步骤说明
顺序消息生产者代码实现步骤
顺序消息消费者代码实现步骤
第三节:顺序消息生产者
效果:
第四节:顺序消息消费者
效果:
第一节:介绍
顺序消息含义介绍
顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
原理解析
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);
而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
第二节:顺序消息-生产者和消息者步骤说明
顺序消息生产者代码实现步骤
1.创建消息生产者producer并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息,选择的send方法有三个参数:
* 参数一:消息对象
* 参数二:消息队列的选择器
* 参数三:选择队列的业务标识 6.关闭生产者producer
顺序消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:顺序消息生产者
public class Producer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); for (int i = 0; i < 20; i++) { // 4.创建消息对象,指定主题Topic、Tag和消息体 Message msg = new Message("Topic_order_demo", "Tag_order_demo", ("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 5.发送消息 /** * 参数一:消息对象 * 参数二:消息队列的选择器 * 参数三:选择队列的业务标识 */ SendResult result = producer.send(msg, new MessageQueueSelector() { /** * * @param mqs:队列集合 * @param msg:消息对象 * @param arg:业务标识的参数 * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer index = (Integer) arg; return mqs.get(index); } }, 1); System.out.println("发送结果:" + msg.toString()); } // 6.关闭生产者producer producer.shutdown(); System.out.println("生产者关闭"); } }
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); //消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); //3.订阅主题Topic和Tag consumer.subscribe("Topic_order_demo", "*"); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerOrderly() { //接受消息内容 @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : msgs) { try { //获取主题 String topic = msg.getTopic(); //获取标签 String tags = msg.getTags(); //获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+ ",result"+ result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); //重试 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者consumer consumer.start(); } }