【消息队列】消息中间件RabbitMQ急速入门2

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 【消息队列】消息中间件RabbitMQ急速入门

4.SpringBoot整合Spring-AMPQ

4.1.什么是Spring-AMQP

Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等.
提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发.
总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter.

4.2.引入AMQP-starter依赖

<!-- 代码库 -->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
 <!--引入AMQP-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.3.SpringBoot整合RabbitMQ编码

yml配置文件修改

#消息队列
spring: 
  rabbitmq:
    host: 10.211.63.14
    port: 5672
    virtual-host: /dev
    password: 123456
    username: admin

RabbitMQConfig配置类

@Configuration
public class RabbitMQConfig{
    public static final String EXCHANGE_NAME = "exchange_order";
    public static final String QUEUE = "order_queue";
    /**
     * 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(ture).build();
    }
    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }
    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

生产者发送消息

rabbitTemplate.converAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单");

消费者监听消息

@Component
@RabbitMQListener(queue = "order_queue")
public class OrderMQListener{
    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void messageHandler(String body,Message message){
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());
    }
}

5.RabbitMQ的消息可靠性投递

5.1.什么是RabbitMQ的消息可靠性投递

  • 保证mq节点成功接收到消息,消息发送端需要接收到mq服务端接受到消息的确认应答,完善消息补偿机制,发送失败的消息可以二次感知,并进行二次处理。

5.2.RabbitMQ消息投递路径

  • 生产者->交换机->队列->消费者

5.3.通过两个点控制消息的可靠性投递

  • 生产者到交换机:confirmCallback
  • 交换机到队列:returnCallback

**注意:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体的新跟那个效率会变低,吞吐量下降严重,不是非常重要的消息不建议开启消息确认机制

5.4.RabbitMQ消费可靠性投递confirmCallback实战

生产者到交换机

通过confirmCallback

生产者投递消息后,如果Broker收到消费后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种法师是方式可靠性投递的核心。

开启confirmCallback

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值事发布消息成功到交换机后触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated

编码实战

核心API:setConfirmCallback() 、confirm(配置,是否接到消息,失败的原因)

@Test
    void testConfirmCallback(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 配置
             * @param ack 交换机是否收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback======>");
                System.out.println("correlationData======>"+correlationData);
                System.out.println("ack======>"+ack);
                System.out.println("cause======>"+cause);
                if(ack){
                    System.out.println("发送成功");
                    //更新数据库的状态,状态为成功
                }else {
                    System.out.println("发送失败,记录到日志或者数据库");
                    //更新数据库的状态,状态为失败
                }
            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }

**模拟异常:**修改交换机名称

5.5.RabbitMQ消费可靠性投递returnCallback实战

交换机到队列

  • 通过returnCallback
  • 消息从交换机发送到对应的队列失败时,触发
  • 两种模式:
  • 交换机到队列不成功,则丢弃消息(默认)
  • 交换机到队列不成功,发挥生产者,触发returnCallback
#配置为true,则交换机处理消息到路由失败,会返回给生产者
spring.rabbitmq.template.mandatory=true
#或者在temlate对象上设置
template.setMandatory(true);

第一步:开启returnCallback配置

#新版
spring.rabbitmq.publisher-returns=true

第二步:修改交换机投递到队列失败的策略

#为true,则交换机处理消息到路由失败会返回给生产者
spring.rabbitmq.template.mandatory=true

编码实战

    @Test
    void testReturnCallback(){
    //publisher-returns为true则交换机处理消息到路由失败,返回给生产者
        //mandatory为true则消息未被路由到任何一个queue,则回退一条消息给生产者
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("ReturnedMessage:" + returned.toString());
                int replyCode = returned.getReplyCode();
                System.out.println("_______________________");
                System.out.println("replyCode:" + replyCode);
            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }
}

5.6.Rabbitmq的消息确机制ACK讲解

背景:消费者从broker中监听消息,需要确保消息被合理的处理掉

RabbitMQ的ACK介绍

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,按摩就不会有ACK反馈,RabbitMQ回认为i这个消息没有正常消费,会将消息重新放入队列中。
  • 只有当消费者正确的发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如违背进行ACK消息确认机制,这条消息将被锁定Unacked

确认方式

  • 自动确认(默认)
  • 手动确认(manual)
spring:
  rabbitmq: 
    #开启手动确认消息,如果消息重新入队,进行重试
    listener: 
      simple: 
        acknowledge-mode: manual

5.7.编码实战

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
    /**
     * 处理器,适配器,加上@RabbitHandler注解
     * 加上Channel这个参数
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("message:"+message.toString());
        System.out.println("==============");
        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);
        //第一个参数是该消息的index,第二个是是否批量操作
        channel.basicAck(tag,false);
        //第一个参数是index,第二个是否批量,第三个是失败后是否重新返回给生产者重新投递
        //channel.basicNack(msgTag,false,true);
    }
}

deliveryTage介绍

  • 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

basicNack和basicReject介绍

  • basicReject一次只能拒绝接收一个消息,可以设置是否requeue
  • basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
26天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
26天前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
26天前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
26天前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
49 5
|
22天前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
25 0
分享一下rocketmq入门小知识
|
1月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
70 1
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
118 2
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
36 3
|
30天前
|
消息中间件 网络架构
RabbitMQ消息队列常见面试题
这篇文章总结了RabbitMQ的常见面试题,涵盖了消息模型、使用场景、实现功能、消息幂等性、顺序性、堆积和丢失的避免方法,以及推模式和拉模式的区别。
42 0