概述
RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息 通过SQL92的方式,消费者可以过滤到自己想要的消息,其实RocketMQ还提供了一个更为抢到的功能,支持自定义Java类…
直接来看下如何使用的吧
集群信息
RocketMQ : V4.3.2
集群模式: 互为主备
节点信息: 192.168.18.130 192.168.18.131 双机互为主备
broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重启。
项目结构
生产者
package com.artisan.rocketmq.filter; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * @author 小工匠 * @version v1.0 * @create 2019-11-11 23:30 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class FilterProducer { /*** * TAG-FILTER-1000 ---> 布隆过滤器 * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group"); producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); producer.start(); for (int i = 0; i < 3; i++) { Message msg = new Message("TopicFilter", "TAG-FILTER", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息 msg.putUserProperty("a", String.valueOf(i)); if (i % 2 == 0) { msg.putUserProperty("b", "artisan"); } else { msg.putUserProperty("b", "smart artisan"); } producer.send(msg); } producer.shutdown(); } }
自定义类
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.artisan.rocketmq.filter; import org.apache.rocketmq.common.filter.FilterContext; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg, FilterContext context) { String property = msg.getProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); if (((id % 10) == 0) && (id > 100)) { return true; } } return false; } }
消费者
package com.artisan.rocketmq.filter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author 小工匠 * @version v1.0 * @create 2019-11-11 23:45 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class FilterConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group"); /** * 注册中心 */ consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876"); /** * 订阅主题 * 一种资源去换取另外一种资源 */ consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'")); /** * 注册监听器,监听主题消息 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs){ try { System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Filter Consumer Started.%n"); } }
测试结果
目前 没测试成功,先记录下。