开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):延迟消息】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12384
延迟消息
内容介绍
一、延时时间的级别
二、延时时间使用限制
三、代码测试
四、验证延时时间
一、延时时间的级别
1、代码实现:
public class scheduiedMessageProducer {
public static void main(string[]args) throws Exception {
//实例化一个生产者来产生延时消息
DefaultMQproducer producer = new DefaultMgProducer(“ExampleProducerGroup”);
//启动生产者
producer.start():
int totalMessagesTosend=100:
for (int i=0; i < tota1MessagesTosend;i++){
Message message =new Message("TestTopic".("Hello scheduled message "+ i).getsytes());
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看 delayTimeLeve1)
message. setDe1ayTimeLeve1(3);
//发送消息
producer.send(message);
)
// 关闭生产者
producer.shutdown();
}
}
2、含义
这个延时消息那么延迟消息顾名思义就是我们这个给消息的队列当中发一个消息,那么这个消息,他并没有立即的被消息的消费者所去消费,而是延迟一段时间才能
被消费。
3、级别
这个延迟时间怎么去决定呢?就是你再去发送消息的时候呢可以通过消息的setDelayTimeLevel 这么一个级别去决定当前延迟的时间,那么这个级别是什么,目前RocketMQ仅仅只是支持“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”这些级别延迟的时间,并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。
二、延时时间使用限制
//org/apache/rocletmq/store/config/messagestoreconfig.java
Private string messagedelaylevel=
’’
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;
延迟时间,它是已经约定好的,我们不能自己去写,只能去选择,要去延迟多久?
比如说,如果 level 是1的话呢,那么就延迟1秒, level 是2的话呢,就延迟五秒。以此类推,最多延迟两小时。这就是延迟的一个设置。其实对于消费者来讲的并没有什么区别,本来怎么消费,还怎么消费,但是消费时机稍微的拖后了一点。
三、代码测试
1、书写producer
在 delay 中添加 Consumer 和 Producer,
public static void main(string[] args) {
//1.创建消息生产者 producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1")
;
//2.指定 Nameserver.地址producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876");
//3.启动producer
producer.start();
for (int i
=0
; i
<
10; i++) {
//4.创建消息对象,指定主售 Topic、Tag 和消息体
/**
*参数一,消息主心 Topic
*参数二:消息 Tag
*参数三:消息内容
*/
Message msg = new Message( topic: "base", ta
q
s: "Tag1",(“Hello World"+i).getBytes());
//5.发送消息
SendResult result=producer.send(msg);
//发送状态
Sendstatus status = result.getsendstatus(); System.out.println
(“发送结果:"+result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
)
//6.关闭生产者 producer
producer,shutdown();
}
将前面 SynProducer 中的内容粘贴至新的 Producer 中,接着用 send 来发,发送之前设置一个 msg.setDelayTimeLevel,再按照设置的级别,举例延迟5s,则写2。发送后,将 basic 改为 DelayTopic,这边 Producer 就写完了。
2、书写consumer
开始写消费者 Consumer:也是将前面的复制粘贴过来,然后同样的,消费的 topic 改为 DelayTopic,然后删去设定消费模式,打印这边也做一些修改:
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2、指定 Nameserver 地址consumer,setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
//3.订阅主题 Topic 利 Tag
consumer.subscribe("base","*");
//设定消费模式:负载均衡]广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调的数,处理消息consumer.registerMessageListener(newMessageListenerConcurrently(){
//接受消息内容
Override
publicConsumeConcurrentlystatus consumeMessage(Listmsgs, ConsumeConcurrentlyC
on
for (MessageExt msg : msgs) {
System.out.println("consumeThread="+ Thread.currentThread().getName() +","+ new Sti
}
return ConsumeConcurrentlystatus.CONSUME_SUCCESS;
}
});
//5.启动消费者 consumer
consumer.start();
然后打印一句话消费者启动,就可以开始测试了。
c:/develop/java/jdk.8.0_161/bin/java.exe...
消费者启动
先启动消费者,然后启动 producer,它会给 MQ 发送消息,消费者并没有立即能去消费,它要等到延迟时间到了之后才能去消费,这边可以看到消费的延迟时间,看起来这个延迟时间和设置的延迟时间不是很准,显示着十几秒,就是消息的发送和消息的消费时通过网络进行传输,也是有一系列网络时间的消耗,所以有一点差异也是很正常的。
四、验证延迟时间
现在如何去证明是设定延迟时间的作用还是网络延迟造成延迟的效果,很简单,也就是把设定延迟时间删除,再重新启动,可以看到发出后马上被消费。