微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题

简介: 微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题

RocketMQ-延时消息

给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


但这是默认的,我们可以修改


想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间

实现延时消息

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset()));
        //设置消息延时级别
        msg.setDelayTimeLevel(3);
        producer.sendOneway(msg);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }
}

RocketMQ-消息过滤

Tag标签过滤

用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。
生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "tagFilterTopic";
        Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset()));
        Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset()));
        Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset()));
        producer.sendOneway(msg1);
        producer.sendOneway(msg2);
        producer.sendOneway(msg3);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }
}

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("tagFilterTopic","TagA || TagC");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果


SQL标签过滤

可以过滤内容,像写where一样

生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "sqlFilterTopic";
        Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset()));
        msg1.putUserProperty("age","22");
        msg1.putUserProperty("weight","45");
        Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset()));
        msg2.putUserProperty("age","25");
        msg2.putUserProperty("weight","60");
        Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset()));
        msg3.putUserProperty("age","40");
        msg3.putUserProperty("weight","70");
        producer.sendOneway(msg1);
        producer.sendOneway(msg2);
        producer.sendOneway(msg3);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }
}

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60"));
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:

原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务


在文件最后一行添加enablePropertyFilter=true即可

随后重新启动broker服务


之后运行结果




管控台搜索问题

为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?


因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
91 0
|
1月前
|
消息中间件 缓存 API
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
67 0
|
3月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
40 0
|
4月前
|
消息中间件 前端开发 架构师
华为架构师复盘2024最全2340页面试题jvm+spring+redis+MQ+微服务
包括 Java 集合、JVM、多线程、并发编程、设计模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat、Python、HTML、CSS、Vue、React、JavaScript、Android 大数据、阿里巴巴等大厂面试题等、等技术栈!
|
5月前
|
消息中间件 Java Apache
微服务轮子项目(26) -分布式事务(RocketMQ)
微服务轮子项目(26) -分布式事务(RocketMQ)
45 0
|
7月前
|
消息中间件 运维 算法
RabbitMQ高阶使用延时任务
RabbitMQ高阶使用延时任务
101 0