五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置


一.延时队列

定时队列 -它们要在规定的时间之后才能传递

1.修改调用类即可

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. for (int i = 0; i < 100; i++) {
15. final int index = i;
16. String msg = "demo msg test";
17.             logger.info("开始发送消息:" + msg);
18. Message sendMsg = new Message("DemoTopic", "TagA", String.valueOf(i), msg.getBytes());
19. //预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
20. //设置延时时间 3即为10s
21.             sendMsg.setDelayTimeLevel(3);
22. //默认3秒超时
23. SendResult sendResult = defaultMQProducer.send(sendMsg);
24.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
25.         }
26.     }
27. }

二.批处理

成批发送消息提高了传递小消息的性能。

使用限制

同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,并且不支持调度。

此外,一批消息的总大小不应超过1MiB。

如何使用批处理

如果一次只发送不超过1MiB的消息,则很容易使用批处理:

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. //for (int i = 0; i < 100; i++) {
15. //final int index = i;
16. String msg = "demo msg test";
17.         logger.info("开始发送消息:" + msg);
18.         List<Message> messages = new ArrayList<>();
19.         messages.add(new Message("DemoTopic", "TagA", "OrderID001", "Hello world 0".getBytes()));
20.         messages.add(new Message("DemoTopic", "TagA", "OrderID002", "Hello world 1".getBytes()));
21.         messages.add(new Message("DemoTopic", "TagA", "OrderID003", "Hello world 2".getBytes()));
22. //默认3秒超时
23. SendResult sendResult = defaultMQProducer.send(messages);
24.     }
25. }

条件过滤

在使用时需要开启条件过滤启动配置 否则会报错

The broker does not support consumer to filter message by SQL92

修改broker.conf文件 加入

enablePropertyFilter = true

image.png

同时启动broker 命令为

The broker does not support consumer to filter message by SQL92

筛选固定条件消息

1. 数值比较:>, >=, <, <=, BETWEEN, =;
2. 字符比较  =, <>, IN;
3. IS NULL or IS NOT NULL;
4. 逻辑:AND, OR, NOT;

使用限制:

只有push consumer可以通过SQL92选择消息。接口是:

public void subscribe(final String topic, final MessageSelector messageSelector)

代码示例

调用方法

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. for (int i = 0; i < 100; i++) {
15. final int index = i;
16. String msg = "demo msg test";
17.             logger.info("开始发送消息:" + msg);
18. Message sendMsg = new Message("DemoTopic",
19. "DemoTag",
20.                     ("Hello RocketMQ " + i).getBytes()
21.             );
22. //消费者根据a进行过滤
23.             sendMsg.putUserProperty("a", String.valueOf(i));
24. SendResult sendResult = defaultMQProducer.send(sendMsg);
25.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
26.         }
27.     }
28. }

消费者

1. @SpringBootConfiguration
2. public class MQConsumerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
5. @Value("${rocketmq.consumer.namesrvAddr}")
6. private String namesrvAddr;
7. @Value("${rocketmq.consumer.groupName}")
8. private String groupName;
9. @Value("${rocketmq.consumer.consumeThreadMin}")
10. private int consumeThreadMin;
11. @Value("${rocketmq.consumer.consumeThreadMax}")
12. private int consumeThreadMax;
13. @Value("${rocketmq.consumer.topics}")
14. private String topics;
15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
16. private int consumeMessageBatchMaxSize;
17. @Autowired
18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
19. 
20. @Bean
21. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
22. if (StringUtils.isEmpty(groupName)){
23. throw new Exception("groupName is null !!!");
24.         }
25. if (StringUtils.isEmpty(namesrvAddr)){
26. throw new Exception("namesrvAddr is null !!!");
27.         }
28. if(StringUtils.isEmpty(topics)){
29. throw new Exception("topics is null !!!");
30.         }
31.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
32.         consumer.setNamesrvAddr(namesrvAddr);
33.         consumer.setConsumeThreadMin(consumeThreadMin);
34.         consumer.setConsumeThreadMax(consumeThreadMax);
35.         consumer.registerMessageListener(mqMessageListenerProcessor);
36. 
37. /**
38.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
39.          * 如果非第一次启动,那么按照上次消费的位置继续消费
40.          */
41.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
42. /**
43.          * 设置消费模型,集群还是广播,默认为集群
44.          */
45.         consumer.setMessageModel(MessageModel.CLUSTERING);
46. /**
47.          * 设置一次消费消息的条数,默认为1条
48.          */
49.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
50. 
51. try {           
52. //根据条件过滤消息
53.             consumer.subscribe("DemoTopic", MessageSelector.bySql("a between 0 and 3"));
54.             consumer.start();
55.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
56.         }catch (MQClientException e){
57.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
58. throw new Exception(e);
59.         }
60. return consumer;
61.     }
62. }

