RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息

简介: RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息


过滤消息概述

大部分情况下 ,我们都可以通过TAG来选择我们想要获取的消息,如下

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。

在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。

RocketMQ定义的语法下,可以实现一些简单的逻辑。

举个例子


基本语法


使用限制

只有使用push模式的消费者才能用使用SQL92标准的sql语句

接口如下

public void subscribe(final String topic, final MessageSelector messageSelector)

启用配置 (重要 )

使用Filter功能,需要在启动配置文件当中配置以下选项

enablePropertyFilter=true

常见错误:The broker does not support consumer to filter message by SQL92

配置文件中增加如下配置

enablePropertyFilter=true

示例

生产者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:30
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterProducer {
    /***
     * TAG-FILTER-1000 ---> 布隆过滤器
     * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicFilter",
                    "TAG-FILTER",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息
            msg.putUserProperty("a", String.valueOf(i));
            if (i % 2 == 0) {
                msg.putUserProperty("b", "artisan");
            } else {
                msg.putUserProperty("b", "smart artisan");
            }
            producer.send(msg);
        }
        producer.shutdown();
    }
}

消费者

package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:45
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
        /**
         * 注册中心
         */
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        /**
         * 订阅主题
         * 一种资源去换取另外一种资源
         */
        consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));
        /**
         * 注册监听器,监听主题消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    try {
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + ", queueId=" + msg.getQueueId() + ", content:"
                                + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Filter Consumer Started.%n");
    }
}

日志:

Filter Consumer Started.
consumeThread=ConsumeMessageThread_1, queueId=0, content:Hello RocketMQ 0
consumeThread=ConsumeMessageThread_2, queueId=2, content:Hello RocketMQ 2

可以看到,我们虽然发了 3条消息 ,但是只获取了我们期望的2条消息,可见过滤起了作用。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
SQL 数据处理 数据库
SQL进阶之路:深入解析数据更新与删除技巧——掌握批量操作、条件筛选、子查询和事务处理,提升数据库维护效率与准确性
【8月更文挑战第31天】在数据库管理和应用开发中,数据的更新和删除至关重要,直接影响数据准确性、一致性和性能。本文通过具体案例,深入解析SQL中的高级更新(UPDATE)和删除(DELETE)技巧,包括批量更新、基于条件的删除以及使用子查询和事务处理复杂场景等,帮助读者提升数据处理能力。掌握这些技巧能够有效提高数据库性能并确保数据一致性。
66 0
|
4月前
|
SQL 存储 缓存
揭秘SQL中的公用表表达式:数据查询的新宠儿
揭秘SQL中的公用表表达式:数据查询的新宠儿
58 2
|
5月前
|
SQL API 数据库
在Python中获取筛选后的SQL数据行数
在Python中获取筛选后的SQL数据行数
57 1
|
5月前
|
SQL Oracle 关系型数据库
Oracle的PL/SQL表达式:数据的魔法公式
【4月更文挑战第19天】探索Oracle PL/SQL表达式,体验数据的魔法公式。表达式结合常量、变量、运算符和函数,用于数据运算与转换。算术运算符处理数值计算,比较运算符执行数据比较,内置函数如TO_CHAR、ROUND和SUBSTR提供多样化操作。条件表达式如CASE和NULLIF实现灵活逻辑判断。广泛应用于SQL查询和PL/SQL程序,助你驾驭数据,揭示其背后的规律与秘密,成为数据魔法师。
|
5月前
|
SQL Java 关系型数据库
MyBatis的动态SQL之OGNL(Object-Graph Navigation Language)表达式以及各种标签的用法
MyBatis的动态SQL之OGNL(Object-Graph Navigation Language)表达式以及各种标签的用法
97 0
|
5月前
|
SQL
leetcode-SQL-1440. 计算布尔表达式的值
leetcode-SQL-1440. 计算布尔表达式的值
58 1
|
5月前
|
消息中间件 Java RocketMQ
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
62 0
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
|
5月前
|
SQL 数据挖掘 数据处理
「SQL面试题库」 No_109 计算布尔表达式的值
「SQL面试题库」 No_109 计算布尔表达式的值
|
5月前
|
SQL Oracle 关系型数据库
Oracle PL/SQL 第三章--运算符与表达式
Oracle PL/SQL 第三章--运算符与表达式
|
5月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
61 0