RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息

简介: RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息


概述

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息 通过SQL92的方式,消费者可以过滤到自己想要的消息,其实RocketMQ还提供了一个更为抢到的功能,支持自定义Java类…

直接来看下如何使用的吧


集群信息

RocketMQ : V4.3.2

集群模式: 互为主备

节点信息: 192.168.18.130 192.168.18.131 双机互为主备

broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重启。


项目结构

生产者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:30
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterProducer {
    /***
     * TAG-FILTER-1000 ---> 布隆过滤器
     * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicFilter",
                    "TAG-FILTER",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息
            msg.putUserProperty("a", String.valueOf(i));
            if (i % 2 == 0) {
                msg.putUserProperty("b", "artisan");
            } else {
                msg.putUserProperty("b", "smart artisan");
            }
            producer.send(msg);
        }
        producer.shutdown();
    }
}

自定义类

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.artisan.rocketmq.filter;
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        String property = msg.getProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if (((id % 10) == 0) && (id > 100)) {
                return true;
            }
        }
        return false;
    }
}

消费者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:45
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
        /**
         * 注册中心
         */
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        /**
         * 订阅主题
         * 一种资源去换取另外一种资源
         */
        consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));
        /**
         * 注册监听器,监听主题消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    try {
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + ", queueId=" + msg.getQueueId() + ", content:"
                                + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Filter Consumer Started.%n");
    }
}

测试结果

目前 没测试成功,先记录下。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
Java
让星星⭐月亮告诉你,自定义定时器和Java自带原生定时器
定时器是一种可以设置多个具有不同执行时间和间隔的任务的工具。本文介绍了定时器的基本概念、如何自定义实现一个定时器,以及Java原生定时器的使用方法,包括定义定时任务接口、实现任务、定义任务处理线程和使用Java的`Timer`与`TimerTask`类来管理和执行定时任务。
27 3
|
5天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
1天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
15 6
|
3天前
|
IDE Java 编译器
Java:如何确定编译和运行时类路径是否一致
类路径(Classpath)是JVM用于查找类文件的路径列表,对编译和运行Java程序至关重要。编译时通过`javac -classpath`指定,运行时通过`java -classpath`指定。IDE如Eclipse和IntelliJ IDEA也提供界面管理类路径。确保编译和运行时类路径一致,特别是外部库和项目内部类的路径设置。
|
3天前
|
安全 Java 测试技术
Java零基础-StringBuffer 类详解
【10月更文挑战第9天】Java零基础教学篇,手把手实践教学!
11 2
|
2天前
|
安全 Java
如何在 Java 中创建自定义安全管理器
在Java中创建自定义安全管理器需要继承SecurityManager类并重写其方法,以实现特定的安全策略。通过设置系统安全属性来启用自定义安全管理器,从而控制应用程序的访问权限和安全行为。
|
3天前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。
【10月更文挑战第14天】从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其独特的“不重复性”要求,彻底改变了处理唯一性约束数据的方式。本文深入探讨Set的核心理念,并通过示例代码展示了HashSet和TreeSet的特点和应用场景。
11 2
|
4天前
|
存储 Java 索引
Java 中集合框架的常见接口和类
【10月更文挑战第13天】这些只是集合框架中的一部分常见接口和类,还有其他一些如 Queue、Deque 等接口以及相关的实现类。理解和掌握这些集合的特点和用法对于高效编程非常重要。
|
8天前
|
小程序 Oracle Java
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
这篇文章是关于JVM基础知识的介绍,包括JVM的跨平台和跨语言特性、Class文件格式的详细解析,以及如何使用javap和jclasslib工具来分析Class文件。
21 0
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
|
11天前
|
存储 安全 Java
Java零基础-Java类详解
【10月更文挑战第2天】Java零基础教学篇,手把手实践教学!
13 2