日志配置

image.png

1. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ALL
2. 
3. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ERROR


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
473 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
5月前
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
|
27天前
|
SQL
南大通用GBase 8a配置gcware日志等级,减少日志输出,节省磁盘IO
南大通用GBase 8a配置gcware日志等级,减少日志输出,节省磁盘IO
|
1月前
|
存储 Prometheus 监控
Docker容器内进行应用调试与故障排除的方法与技巧,包括使用日志、进入容器检查、利用监控工具及检查配置等,旨在帮助用户有效应对应用部署中的挑战,确保应用稳定运行
本文深入探讨了在Docker容器内进行应用调试与故障排除的方法与技巧,包括使用日志、进入容器检查、利用监控工具及检查配置等,旨在帮助用户有效应对应用部署中的挑战,确保应用稳定运行。
47 5
|
3月前
|
网络协议 Linux Windows
Rsyslog配置不同端口收集不同设备日志
Rsyslog配置不同端口收集不同设备日志
|
3月前
|
开发工具 git
git显示开发日志+WinSW——将.exe文件注册为服务的一个工具+图床PicGo+kubeconfig 多个集群配置 如何切换
git显示开发日志+WinSW——将.exe文件注册为服务的一个工具+图床PicGo+kubeconfig 多个集群配置 如何切换
48 1
|
3月前
|
数据采集 监控 Java
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
本文是关于SpringBoot日志的详细教程,涵盖日志的定义、用途、SLF4J框架的使用、日志级别、持久化、文件分割及格式配置等内容。
234 0
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
ly~
|
4月前
|
消息中间件 搜索推荐 大数据
一般情况下在 RocketMQ 中添加 access key 的步骤: 一、确定配置文件位置 RocketMQ 的配置文件通常位于安装目录下的 conf 文件夹中。你需要找到 broker.conf 或相关的配置文件。 二、编辑配置文件 打开配置文件,查找与 ACL(访问控制列表)相关的配置部分。 在配置文件中添加以下内容:
大数据广泛应用于商业、金融、医疗和政府等多个领域。在商业上,它支持精准营销、客户细分及流失预测,并优化供应链管理;金融领域则利用大数据进行风险评估、市场预测及欺诈检测;医疗行业通过大数据预测疾病、提供个性化治疗;政府运用大数据进行城市规划和公共安全管理;工业领域则借助大数据进行设备维护、故障预测及质量控制。
ly~
216 2
|
5月前
|
Java 应用服务中间件 HSF
Java应用结构规范问题之配置Logback以仅记录错误级别的日志到一个滚动文件中的问题如何解决
Java应用结构规范问题之配置Logback以仅记录错误级别的日志到一个滚动文件中的问题如何解决
|
5月前
|
Java 应用服务中间件 HSF
Java应用结构规范问题之配置Logback以在控制台输出日志的问题如何解决
Java应用结构规范问题之配置Logback以在控制台输出日志的问题如何解决

热门文章

最新文章