SpringBoot整合RocketMQ发送消息过滤

简介: 消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。 对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
  1. Tag过滤

通过消费者端指定要订阅消息的Tag,如果订阅多个Tag的消息,Tag间使用或运算符(||)连接

  1. SQL过滤

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:

数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:

数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter 1 = true
修改broker配置文件后,需要重启broker

  1. 生产者业务接口
public interface FilterMessageService {

    /**
     * 发送Tag过滤消息
     * @param id
     * @param message
     */
    void sendFilterTagMessage(String id, String message);

    /**
     * 发送SQL过滤消息
     * @param id
     * @param message
     * @param index
     */
    void sendFilterSqlMessage(String id, String message, int index);
}
  1. 生产者业务接口实现类
@Service
public class FilterMessageServiceImpl implements FilterMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(FilterMessageServiceImpl.class);

    @Override
    public void sendFilterTagMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送TAG过滤消息成功!");
        } else {
            logger.info("发送TAG过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }

    @Override
    public void sendFilterSqlMessage(String id, String message, int index) {
        Message<String> strMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, id)
                .setHeader("age", index).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送SQL过滤消息成功!");
        } else {
            logger.info("发送SQL过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }
}
  1. Tag过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-tag-consumer-group", selectorExpression = "sync-tags || async-tags")
public class FilterTagMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterTagMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到TAG过滤消息为:{}", message);
    }
}
  1. SQL过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-sql-consumer-group", selectorExpression = "age >= 5", selectorType = SelectorType.SQL92)
public class FilterSqlMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterSqlMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到SQL过滤消息为:{}", message);
    }
}
  1. 测试
@Test
void filterMessage() {
    /*for (int i = 0; i < 10; i++) {
        String uuid = UUID.randomUUID().toString();
        filterMessageService.sendFilterSqlMessage(uuid, "hello" + i, i);
    }*/
    String uuid = UUID.randomUUID().toString();
    filterMessageService.sendFilterTagMessage(uuid, "hello");
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
25天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
283 1
|
4月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
620 0
|
10月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
154 6
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
2079 3
|
消息中间件 Java Maven
|
2天前
|
搜索推荐 JavaScript Java
基于springboot的儿童家长教育能力提升学习系统
本系统聚焦儿童家长教育能力提升,针对家庭教育中理念混乱、时间不足、个性化服务缺失等问题,构建科学、系统、个性化的在线学习平台。融合Spring Boot、Vue等先进技术,整合优质教育资源,提供高效便捷的学习路径,助力家长掌握科学育儿方法,促进儿童全面健康发展,推动家庭和谐与社会进步。
|
2天前
|
JavaScript Java 关系型数据库
基于springboot的古树名木保护管理系统
本研究针对古树保护面临的严峻挑战,构建基于Java、Vue、MySQL与Spring Boot技术的信息化管理系统,实现古树资源的动态监测、数据管理与科学保护,推动生态、文化与经济可持续发展。