RocketMQ-延时消息
给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
但这是默认的,我们可以修改
想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间
实现延时消息
消费者类:
public class Consumer { public static void main(String[] args) throws Exception { //定义消息消费者(在同一个JVM中,消费者的组名不能重复) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup"); //设置nameServer地址 consumer.setNamesrvAddr("43.143.161.59:9876"); //设置订阅的主题 consumer.subscribe("helloTopic","*"); //设置消息的监听器 consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg:list){ String s = new String(msg.getBody(), Charset.defaultCharset()); System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
生产者类:
public class Producer { public static void main(String[] args) throws Exception { //定义一个生产者对象 DefaultMQProducer producer = new DefaultMQProducer("helloGroup"); //连接nameServer producer.setNamesrvAddr("43.143.161.59:9876"); //启动生产者 producer.start(); //设置消息发送的目的地 String topic = "helloTopic"; //发送消息 Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset())); //设置消息延时级别 msg.setDelayTimeLevel(3); producer.sendOneway(msg); System.out.println("消息发送完毕."); TimeUnit.SECONDS.sleep(5); //关闭资源 producer.shutdown(); } }
RocketMQ-消息过滤
Tag标签过滤
用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。
生产者类:
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup"); producer.setNamesrvAddr("43.143.161.59:9876"); producer.start(); String topic = "tagFilterTopic"; Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset())); Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset())); Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset())); producer.sendOneway(msg1); producer.sendOneway(msg2); producer.sendOneway(msg3); System.out.println("消息发送完毕."); TimeUnit.SECONDS.sleep(5); producer.shutdown(); } }
消费者类:
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup"); consumer.setNamesrvAddr("43.143.161.59:9876"); consumer.subscribe("tagFilterTopic","TagA || TagC"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg:list){ String s = new String(msg.getBody(), Charset.defaultCharset()); System.out.println("消息的内容:"+s); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
运行结果
SQL标签过滤
可以过滤内容,像写where一样
生产者类:
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup"); producer.setNamesrvAddr("43.143.161.59:9876"); producer.start(); String topic = "sqlFilterTopic"; Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset())); msg1.putUserProperty("age","22"); msg1.putUserProperty("weight","45"); Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset())); msg2.putUserProperty("age","25"); msg2.putUserProperty("weight","60"); Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset())); msg3.putUserProperty("age","40"); msg3.putUserProperty("weight","70"); producer.sendOneway(msg1); producer.sendOneway(msg2); producer.sendOneway(msg3); System.out.println("消息发送完毕."); TimeUnit.SECONDS.sleep(5); producer.shutdown(); } }
消费者类:
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup"); consumer.setNamesrvAddr("43.143.161.59:9876"); consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60")); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg:list){ String s = new String(msg.getBody(), Charset.defaultCharset()); System.out.println("消息的内容:"+s); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
运行结果:
原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务
在文件最后一行添加enablePropertyFilter=true即可
随后重新启动broker服务
之后运行结果
管控台搜索问题
为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?
因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了