SpringBoot整合RocketMQ发送消息过滤

简介: 消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
  1. Tag过滤

通过消费者端指定要订阅消息的Tag,如果订阅多个Tag的消息,Tag间使用或运算符(||)连接

  1. SQL过滤

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:

数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:

数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter 1 = true
修改broker配置文件后,需要重启broker

  1. 生产者业务接口
public interface FilterMessageService {

    /**
     * 发送Tag过滤消息
     * @param id
     * @param message
     */
    void sendFilterTagMessage(String id, String message);

    /**
     * 发送SQL过滤消息
     * @param id
     * @param message
     * @param index
     */
    void sendFilterSqlMessage(String id, String message, int index);
}
  1. 生产者业务接口实现类
@Service
public class FilterMessageServiceImpl implements FilterMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(FilterMessageServiceImpl.class);

    @Override
    public void sendFilterTagMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送TAG过滤消息成功!");
        } else {
            logger.info("发送TAG过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }

    @Override
    public void sendFilterSqlMessage(String id, String message, int index) {
        Message<String> strMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, id)
                .setHeader("age", index).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送SQL过滤消息成功!");
        } else {
            logger.info("发送SQL过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }
}
  1. Tag过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-tag-consumer-group", selectorExpression = "sync-tags || async-tags")
public class FilterTagMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterTagMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到TAG过滤消息为:{}", message);
    }
}
  1. SQL过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-sql-consumer-group", selectorExpression = "age >= 5", selectorType = SelectorType.SQL92)
public class FilterSqlMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterSqlMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到SQL过滤消息为:{}", message);
    }
}
  1. 测试
@Test
void filterMessage() {
    /*for (int i = 0; i < 10; i++) {
        String uuid = UUID.randomUUID().toString();
        filterMessageService.sendFilterSqlMessage(uuid, "hello" + i, i);
    }*/
    String uuid = UUID.randomUUID().toString();
    filterMessageService.sendFilterTagMessage(uuid, "hello");
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1094 0
|
3月前
|
消息中间件 Java Maven
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
318 1
|
6月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
117 0
|
消息中间件 Java Kafka
springboot整合kafka,kafka消息过滤
springboot整合kafka,kafka消息过滤
559 0
|
消息中间件 IDE Java
Springboot 集成 Rocketmq 生产者|学习笔记
快速学习 Springboot 集成 Rocketmq 生产者
785 0
Springboot 集成 Rocketmq 生产者|学习笔记
|
消息中间件 Java 网络架构
SpringBoot整合RabbitMQ 实现五种消息模型
SpringBoot整合RabbitMQ 实现五种消息模型
259 2
|
消息中间件 算法 数据可视化
SpringBoot整合RocketMQ发送顺序消息
严格按照消息的发送顺序进行消费的消息。默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列,而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的,如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性
|
消息中间件 Unix Java
SpringBoot整合RocketMQ发送事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
1127 1