RabbitMQ发布确认高级

简介: RabbitMQ发布确认高级

八、发布确认高级


correlated

adj. 有相互关系的

v. (使)相关联;(使)相互对照(correlate 的过去分词)

如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?


8.1 发布确认 springboot 版本


8.1.1确认机制方案


image.png


8.1.2 代码架构图


image.png


8.1.3 配置文件


在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated

  • NONE
    禁用发布确认模式,是默认值
  • CORRELATED
    发布消息成功到交换器后会触发回调方法
  • SIMPLE
    经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法
    其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
    waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker


spring:
  rabbitmq:
    host: 192.168.42.96
    port: 5672
    username: admin
    password: 123
#    开启了交换机回调
    publisher-confirm-type: correlated
#    开启回退消息
    publisher-returns: true


8.1.4 配置类代码


package com.caq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
    //    交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange_name";
    //    队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //    routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    //交换机创建通过new的形式,队列的创建通过对象的方法QueueBuilder.durable
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                                      @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}


8.1.5 消息生产者


package com.caq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
    //    交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange_name";
    //    队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //    routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    //交换机创建通过new的形式,队列的创建通过对象的方法QueueBuilder.durable
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                                      @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}


8.1.6 回调接口


交换机确认回调方法 参数介绍:

1、发消息  交换机接收到了 回调

CorrelationData 保存回调消息的ID及相关信息

交换机收到消息  ack = true

cause null

2、发消息  交换机没有接收到 回调

CorrelationData 保存回调消息的ID及相关信息

交换机收到消息 ack = false

cause 失败的原因

correlation 相互关系


package com.caq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MycallBack implements RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //三元运算
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到了Id为:{}的信息", id);
        } else {
            log.info("交换机还未收到了Id为:{}的信息,由于原因:{}", id, cause);
        }
    }
}


8.1.7 消息消费者


package com.caq.consumer;
import com.caq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class Consumer {
    //接收消息
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到的队列confirm,queue消息{}",msg);
    }
}

正常情况:发送消息到交换机

image.png

如果交换机收不到消息呢?

怎么回调呢?如果消息发不出去,那就给我返回过来然后保存下来

image.png

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 Java
RabbitMQ之发布确认高级
【1月更文挑战第10天】 在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:
229 9
|
6月前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
162 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ——高级篇
RabbitMQ——高级篇
56 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ高级
RabbitMQ高级
44 0
|
6月前
|
消息中间件 存储 Java
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
|
消息中间件 存储 负载均衡
工作八年?是高级开发?竟然答不出:如何保证RabbitMQ的高可用?
一个8年工作经验的小伙伴,被问到这样一个问题,说如何保证RabbitMQ的高可用。关于这个问题呢,这位小伙伴倒是有个实操经验,就是不知道如何组织语言。所以,当时面试结果不太理想。今天,我给大家分享一下我的理解。
152 0
|
消息中间件 Java 测试技术
【RabbitMQ高级篇】消息可靠性问题(1)(下)
【RabbitMQ高级篇】消息可靠性问题(1)(下)
122 0
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题(1)(上)
【RabbitMQ高级篇】消息可靠性问题(1)
100 0
|
存储 消息中间件
十二、RabbitMQ高级 - 惰性队列
十二、RabbitMQ高级 - 惰性队列
|
消息中间件
十一、RabbitMQ高级 - 延迟队列
十一、RabbitMQ高级 - 延迟队列