【消息中间件】异常和死信消息们的浪浪山 1

简介: 【消息中间件】异常和死信消息们的浪浪山

1.springboot整合RabbitMQ

1.1springboot整合生产者

新建项目rabbitmqdemo02,新建模块producer-springboot

2a4f9f3c4b3d4814a801029dd55f1665.png

修改改模块的pom.xml,引入依赖

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

配置下application.yml

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

启动类com.wangzhou.ProducerApplication。

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

按如下结构新建配置类RabbitMQConfig

203250db80dc43fcaf61e9c5e63c2799.png

编写配置类。

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "boot_queue";
    public static final String EXCHANGE_NAME = "boot_exchange";
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue, @Qualifier("bootExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

编写测试类。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    // 1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq haha~~~~");
    }
}

运行,rabbitmq管控台就有我们创建的队列了。

808c12d159fa4c43975d42b31be572ae.png

点进去还可以看到具体的消息详情。


38324c0ccdd84a519708c95fe5d3f84d.png

1.2 springboot整合消费者

步骤与生产者极其类似。


d9cc4b280ada4feb985d27aedd142232.png

创建工程consumer-springboot

编写pom.xml。引入依赖。

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

编写yml配置

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

新建主启动类。

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

新建监听类。

@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_queue")
    public void ListenQueue(Message message) {
        System.out.println(message);
    }
}

启动类运行。完美!

164be4235ed04fbfb36e10461d8c6fe8.png

小结下。

42d319eb834a490b99261bc123f1e0f9.png

2.异常消息的浪浪山

2.1 消息可靠性问题

62dd2915bfed43388305d9befbb9d4ca.png

a6bedeb474dd4fd1bd9f0a1954d1afcd.png


上面问题的答案是:发送时丢失(未到交换机或者到交换机未到队列),MQ丢失,消费者丢失。

针对这些可能性,我们将介绍如下高级特性。


08b4744741114dc8832a76c44faee0ab.png

基于这些问题,我们需要进一步学习MQ的一些高级特性。

2.2 生产者确认机制


7db875eba113474d9701ef182bcda685.png


2.2.1 初始化代码

新建一个工程,mq-advanced-demo。项目的架构如下图。


6a0f7850abf94571bdcffae6a305a4b2.png

2.2.2 实现生产者确认

(1)生产者配置

678de8a4208b4ab5a90f8631271886c9.png

完整代码。

logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

(2)实现ReturnCallback回调

当消息到交换器,但是路由过程中出现问题,通过ReturnCallback回调。

06ede1187b73423f94ecd0cb5772fc5c.png每个RabbitTemplate只能配置一个ReturnCallback,而RabbitTemplate是由spring容器创建的,是单例实例。因此ReturnCallback必须在全局进行配置,即在项目启动过程进行配置。


因此,CommonConfig实现了ApplicationContextAware接口。我们知道,Aware是通知接口,而ApplicationContext是一个bean容器,管理spring项目中的bean。因此,实现了ApplicationContextAware接口即意味着可以在项目启动所有bean(当然包括rabbitTemplate)加载以后调用回调,获取rabbitTemplate,设置全局的ReturnCallback。具体细节可以看接口的方法实现setApplicationContext。


代码如下。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
202 0
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
|
消息中间件 Docker 容器
【消息中间件】异常和死信消息们的浪浪山 3
【消息中间件】异常和死信消息们的浪浪山
|
消息中间件 Java 测试技术
【消息中间件】异常和死信消息们的浪浪山 2
【消息中间件】异常和死信消息们的浪浪山
|
消息中间件 存储 NoSQL
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
614 0
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
|
3月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
81 0
|
2月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
152 0
|
1月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
166 3
|
1天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1天前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】