开发者社区> 游客bzudnnsbr7w4e> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

SpringBoot整合RocketMQ发送消息过滤

简介: 消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。 对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
+关注继续查看
  1. Tag过滤

通过消费者端指定要订阅消息的Tag,如果订阅多个Tag的消息,Tag间使用或运算符(||)连接

  1. SQL过滤

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:

数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:

数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter 1 = true
修改broker配置文件后,需要重启broker

  1. 生产者业务接口
public interface FilterMessageService {

    /**
     * 发送Tag过滤消息
     * @param id
     * @param message
     */
    void sendFilterTagMessage(String id, String message);

    /**
     * 发送SQL过滤消息
     * @param id
     * @param message
     * @param index
     */
    void sendFilterSqlMessage(String id, String message, int index);
}
  1. 生产者业务接口实现类
@Service
public class FilterMessageServiceImpl implements FilterMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(FilterMessageServiceImpl.class);

    @Override
    public void sendFilterTagMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送TAG过滤消息成功!");
        } else {
            logger.info("发送TAG过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }

    @Override
    public void sendFilterSqlMessage(String id, String message, int index) {
        Message<String> strMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, id)
                .setHeader("age", index).build();
        SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送SQL过滤消息成功!");
        } else {
            logger.info("发送SQL过滤消息失败!消息状态为:{}", result.getSendStatus());
        }
    }
}
  1. Tag过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-tag-consumer-group", selectorExpression = "sync-tags || async-tags")
public class FilterTagMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterTagMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到TAG过滤消息为:{}", message);
    }
}
  1. SQL过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-sql-consumer-group", selectorExpression = "age >= 5", selectorType = SelectorType.SQL92)
public class FilterSqlMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(FilterSqlMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到SQL过滤消息为:{}", message);
    }
}
  1. 测试
@Test
void filterMessage() {
    /*for (int i = 0; i < 10; i++) {
        String uuid = UUID.randomUUID().toString();
        filterMessageService.sendFilterSqlMessage(uuid, "hello" + i, i);
    }*/
    String uuid = UUID.randomUUID().toString();
    filterMessageService.sendFilterTagMessage(uuid, "hello");
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
SpringBoot整合Spring Security实现安全管理
SpringBoot整合Spring Security实现安全管理
247 0
SpringBoot整合MongoDB实现文件存储
SpringBoot整合MongoDB实现文件存储
321 0
SpringBoot 实战:一招实现结果的优雅响应
今天说一下 Spring Boot 如何实现优雅的数据响应:统一的结果响应格式、简单的数据封装。
122 0
SpringBoot里slf4j日志功能的默认实现
SpringBoot里slf4j日志功能的默认实现
59 0
SpringBoot里实现了某个接口的实现类运行时如何注入的?
SpringBoot里实现了某个接口的实现类运行时如何注入的?
62 0
SpringBoot【实现热部署-devtools】
本文介绍下在IDEA中项目热部署的两种方式,因为如果每次我们修改下页面的代码都需要重新启动的话那么效率就太低了。
74 0
基于springboot实现一个简单的aop
要实现aop,首先你要知道你拿aop来干啥,我们今天就以记录日志来说,因为这个最常用,一般对于重要的数据库操作,我们需要记录操作人、什么时间、做了什么,关于做了什么怎么实现我们后面细讲
5396 0
Java:SpringBoot集成JWT实现token验证
Java:SpringBoot集成JWT实现token验证
92 0
缓存架构中的服务详解!SpringBoot中二级缓存服务实现
本文分析了在微服务架构项目中SpringBoot框架里的缓存的集成和使用。通过真实的案例说明了如何创建缓存服务,包括创建缓存服务的接口,创建缓存服务的提供者和缓存服务的消费者。重点分析了MyBatis中的二级缓存,主要说明了MyBatis中二级缓存的配置和使用。通过对本文的学习,可以清楚地了解到SpringBoot中的缓存功能以及缓存在MyBatis框架中的应用。
125 0
SpringBoot从入门到精通(三十三)Spring Boot项目打包,实现静态文件、配置文件与jar分离!
前面介绍了Spring Boot项目的打包、发布和部署。我们知道Spring Boot打包时,默认是会把resource目录下的静态资源文件和配置文件统一打包到jar文件中。这样部署到生产环境中一旦需要修改配置文件,则非常麻烦。
691 0
文章
问答
文章排行榜
最热
最新
相关电子书
更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载