广播消息
广播消息概述
广播消息就是向所有用户发送消息。 如果我们希望所有订阅者都能收到有关某个主题的消息,可以使用广播消息。
举个例子 生产者发送10条消息,有2个订阅者,则这两个订阅者会分别收到10条消息, 而与广播模式相对应的集群模式这是 2个订阅者一共收到10条消息。
Rocketmq 消费者默认是集群的方式消费的,使用广播模式进行消费需要显示设置
核心:消费端设置消息模型 consumer.setMessageModel(MessageModel.BROADCASTING);
演示步骤
- 启动2个或者2个以上的消费者
- 启动生产者发送消息
- 观察2个消费者的消息接收情况 :两个Consumer收到了同样的消息,OK.
生产者:
package com.artisan.rocketmq.broadcast; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 19:22 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group"); producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); producer.start(); for (int i = 0; i < 4; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", ("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
消费者:
package com.artisan.rocketmq.broadcast; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 19:27 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class BroadcastConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group"); consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //广播,全量消费 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt ext : msgs){ System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
测试结果:
生产者:
消费者1:
消费者2:
延时消息
概述
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
使用场景
举个例子: 电商系统,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
延时机制
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel
当前支持的延迟时间
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
分别对应级别
1 2 3....................
设置消息时延
Message message = new Message; message.setDelayTimeLevel(3)
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。
实现原理
延迟队列的核心思路: 【利用中间队列临时存储】—>所有的延迟消息由producer消息发憷之后,都会存放在一个topic下 (SHCEDULE_TOPIC_XXXX), 不同的延迟级别对应不同的队列序号,当延迟时间到了之后,由定时线程读取转换为普通的消息存到真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。
示例
生产者:
package com.artisan.rocketmq.schedule; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.Date; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 17:23 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer"); producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); producer.start(); int totalMessagesToSend = 3; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); //延时消费 6-->2分钟 message.setDelayTimeLevel(6); // Send the message producer.send(message); } System.out.printf("message send is completed .%n" + new Date()); producer.shutdown(); } }
消费者:
package com.artisan.rocketmq.schedule; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.Date; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 17:23 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); consumer.subscribe("TestTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println(new Date() + "Receive message[msgId=" + message.getMsgId() + "] " + "message content is :" + new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); //System.out.printf("Consumer Started.%n"); } }
设置的延迟level为6 ,对应的时间间隔是两分钟,OK。
批量消息
批量消息概述
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息
此外,这一批消息的总大小不应超过4MB。rocketmq建议每次批量消息大小大概在1MB。当消息大小超过4MB时,需要将消息进行分割
示例
生产者
package com.artisan.rocketmq.batch; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 21:27 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class BatchProducer { public static void main(String[] args) throws Exception { /** * rocketMq 支持消息批量发送 * 同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时任务。 * <strong> 同一批次消息建议大小不超过~1M </strong>,消息最大不能超过4M,需要 * 对msg进行拆分 */ DefaultMQProducer producer = new DefaultMQProducer("batch_group"); producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); producer.start(); String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); ListSplitter splitter = new ListSplitter(messages); /** * 对批量消息进行拆分 */ while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
消息拆分
package com.artisan.rocketmq.batch; import org.apache.rocketmq.common.message.Message; import java.util.Iterator; import java.util.List; import java.util.Map; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 21:35 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; //遍历消息准备拆分 for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead if (tmpSize > SIZE_LIMIT) { if (nextIndex - currIndex == 0) { nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } }
消费者
package com.artisan.rocketmq.batch; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-10 21:38 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group"); consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); consumer.subscribe("BatchTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs){ System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
代码
请移步: https://github.com/yangshangwei/rocketmqMaster