一.延时队列
延时消息(定时消息):消息发送到 broker 后不会立即投递,而是在指定的延迟时间之后才投递给消费者。
1.修改调用类即可
1. @RestController 2. @RequestMapping("/test") 3. public class TestControllor { 4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 5. 6. /** 7. * 使用RocketMq的生产者 8. */ 9. @Resource(name = "customRocketMQProducer") 10. private DefaultMQProducer defaultMQProducer; 11. 12. @RequestMapping("/send") 13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 14. for (int i = 0; i < 100; i++) { 15. final int index = i; 16. String msg = "demo msg test"; 17. logger.info("开始发送消息:" + msg); 18. Message sendMsg = new Message("DemoTopic", "TagA", String.valueOf(i), msg.getBytes()); 19. //预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h 20. //设置延时时间 3即为10s 21. sendMsg.setDelayTimeLevel(3); 22. //默认3秒超时 23. SendResult sendResult = defaultMQProducer.send(sendMsg); 24. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次"); 25. } 26. } 27. }
二.批处理
成批发送消息提高了传递小消息的性能。
使用限制
同一批的消息应该具有:相同的主题(topic)、相同的 waitStoreMsgOK,并且不支持延时(定时)消息。
此外,一批消息的总大小不应超过1MiB。
如何使用批处理
如果一次只发送不超过1MiB的消息,则很容易使用批处理:
1. @RestController 2. @RequestMapping("/test") 3. public class TestControllor { 4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 5. 6. /** 7. * 使用RocketMq的生产者 8. */ 9. @Resource(name = "customRocketMQProducer") 10. private DefaultMQProducer defaultMQProducer; 11. 12. @RequestMapping("/send") 13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 14. //for (int i = 0; i < 100; i++) { 15. //final int index = i; 16. String msg = "demo msg test"; 17. logger.info("开始发送消息:" + msg); 18. List<Message> messages = new ArrayList<>(); 19. messages.add(new Message("DemoTopic", "TagA", "OrderID001", "Hello world 0".getBytes())); 20. messages.add(new Message("DemoTopic", "TagA", "OrderID002", "Hello world 1".getBytes())); 21. messages.add(new Message("DemoTopic", "TagA", "OrderID003", "Hello world 2".getBytes())); 22. //默认3秒超时 23. SendResult sendResult = defaultMQProducer.send(messages); 24. } 25. }
三.条件过滤
使用 SQL92 条件过滤之前,需要先在 broker 的启动配置中开启属性过滤,否则消费者订阅时会报错:
The broker does not support consumer to filter message by SQL92
修改broker.conf文件 加入
enablePropertyFilter = true
同时,启动 broker 时需要通过 -c 参数指定修改后的配置文件,命令示例(NameServer 地址和配置文件路径请按实际环境调整):
mqbroker -n localhost:9876 -c ../conf/broker.conf
筛选固定条件消息
1. 数值比较:>, >=, <, <=, BETWEEN, =;
2. 字符比较:=, <>, IN;
3. 空值判断:IS NULL 或 IS NOT NULL;
4. 逻辑运算:AND, OR, NOT;
使用限制:
只有push consumer可以通过SQL92选择消息。接口是:
public void subscribe(final String topic, final MessageSelector messageSelector)
代码示例
调用方法
1. @RestController 2. @RequestMapping("/test") 3. public class TestControllor { 4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 5. 6. /** 7. * 使用RocketMq的生产者 8. */ 9. @Resource(name = "customRocketMQProducer") 10. private DefaultMQProducer defaultMQProducer; 11. 12. @RequestMapping("/send") 13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 14. for (int i = 0; i < 100; i++) { 15. final int index = i; 16. String msg = "demo msg test"; 17. logger.info("开始发送消息:" + msg); 18. Message sendMsg = new Message("DemoTopic", 19. "DemoTag", 20. ("Hello RocketMQ " + i).getBytes() 21. ); 22. //消费者根据a进行过滤 23. sendMsg.putUserProperty("a", String.valueOf(i)); 24. SendResult sendResult = defaultMQProducer.send(sendMsg); 25. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次"); 26. } 27. } 28. }
消费者
1. @SpringBootConfiguration 2. public class MQConsumerConfiguration { 3. 4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); 5. @Value("${rocketmq.consumer.namesrvAddr}") 6. private String namesrvAddr; 7. @Value("${rocketmq.consumer.groupName}") 8. private String groupName; 9. @Value("${rocketmq.consumer.consumeThreadMin}") 10. private int consumeThreadMin; 11. @Value("${rocketmq.consumer.consumeThreadMax}") 12. private int consumeThreadMax; 13. @Value("${rocketmq.consumer.topics}") 14. private String topics; 15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") 16. private int consumeMessageBatchMaxSize; 17. @Autowired 18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; 19. 20. @Bean 21. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception { 22. if (StringUtils.isEmpty(groupName)){ 23. throw new Exception("groupName is null !!!"); 24. } 25. if (StringUtils.isEmpty(namesrvAddr)){ 26. throw new Exception("namesrvAddr is null !!!"); 27. } 28. if(StringUtils.isEmpty(topics)){ 29. throw new Exception("topics is null !!!"); 30. } 31. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); 32. consumer.setNamesrvAddr(namesrvAddr); 33. consumer.setConsumeThreadMin(consumeThreadMin); 34. consumer.setConsumeThreadMax(consumeThreadMax); 35. consumer.registerMessageListener(mqMessageListenerProcessor); 36. 37. /** 38. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 39. * 如果非第一次启动,那么按照上次消费的位置继续消费 40. */ 41. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 42. /** 43. * 设置消费模型,集群还是广播,默认为集群 44. */ 45. consumer.setMessageModel(MessageModel.CLUSTERING); 46. /** 47. * 设置一次消费消息的条数,默认为1条 48. */ 49. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); 50. 51. try { 52. //根据条件过滤消息 53. consumer.subscribe("DemoTopic", MessageSelector.bySql("a between 0 and 3")); 54. consumer.start(); 55. LOGGER.info("consumer is start !!! 
groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); 56. }catch (MQClientException e){ 57. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); 58. throw new Exception(e); 59. } 60. return consumer; 61. } 62. }
日志配置
通过 JVM 启动参数指定客户端日志目录(logRoot)和日志级别(logLevel),例如:
-Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ALL

-Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ERROR