SpringBoot整合RocketMQ发送消息过滤

简介: 消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
  1. Tag过滤

通过消费者端指定要订阅消息的Tag,如果订阅多个Tag的消息,Tag间使用或运算符(||)连接

  1. SQL过滤

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:

数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:

数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter 1 = true
修改broker配置文件后,需要重启broker

  1. 生产者业务接口
public interface FilterMessageService {

    /**
     * 发送Tag过滤消息
     * @param id
     * @param message
     */
    void sendFilterTagMessage(String id, String message);

    /**
     * 发送SQL过滤消息
     * @param id
     * @param message
     * @param index
     */
    void sendFilterSqlMessage(String id, String message, int index);
}
  1. 生产者业务接口实现类
@Service
public class FilterMessageServiceImpl implements FilterMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(FilterMessageServiceImpl.class);

    @Override
    public void sendFilterTagMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送TAG过滤消息成功!");
        } else {
            logger.info("发送TAG过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }

    @Override
    public void sendFilterSqlMessage(String id, String message, int index) {
        Message<String> strMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, id)
                .setHeader("age", index).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送SQL过滤消息成功!");
        } else {
            logger.info("发送SQL过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }
}
  1. Tag过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-tag-consumer-group", selectorExpression = "sync-tags || async-tags")
public class FilterTagMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterTagMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到TAG过滤消息为:{}", message);
    }
}
  1. SQL过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-sql-consumer-group", selectorExpression = "age >= 5", selectorType = SelectorType.SQL92)
public class FilterSqlMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterSqlMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到SQL过滤消息为:{}", message);
    }
}
  1. 测试
@Test
void filterMessage() {
    /*for (int i = 0; i < 10; i++) {
        String uuid = UUID.randomUUID().toString();
        filterMessageService.sendFilterSqlMessage(uuid, "hello" + i, i);
    }*/
    String uuid = UUID.randomUUID().toString();
    filterMessageService.sendFilterTagMessage(uuid, "hello");
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1111 0
|
4月前
|
消息中间件 Java Maven
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
385 1
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
68 0
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
|
7月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
132 0
|
消息中间件 Java Kafka
springboot整合kafka,kafka消息过滤
springboot整合kafka,kafka消息过滤
578 0
|
消息中间件 Java RocketMQ
Springboot 集成 Rocketmq 消费者|学习笔记
快速学习 Springboot 集成 Rocketmq 消费者
1437 1
Springboot 集成 Rocketmq 消费者|学习笔记
|
消息中间件 IDE Java
Springboot 集成 Rocketmq 生产者|学习笔记
快速学习 Springboot 集成 Rocketmq 生产者
794 0
Springboot 集成 Rocketmq 生产者|学习笔记
|
消息中间件 Java RocketMQ
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
2056 0
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息