《微服务实战》 第十五章 RabbitMQ 延迟队列

简介: 《微服务实战》 第十五章 RabbitMQ 延迟队列

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

  • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
  • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
  • Dead Letter Exchanges(DLX),即死信交换机
  • Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************/
//创建立即消费队列
@Bean
public Queue immediateQueue(){
    return new Queue("immediateQueue");
}
//创建立即消费交换机
@Bean
public DirectExchange immediateExchange(){
    return new DirectExchange("immediateExchange");
}
@Bean
public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");
}
//创建延迟队列
@Bean
public Queue delayQueue(){
    Map<String,Object> params = new HashMap<>();
    //死信队列转发的死信转发到立即处理信息的交换机
    params.put("x-dead-letter-exchange","immediateExchange");
    //死信转化携带的routing-key
    params.put("x-dead-letter-routing-key","immediateRoutingKey");
    //设置消息过期时间,单位:毫秒
    params.put("x-message-ttl",60 * 1000);
    return new Queue("delayQueue",true,false,false,params);
}
@Bean
public DirectExchange delayExchange(){
    return new DirectExchange("delayExchange");
}
@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");
}
@Test
public void sendDelay(){
    this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");
}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/
@Bean
public Queue delayNewQueue(){
    return new Queue("delayNewQueue");
}
@Bean
public CustomExchange delayNewExchange(){
    Map<String, Object> args = new HashMap<>();
    // 设置类型,可以为fanout、direct、topic
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);
}
@Bean
public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){
    return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();
}
@Test
public void sendDelay() {
    //生产端写完了
    UserInfo userInfo = new UserInfo();
    userInfo.setPassword("13432432");
    userInfo.setUserAccount("tiger");
    this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo
            , a -> {
                //单位毫秒
                a.getMessageProperties().setDelay(30000);
                return a;
            });
}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。

生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。

消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@Bean
public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    //消息发送到交换器Exchange后触发回调
    template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //  可以进行消息入库操作
            log.info("消息唯一标识 correlationData = {}", correlationData);
            log.info("确认结果 ack = {}", ack);
            log.info("失败原因 cause = {}", cause);
        }
    });
    // 配置这个,下面的ReturnCallback 才会起作用
    template.setMandatory(true);
    // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            //  可以进行消息入库操作
            log.info("消息主体 message = {}", returnedMessage.getMessage());
            log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());
            log.info("回复描述 replyText = {}", returnedMessage.getReplyText());
            log.info("交换机名字 exchange = {}", returnedMessage.getExchange());
            log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());
        }
    });
    return template;
}
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  application:
    name: drp-user-service  #微服务名称
  datasource:
    username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/drp
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: tiger
    password: tiger
    virtual-host: tiger_vh
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual # 开启消息消费手动确认
        retry:
          enabled: true

2.2、消费确认

@RabbitHandler
public void process(UserInfo data, Message message, Channel channel){
    log.info("收到directQueue队列信息:" + data);
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        //成功消费确认
        channel.basicAck(deliveryTag,true);
        log.info("消费成功确认完毕。。。。。");
    } catch (IOException e) {
        log.error("确认消息时抛出异常 ", e);        
        // 重新确认,成功确认消息
        try {
            Thread.sleep(50);
            channel.basicAck(deliveryTag, true);
        } catch (IOException | InterruptedException e1) {
            log.error("确认消息时抛出异常 ", e);
            // 可以考虑入库
        }
    }catch (Exception e){
        log.error("业务处理失败", e);
        try {
            // 失败确认
            channel.basicNack(deliveryTag, false, false);
        } catch (IOException e1) {
            log.error("消息失败确认失败", e1);
        }
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
62
分享
相关文章
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
27天前
|
微服务——MongoDB实战演练——需求分析
本文档《5-MongoDB实战演练》聚焦于某头条文章评论业务的需求分析与功能实现。基于MongoDB,需完成以下功能:1)提供基本的增删改查API;2)支持通过文章ID查询相关评论;3)实现评论点赞功能。结合实际业务场景,演示MongoDB在数据存储与操作中的应用,附带示意图帮助理解业务结构。
24 2
微服务——MongoDB实战演练——需求分析
微服务——MongoDB实战演练——文章评论的基本增删改查
本节介绍了文章评论的基本增删改查功能实现。首先,在`cn.itcast.article.dao`包下创建数据访问接口`CommentRepository`,继承`MongoRepository`以支持MongoDB操作。接着,在`cn.itcast.article.service`包下创建业务逻辑类`CommentService`,通过注入`CommentRepository`实现保存、更新、删除及查询评论的功能。最后,新建Junit测试类`CommentServiceTest`,对保存和查询功能进行测试,并展示测试结果截图,验证功能的正确性。
34 2
|
27天前
|
微服务——MongoDB实战演练——文章评论实体类的编写
本节主要介绍文章评论实体类的编写,创建了包`cn.itcast.article.po`用于存放实体类。具体实现中,`Comment`类通过`@Document`注解映射到MongoDB的`comment`集合,包含主键、内容、发布时间、用户ID、昵称等属性,并通过`@Indexed`和`@CompoundIndex`注解添加单字段及复合索引,以提升查询效率。同时提供了Mongo命令示例,便于理解和操作。
33 2
微服务——MongoDB实战演练——MongoTemplate实现评论点赞
本节介绍如何使用MongoTemplate实现评论点赞功能。传统方法通过查询整个文档并更新所有字段,效率较低。为优化性能,采用MongoTemplate对特定字段直接操作。代码中展示了如何利用`Query`和`Update`对象构建更新逻辑,通过`update.inc(&quot;likenum&quot;)`实现点赞数递增。测试用例验证了功能的正确性,确保点赞数成功加1。
25 0
微服务——MongoDB实战演练——根据上级ID查询文章评论的分页列表
本节介绍如何根据上级ID查询文章评论的分页列表,主要包括以下内容:(1)在CommentRepository中新增`findByParentid`方法,用于按父ID查询子评论分页列表;(2)在CommentService中新增`findCommentListPageByParentid`方法,封装分页逻辑;(3)提供JUnit测试用例,验证功能正确性;(4)使用Compass插入测试数据并执行测试,展示查询结果。通过这些步骤,实现对评论的高效分页查询。
31 0
微服务——MongoDB实战演练——文章微服务模块搭建
本节介绍文章微服务模块的搭建过程,主要包括以下步骤:(1)创建项目工程 *article*,并在 *pom.xml* 中引入依赖;(2)配置 *application.yml* 文件;(3)创建启动类 *cn.itcast.article.ArticleApplication*;(4)启动项目,确保控制台无错误提示。通过以上步骤,完成文章微服务模块的基础构建与验证。
26 0
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署

热门文章

最新文章