五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置

简介: 五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置


一.延时队列

定时队列 -它们要在规定的时间之后才能传递

1.修改调用类即可

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. for (int i = 0; i < 100; i++) {
15. final int index = i;
16. String msg = "demo msg test";
17.             logger.info("开始发送消息:" + msg);
18. Message sendMsg = new Message("DemoTopic", "TagA", String.valueOf(i), msg.getBytes());
19. //预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
20. //设置延时时间 3即为10s
21.             sendMsg.setDelayTimeLevel(3);
22. //默认3秒超时
23. SendResult sendResult = defaultMQProducer.send(sendMsg);
24.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
25.         }
26.     }
27. }

二.批处理

成批发送消息提高了传递小消息的性能。

使用限制

同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,并且不支持调度。

此外,一批消息的总大小不应超过1MiB。

如何使用批处理

如果一次只发送不超过1MiB的消息,则很容易使用批处理:

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. //for (int i = 0; i < 100; i++) {
15. //final int index = i;
16. String msg = "demo msg test";
17.         logger.info("开始发送消息:" + msg);
18.         List<Message> messages = new ArrayList<>();
19.         messages.add(new Message("DemoTopic", "TagA", "OrderID001", "Hello world 0".getBytes()));
20.         messages.add(new Message("DemoTopic", "TagA", "OrderID002", "Hello world 1".getBytes()));
21.         messages.add(new Message("DemoTopic", "TagA", "OrderID003", "Hello world 2".getBytes()));
22. //默认3秒超时
23. SendResult sendResult = defaultMQProducer.send(messages);
24.     }
25. }

条件过滤

在使用时需要开启条件过滤启动配置 否则会报错

The broker does not support consumer to filter message by SQL92

修改broker.conf文件 加入

enablePropertyFilter = true

image.png

同时启动broker 命令为

The broker does not support consumer to filter message by SQL92

筛选固定条件消息

1. 数值比较:>, >=, <, <=, BETWEEN, =;
2. 字符比较  =, <>, IN;
3. IS NULL or IS NOT NULL;
4. 逻辑:AND, OR, NOT;

使用限制:

只有push consumer可以通过SQL92选择消息。接口是:

public void subscribe(final String topic, final MessageSelector messageSelector)

代码示例

调用方法

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Resource(name = "customRocketMQProducer")
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. for (int i = 0; i < 100; i++) {
15. final int index = i;
16. String msg = "demo msg test";
17.             logger.info("开始发送消息:" + msg);
18. Message sendMsg = new Message("DemoTopic",
19. "DemoTag",
20.                     ("Hello RocketMQ " + i).getBytes()
21.             );
22. //消费者根据a进行过滤
23.             sendMsg.putUserProperty("a", String.valueOf(i));
24. SendResult sendResult = defaultMQProducer.send(sendMsg);
25.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
26.         }
27.     }
28. }

消费者

1. @SpringBootConfiguration
2. public class MQConsumerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
5. @Value("${rocketmq.consumer.namesrvAddr}")
6. private String namesrvAddr;
7. @Value("${rocketmq.consumer.groupName}")
8. private String groupName;
9. @Value("${rocketmq.consumer.consumeThreadMin}")
10. private int consumeThreadMin;
11. @Value("${rocketmq.consumer.consumeThreadMax}")
12. private int consumeThreadMax;
13. @Value("${rocketmq.consumer.topics}")
14. private String topics;
15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
16. private int consumeMessageBatchMaxSize;
17. @Autowired
18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
19. 
20. @Bean
21. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
22. if (StringUtils.isEmpty(groupName)){
23. throw new Exception("groupName is null !!!");
24.         }
25. if (StringUtils.isEmpty(namesrvAddr)){
26. throw new Exception("namesrvAddr is null !!!");
27.         }
28. if(StringUtils.isEmpty(topics)){
29. throw new Exception("topics is null !!!");
30.         }
31.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
32.         consumer.setNamesrvAddr(namesrvAddr);
33.         consumer.setConsumeThreadMin(consumeThreadMin);
34.         consumer.setConsumeThreadMax(consumeThreadMax);
35.         consumer.registerMessageListener(mqMessageListenerProcessor);
36. 
37. /**
38.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
39.          * 如果非第一次启动,那么按照上次消费的位置继续消费
40.          */
41.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
42. /**
43.          * 设置消费模型,集群还是广播,默认为集群
44.          */
45.         consumer.setMessageModel(MessageModel.CLUSTERING);
46. /**
47.          * 设置一次消费消息的条数,默认为1条
48.          */
49.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
50. 
51. try {           
52. //根据条件过滤消息
53.             consumer.subscribe("DemoTopic", MessageSelector.bySql("a between 0 and 3"));
54.             consumer.start();
55.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
56.         }catch (MQClientException e){
57.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
58. throw new Exception(e);
59.         }
60. return consumer;
61.     }
62. }

