RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息

简介: RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息


概述

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息 通过SQL92的方式,消费者可以过滤到自己想要的消息,其实RocketMQ还提供了一个更为抢到的功能,支持自定义Java类…

直接来看下如何使用的吧


集群信息

RocketMQ : V4.3.2

集群模式: 互为主备

节点信息: 192.168.18.130 192.168.18.131 双机互为主备

broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重启。


项目结构

生产者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:30
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterProducer {
    /***
     * TAG-FILTER-1000 ---> 布隆过滤器
     * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicFilter",
                    "TAG-FILTER",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息
            msg.putUserProperty("a", String.valueOf(i));
            if (i % 2 == 0) {
                msg.putUserProperty("b", "artisan");
            } else {
                msg.putUserProperty("b", "smart artisan");
            }
            producer.send(msg);
        }
        producer.shutdown();
    }
}

自定义类

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.artisan.rocketmq.filter;
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        String property = msg.getProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if (((id % 10) == 0) && (id > 100)) {
                return true;
            }
        }
        return false;
    }
}

消费者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:45
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
        /**
         * 注册中心
         */
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        /**
         * 订阅主题
         * 一种资源去换取另外一种资源
         */
        consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));
        /**
         * 注册监听器,监听主题消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    try {
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + ", queueId=" + msg.getQueueId() + ", content:"
                                + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Filter Consumer Started.%n");
    }
}

测试结果

目前 没测试成功,先记录下。


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
安全 Java 数据建模
Java记录类:简化数据载体的新选择
Java记录类:简化数据载体的新选择
410 101
|
7月前
|
安全 Java 开发者
Java记录类:简化数据载体的新方式
Java记录类:简化数据载体的新方式
352 100
|
8月前
|
安全 IDE Java
Java记录类型(Record):简化数据载体类
Java记录类型(Record):简化数据载体类
615 143
|
6月前
|
存储 Java 索引
用Java语言实现一个自定义的ArrayList类
自定义MyArrayList类模拟Java ArrayList核心功能,支持泛型、动态扩容(1.5倍)、增删改查及越界检查,底层用Object数组实现,适合学习动态数组原理。
281 4
|
6月前
|
IDE JavaScript Java
在Java 11中,如何处理被弃用的类或接口?
在Java 11中,如何处理被弃用的类或接口?
335 5
|
6月前
|
编解码 Java 开发者
Java String类的关键方法总结
以上总结了Java `String` 类最常见和重要功能性方法。每种操作都对应着日常编程任务,并且理解每种操作如何影响及处理 `Strings` 对于任何使用 Java 的开发者来说都至关重要。
414 5
|
6月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
344 1
|
6月前
|
Java Go 开发工具
【Java】(8)正则表达式的使用与常用类分享
正则表达式定义了字符串的模式。正则表达式并不仅限于某一种语言,但是在每种语言中有细微的差别。
470 1
|
6月前
|
存储 Java 程序员
【Java】(6)全方面带你了解Java里的日期与时间内容,介绍 Calendar、GregorianCalendar、Date类
java.util 包提供了 Date 类来封装当前的日期和时间。Date 类提供两个构造函数来实例化 Date 对象。第一个构造函数使用当前日期和时间来初始化对象。Date( )第二个构造函数接收一个参数,该参数是从1970年1月1日起的毫秒数。
303 0
|
6月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
341 1