RocketMQ第五章:手把手教老婆实现-延时消息18禁

简介: RocketMQ第五章:手把手教老婆实现-延时消息18禁

image.png

RocketMQ使用教程相关系列 目录


目录


第一节:介绍


第二节:延时消息-生产者和消息者步骤说明


延时消息生产者代码实现步骤


延时消息消费者代码实现步骤


第三节:延时消息生产者


效果:


第四节:延时消息消费者


效果:


第一节:介绍

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。


它的实现和普通消息的生产者,消费者基本一样,多了一个设置延迟级别。


message.setDelayTimeLevel()

现在RocketMq(开源的版本,白嫖的,知足吧)并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18


注:阿里云收费版本支持任意时间延时

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

第二节:延时消息-生产者和消息者步骤说明

延时消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象,指定主题Topic、Tag和消息体,设置延时级别


5.发送消息


6.关闭生产者producer


延时消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.订阅主题Topic和Tag


4.设置回调函数,处理消息


5.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:延时消息生产者

public class Producer {
   public static void main(String[] args)
         throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_delay_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      for (int i = 0; i < 10; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         /**
          * 参数一:消息主题Topic
          * 参数二:消息Tag
          * 参数三:消息内容
          */
         Message msg = new Message("DelayTopic", "Tag1", ("Hello 虚竹" + i).getBytes());
         // 设定延迟时间 10s
         msg.setDelayTimeLevel(3);
         // 5.发送消息
         SendResult result = producer.send(msg);
         // 发送状态
         SendStatus status = result.getSendStatus();
         System.out.println("发送结果:" + result);
         // 线程睡1秒
         TimeUnit.SECONDS.sleep(1);
      }
      // 6.关闭生产者producer
      producer.shutdown();
   }
}

image.png

public class Consumer {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_delay_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 3.订阅主题Topic和Tag
      consumer.subscribe("DelayTopic", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println("消息ID:【" + msg.getMsgId() + "】,消息内容:" + new String(msg.getBody()) + ",延迟时间:"
                     + (System.currentTimeMillis() - msg.getBornTimestamp()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
      System.out.println("消费者启动");
   }
}

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
792 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
19天前
|
消息中间件 中间件 Kafka
MQ四兄弟:如何实现延时消息
本文介绍了几种常见的消息队列系统(RabbitMQ、RocketMQ、Kafka和Pulsar)实现延时消息的方式。RabbitMQ通过死信队列或延时插件实现;RocketMQ内置延时消息支持,可通过设置`delayTimeLevel`属性实现;Kafka不直接支持延时消息,但可以通过时间戳、延时Topic、Kafka Streams等方法间接实现;Pulsar自带延时消息功能,提供`deliverAfter`和`deliverAt`两种方式。每种方案各有优劣,适用于不同的应用场景。
44 0
|
7月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
83 0
|
消息中间件 运维 算法
RabbitMQ高阶使用延时任务
RabbitMQ高阶使用延时任务
181 0
|
消息中间件 SQL 存储
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
145 0
|
消息中间件 Kafka
MQ 学习日志(八) 消息队列的延时以及过期失效问题处理
消息队列的延时以及过期失效问题处理
331 0
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1130 0
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
1132 1
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
343 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
下一篇
开通oss服务