【消息中间件】异常和死信消息们的浪浪山 1

简介: 【消息中间件】异常和死信消息们的浪浪山

1.springboot整合RabbitMQ

1.1springboot整合生产者

新建项目rabbitmqdemo02,新建模块producer-springboot

2a4f9f3c4b3d4814a801029dd55f1665.png

修改改模块的pom.xml,引入依赖

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

配置下application.yml

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

启动类com.wangzhou.ProducerApplication。

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

按如下结构新建配置类RabbitMQConfig

203250db80dc43fcaf61e9c5e63c2799.png

编写配置类。

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "boot_queue";
    public static final String EXCHANGE_NAME = "boot_exchange";
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue, @Qualifier("bootExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

编写测试类。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    // 1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq haha~~~~");
    }
}

运行,rabbitmq管控台就有我们创建的队列了。

808c12d159fa4c43975d42b31be572ae.png

点进去还可以看到具体的消息详情。


38324c0ccdd84a519708c95fe5d3f84d.png

1.2 springboot整合消费者

步骤与生产者极其类似。


d9cc4b280ada4feb985d27aedd142232.png

创建工程consumer-springboot

编写pom.xml。引入依赖。

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

编写yml配置

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

新建主启动类。

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

新建监听类。

@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_queue")
    public void ListenQueue(Message message) {
        System.out.println(message);
    }
}

启动类运行。完美!

164be4235ed04fbfb36e10461d8c6fe8.png

小结下。

42d319eb834a490b99261bc123f1e0f9.png

2.异常消息的浪浪山

2.1 消息可靠性问题

62dd2915bfed43388305d9befbb9d4ca.png

a6bedeb474dd4fd1bd9f0a1954d1afcd.png


上面问题的答案是:发送时丢失(未到交换机或者到交换机未到队列),MQ丢失,消费者丢失。

针对这些可能性,我们将介绍如下高级特性。


08b4744741114dc8832a76c44faee0ab.png

基于这些问题,我们需要进一步学习MQ的一些高级特性。

2.2 生产者确认机制


7db875eba113474d9701ef182bcda685.png


2.2.1 初始化代码

新建一个工程,mq-advanced-demo。项目的架构如下图。


6a0f7850abf94571bdcffae6a305a4b2.png

2.2.2 实现生产者确认

(1)生产者配置

678de8a4208b4ab5a90f8631271886c9.png

完整代码。

logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

(2)实现ReturnCallback回调

当消息到交换器,但是路由过程中出现问题,通过ReturnCallback回调。

06ede1187b73423f94ecd0cb5772fc5c.png每个RabbitTemplate只能配置一个ReturnCallback,而RabbitTemplate是由spring容器创建的,是单例实例。因此ReturnCallback必须在全局进行配置,即在项目启动过程进行配置。


因此,CommonConfig实现了ApplicationContextAware接口。我们知道,Aware是通知接口,而ApplicationContext是一个bean容器,管理spring项目中的bean。因此,实现了ApplicationContextAware接口即意味着可以在项目启动所有bean(当然包括rabbitTemplate)加载以后调用回调,获取rabbitTemplate,设置全局的ReturnCallback。具体细节可以看接口的方法实现setApplicationContext。


代码如下。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
12月前
|
消息中间件 Docker 容器
【消息中间件】异常和死信消息们的浪浪山 3
【消息中间件】异常和死信消息们的浪浪山
|
12月前
|
消息中间件 Java 测试技术
【消息中间件】异常和死信消息们的浪浪山 2
【消息中间件】异常和死信消息们的浪浪山
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
181 1
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
|
消息中间件 存储 NoSQL
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
551 0
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
|
5月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
9月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
10月前
|
消息中间件 安全 Java
全网首发!消息中间件神仙笔记,涵盖阿里十年技术精髓
消息中间件是分布式系统中的重要组件,在实际工作中常用消息中间件进行系统间数据交换,从而解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。