日志配置

image.png

1. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ALL
2. 
3. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ERROR


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
监控 安全 程序员
Python日志模块配置:从print到logging的优雅升级指南
从 `print` 到 `logging` 是 Python 开发的必经之路。`print` 调试简单却难维护,日志混乱、无法分级、缺乏上下文;而 `logging` 支持级别控制、多输出、结构化记录,助力项目可维护性升级。本文详解痛点、优势、迁移方案与最佳实践,助你构建专业日志系统,让程序“有记忆”。
367 0
|
5月前
|
缓存 Java 应用服务中间件
Spring Boot配置优化:Tomcat+数据库+缓存+日志,全场景教程
本文详解Spring Boot十大核心配置优化技巧,涵盖Tomcat连接池、数据库连接池、Jackson时区、日志管理、缓存策略、异步线程池等关键配置,结合代码示例与通俗解释,助你轻松掌握高并发场景下的性能调优方法,适用于实际项目落地。
858 5
|
11月前
|
SQL Java 数据库连接
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— application.yml 中对日志的配置
在 Spring Boot 项目中,`application.yml` 文件用于配置日志。通过 `logging.config` 指定日志配置文件(如 `logback.xml`),实现日志详细设置。`logging.level` 可定义包的日志输出级别,例如将 `com.itcodai.course03.dao` 包设为 `trace` 级别,便于开发时查看 SQL 操作。日志级别从高到低为 ERROR、WARN、INFO、DEBUG,生产环境建议调整为较高级别以减少日志量。本课程采用 yml 格式,因其层次清晰,但需注意格式要求。
1060 0
|
安全 BI 网络安全
EventLog Analyzer 如何满足等保合规要求?密码有效期、产品日志保留、配置备份三大核心问题全面解答
EventLog Analyzer(ELA)助力企业满足网络安全等级保护要求,支持配置自动/手动备份、日志180天留存及密码策略管理,提升合规性与安全运营效率。
184 0
|
7月前
|
JSON 安全 Go
Go语言项目工程化 —— 日志、配置、错误处理规范
本章详解Go语言项目工程化核心规范,涵盖日志、配置与错误处理三大关键领域。在日志方面,强调其在问题排查、性能优化和安全审计中的作用,推荐使用高性能结构化日志库zap,并介绍日志级别与结构化输出的最佳实践。配置管理部分讨论了配置分离的必要性,对比多种配置格式如JSON、YAML及环境变量,并提供viper库实现多环境配置的示例。错误处理部分阐述Go语言显式返回error的设计哲学,讲解标准处理方式、自定义错误类型、错误封装与堆栈追踪技巧,并提出按调用层级进行错误处理的建议。最后,总结各模块的工程化最佳实践,助力构建可维护、可观测且健壮的Go应用。
|
8月前
|
存储 NoSQL MongoDB
Docker中安装MongoDB并配置数据、日志、配置文件持久化。
现在,你有了一个运行在Docker中的MongoDB,它拥有自己的小空间,对高楼大厦的崩塌视而不见(会话丢失和数据不持久化的问题)。这个MongoDB的数据、日志、配置文件都会妥妥地保存在你为它精心准备的地方,天旋地转,它也不会失去一丁点儿宝贵的记忆(即使在容器重启后)。
991 4
|
10月前
|
存储 监控 API
【Azure App Service】分享使用Python Code获取App Service的服务器日志记录管理配置信息
本文介绍了如何通过Python代码获取App Service中“Web服务器日志记录”的配置状态。借助`azure-mgmt-web` SDK,可通过初始化`WebSiteManagementClient`对象、调用`get_configuration`方法来查看`http_logging_enabled`的值,从而判断日志记录是否启用及存储方式(关闭、存储或文件系统)。示例代码详细展示了实现步骤,并附有执行结果与官方文档参考链接,帮助开发者快速定位和解决问题。
299 22
|
11月前
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
11月前
|
数据库连接 测试技术 Windows
【YashanDB知识库】windows配置ODBC跟踪日志, 使用日志定位问题
【YashanDB知识库】windows配置ODBC跟踪日志, 使用日志定位问题
|
11月前
|
消息中间件 测试技术 Kafka
Apache RocketMQ 批处理模型演进之路
Apache RocketMQ 批处理模型演进之路
138 0