【微服务系列笔记】MQ消息可靠性

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。

3. 消息可靠性

消息丢失原因?

  • 发送时丢失:
  • 生产者发送的消息未送达exchange
  • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

3.1. 生产者确认机制

三种方式:

  1. 发送成功,返回ack publish-confirm
  • 调用ConfirmCallback,进行打印“消息发送成功, ID:{}”
  1. 发送到交换机,但没有到达队列,返回ack publish-return
  • 调用ConfirmCallback,进行打印“消息发送成功, ID:{}”
  • 调用ReturnCallback,进行日志打印“消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}”,还可进行消息重发。
  1. 没有发送成功,返回 nack publish-confirm
  • 调用ConfirmCallback,进行打印“消息发送失败, ID:{}, 原因{}”

ConfirmCallback和ReturnCallback区别?

ConfirmCallback(发布确认回调):处理返回确认消息

ReturnCallback(发布返回回调):处理返回消息

回调函数理解,由于发送消息对于返回消息是异步回调


实现方式:修改配置

publish-confirm-type:开启publisher-confirm,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallback

publish-returns:回调机制,定义ReturnCallback

template.mandatory:定义消息路由失败时的策略。

  • true,则调用ReturnCallback
  • false:则直接丢弃消息
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            if (result.isAck()) {
                log.debug("消息投递到交换机成功, ID:{}", cd.getId());
            } else {
                log.error("消息投递到交换机失败,消息ID:{},原因:{}", cd.getId(), cd.getReturnedMessage());
            }
        }
        @Override
        public void onFailure(Throwable ex) {
            log.error("消息发送异常,ID:{},原因:{}", cd.getId(), ex.getMessage());
            rabbitTemplate.convertAndSend("amq.topic", routingKey, message, cd);
        }
    });
    //     cd.getFuture().addCallback(result -> {
    //         if (result.isAck()) {
    //             log.debug("消息投递到交换机成功, ID:{}", cd.getId());
    //         } else {
    //             log.error("消息投递到交换机失败,消息ID:{},原因:{}", cd.getId(), cd.getReturnedMessage());
    //         }
    //     }, ex -> log.error("消息发送异常,ID:{},原因:{}", cd.getId(), ex.getMessage()));
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);
}
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}

3.2. 消息持久化

交换机持久化,队列持久化,消息持久化

默认情况下三者都是持久化的,记住关键字durable,在MQ上看到属性feature带D

3.3. 消费者确认机制

manual

手动ack,需要在业务代码结束后,调用api发送ack。

none

关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

消费者接收消息后,消费者异常,消息依然被RabbitMQ删除了。

auto

自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

消费者接收消息后,消费者异常,消息会重入队,在重新发送给消费者,进行无限循环。

本地重试

  • 消息处理过程中抛出异常,不会重入队,而是在消费者本地重试
  • 重试达到最大次数后,执行失败策略

失败策略

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          multiplier: 1 
          max-attempts: 3 # 最大重试次数
          # true无状态;false有状态。如果业务中包含事务,这里改为false
          stateless: true
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
14天前
|
SQL 关系型数据库 数据库
【微服务系列笔记】Seata
Seata是一种开源的分布式事务解决方案,旨在解决分布式事务管理的挑战。它提供了高性能和高可靠性的分布式事务服务,支持XA、TCC、AT等多种事务模式,并提供了全局唯一的事务ID,以确保事务的一致性和隔离性。Seata还提供了分布式事务的协调、事务日志、事务恢复等功能,帮助开发人员简化分布式事务的管理和实现。
52 1
|
14天前
|
负载均衡 安全 Java
【微服务系列笔记】Gateway
Gateway是Spring Cloud生态系统中的网关服务,作为微服务架构的入口,提供路由、负载均衡、限流、鉴权等功能。借助于过滤器和路由器,Gateway能够动态地管理请求流量,保障系统的安全和性能。
41 7
|
14天前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1000 3
|
14天前
|
Linux Docker 容器
【微服务系列笔记】Docker
docker是一种容器技术,它主要是用来解决软件跨环境迁移的问题和同一环境下依赖冲突问题。 Docker可以运行在Mac, Windows, linux等操作系统上,常用于适用于构建和部署分布式应用、微服务架构。
47 0
【微服务系列笔记】Docker
|
14天前
|
监控 Java 应用服务中间件
【微服务系列笔记】Sentinel入门-微服务保护
Sentinel是一个开源的分布式系统和应用程序的运维监控平台。它提供了实时数据收集、可视化、告警和自动化响应等功能,帮助用户监控和管理复杂的IT环境。本文简单介绍了微服务保护以及常见雪崩问题,解决方案。以及利用sentinel进行入门案例。
40 3
|
14天前
|
负载均衡 Java Apache
【微服务系列笔记】Feign
Feign是一个声明式的伪HTTP客户端,它使得HTTP请求变得更简单。使用Feign,只需要创建一个接口并注解。Feign默认集成了Ribbon,并和Eureka结合,默认实现了负载均衡的效果。 OpenFeign 是SpringCloud在Feign的基础上支持了SpringMVC的注解。
54 8
|
14天前
|
存储 负载均衡 Cloud Native
【微服务系列笔记】Nacos
Nacos 是阿里巴巴开源的项目,用于构建云原生应用的动态服务发现、配置管理和服务管理平台。它支持动态服务发现、服务配置、服务元数据和流量管理,旨在更敏捷和方便地构建、交付和管理微服务平台。可作为注册中心与配置中心。
54 5
|
14天前
|
Nacos 微服务
【微服务系列笔记】Eureka
该文档介绍了微服务注册中心的重要性和流行选项,如Eureka、Nacos、Consul和Zookeeper,强调Eureka是唯一支持跨区域调用的AP系统。接着,它提供了一个Eureka入门案例,包括设置Eureka服务器和客户端的步骤,并展示了多实例部署的效果。最后,简要总结了学习Eureka的意义,并提出了几个思考问题,如Eureka的功能、工作原理以及其他服务发现技术。
39 5
|
14天前
|
负载均衡 算法 应用服务中间件
【微服务系列笔记】负载均衡
本文介绍了负载均衡的概念和重要性,指出随着流量增长,通过垂直扩展和水平扩展来提升系统性能,其中水平扩展引入了负载均衡的需求。负载均衡的目标是将流量分布到多台服务器以提高响应速度和可用性,常见的硬件和软件负载均衡器包括F5、A10、Nginx、HAProxy和LVS等。 文章接着提到了Ribbon,这是一个客户端实现的负载均衡器,用于Spring Cloud中。Ribbon在发起REST请求时进行拦截,根据预设的负载均衡算法(如随机算法)选择服务器,并重构请求URI。文中还介绍了如何通过代码和配置文件两种方式自定义Ribbon的负载均衡策略。
60 3
|
14天前
|
存储 Java 数据库
【微服务系列笔记】微服务概述
本文对比了单体应用和微服务架构。单体应用中所有功能模块在一个工程中,而微服务则按领域模型拆分为独立服务,每个服务有明确边界,可独立开发、部署和扩展。微服务允许使用不同语言和技术栈,每个服务有自己的数据库。微服务架构的优点包括易于开发维护、技术栈开放和错误隔离,但缺点包括增加运维成本、调用链路复杂、分布式事务处理困难以及学习成本高。实现微服务通常涉及SpringCloud等开发框架和Docker等运行平台。
53 2