过滤消息概述
大部分情况下 ,我们都可以通过TAG来选择我们想要获取的消息,如下
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。
在RocketMQ定义的语法下,可以实现一些简单的逻辑。
举个例子
基本语法
使用限制
只有使用push模式的消费者才能用使用SQL92标准的sql语句 ,
接口如下
public void subscribe(final String topic, final MessageSelector messageSelector)
启用配置 (重要 )
使用Filter功能,需要在启动配置文件当中配置以下选项
enablePropertyFilter=true
常见错误:The broker does not support consumer to filter message by SQL92
配置文件中增加如下配置
enablePropertyFilter=true
示例
生产者
package com.artisan.rocketmq.filter; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * @author 小工匠 * @version v1.0 * @create 2019-11-11 23:30 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class FilterProducer { /*** * TAG-FILTER-1000 ---> 布隆过滤器 * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group"); producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); producer.start(); for (int i = 0; i < 3; i++) { Message msg = new Message("TopicFilter", "TAG-FILTER", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息 msg.putUserProperty("a", String.valueOf(i)); if (i % 2 == 0) { msg.putUserProperty("b", "artisan"); } else { msg.putUserProperty("b", "smart artisan"); } producer.send(msg); } producer.shutdown(); } }
消费者
package com.artisan.rocketmq.filter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-11 23:45 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class FilterConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group"); /** * 注册中心 */ consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); /** * 订阅主题 * 一种资源去换取另外一种资源 */ consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'")); /** * 注册监听器,监听主题消息 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs){ try { System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Filter Consumer Started.%n"); } }
日志:
Filter Consumer Started. consumeThread=ConsumeMessageThread_1, queueId=0, content:Hello RocketMQ 0 consumeThread=ConsumeMessageThread_2, queueId=2, content:Hello RocketMQ 2
可以看到,我们虽然发了 3条消息 ,但是只获取了我们期望的2条消息,可见过滤起了作用。