- Tag过滤
通过消费者端指定要订阅消息的Tag,如果订阅多个Tag的消息,Tag间使用或运算符(||)连接
- SQL过滤
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
支持的常量类型:
数值:比如:123,3.1415
字符:必须用单引号包裹起来,比如:‘abc’
布尔:TRUE 或 FALSE
NULL:特殊的常量,表示空
支持的运算符有:
数值比较:>,>=,<,<=,BETWEEN,=
字符比较:=,<>,IN
逻辑运算 :AND,OR,NOT
NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter 1 = true
修改broker配置文件后,需要重启broker
- 生产者业务接口
public interface FilterMessageService {
/**
* 发送Tag过滤消息
* @param id
* @param message
*/
void sendFilterTagMessage(String id, String message);
/**
* 发送SQL过滤消息
* @param id
* @param message
* @param index
*/
void sendFilterSqlMessage(String id, String message, int index);
}
- 生产者业务接口实现类
@Service
public class FilterMessageServiceImpl implements FilterMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(FilterMessageServiceImpl.class);
@Override
public void sendFilterTagMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送TAG过滤消息成功!");
} else {
logger.info("发送TAG过滤消息失败!消息状态为:{}", result.getSendStatus());
}
}
@Override
public void sendFilterSqlMessage(String id, String message, int index) {
Message<String> strMessage = MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.KEYS, id)
.setHeader("age", index).build();
SendResult result = rocketMQTemplate.syncSend("filter-message-topic:sync-tags", strMessage);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送SQL过滤消息成功!");
} else {
logger.info("发送SQL过滤消息失败!消息状态为:{}", result.getSendStatus());
}
}
}
- Tag过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-tag-consumer-group", selectorExpression = "sync-tags || async-tags")
public class FilterTagMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(FilterTagMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到TAG过滤消息为:{}", message);
}
}
- SQL过滤消费者类
@Component
@RocketMQMessageListener(topic = "filter-message-topic", consumerGroup = "filter-sql-consumer-group", selectorExpression = "age >= 5", selectorType = SelectorType.SQL92)
public class FilterSqlMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(FilterSqlMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到SQL过滤消息为:{}", message);
}
}
- 测试
@Test
void filterMessage() {
/*for (int i = 0; i < 10; i++) {
String uuid = UUID.randomUUID().toString();
filterMessageService.sendFilterSqlMessage(uuid, "hello" + i, i);
}*/
String uuid = UUID.randomUUID().toString();
filterMessageService.sendFilterTagMessage(uuid, "hello");
}