【消息队列】消息中间件RabbitMQ急速入门2

简介: 【消息队列】消息中间件RabbitMQ急速入门

4.SpringBoot整合Spring-AMPQ

4.1.什么是Spring-AMQP

Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等.
提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发.
总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter.

4.2.引入AMQP-starter依赖

<!-- 代码库 -->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
 <!--引入AMQP-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.3.SpringBoot整合RabbitMQ编码

yml配置文件修改

#消息队列
spring: 
  rabbitmq:
    host: 10.211.63.14
    port: 5672
    virtual-host: /dev
    password: 123456
    username: admin

RabbitMQConfig配置类

@Configuration
public class RabbitMQConfig{
    public static final String EXCHANGE_NAME = "exchange_order";
    public static final String QUEUE = "order_queue";
    /**
     * 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(ture).build();
    }
    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }
    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

生产者发送消息

rabbitTemplate.converAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单");

消费者监听消息

@Component
@RabbitMQListener(queue = "order_queue")
public class OrderMQListener{
    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void messageHandler(String body,Message message){
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());
    }
}

5.RabbitMQ的消息可靠性投递

5.1.什么是RabbitMQ的消息可靠性投递

  • 保证mq节点成功接收到消息,消息发送端需要接收到mq服务端接受到消息的确认应答,完善消息补偿机制,发送失败的消息可以二次感知,并进行二次处理。

5.2.RabbitMQ消息投递路径

  • 生产者->交换机->队列->消费者

5.3.通过两个点控制消息的可靠性投递

  • 生产者到交换机:confirmCallback
  • 交换机到队列:returnCallback

**注意:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体的新跟那个效率会变低,吞吐量下降严重,不是非常重要的消息不建议开启消息确认机制

5.4.RabbitMQ消费可靠性投递confirmCallback实战

生产者到交换机

通过confirmCallback

生产者投递消息后,如果Broker收到消费后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种法师是方式可靠性投递的核心。

开启confirmCallback

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值事发布消息成功到交换机后触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated

编码实战

核心API:setConfirmCallback() 、confirm(配置,是否接到消息,失败的原因)

@Test
    void testConfirmCallback(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 配置
             * @param ack 交换机是否收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback======>");
                System.out.println("correlationData======>"+correlationData);
                System.out.println("ack======>"+ack);
                System.out.println("cause======>"+cause);
                if(ack){
                    System.out.println("发送成功");
                    //更新数据库的状态,状态为成功
                }else {
                    System.out.println("发送失败,记录到日志或者数据库");
                    //更新数据库的状态,状态为失败
                }
            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }

**模拟异常:**修改交换机名称

5.5.RabbitMQ消费可靠性投递returnCallback实战

交换机到队列

  • 通过returnCallback
  • 消息从交换机发送到对应的队列失败时,触发
  • 两种模式:
  • 交换机到队列不成功,则丢弃消息(默认)
  • 交换机到队列不成功,发挥生产者,触发returnCallback
#配置为true,则交换机处理消息到路由失败,会返回给生产者
spring.rabbitmq.template.mandatory=true
#或者在temlate对象上设置
template.setMandatory(true);

第一步:开启returnCallback配置

#新版
spring.rabbitmq.publisher-returns=true

第二步:修改交换机投递到队列失败的策略

#为true,则交换机处理消息到路由失败会返回给生产者
spring.rabbitmq.template.mandatory=true

编码实战

    @Test
    void testReturnCallback(){
    //publisher-returns为true则交换机处理消息到路由失败,返回给生产者
        //mandatory为true则消息未被路由到任何一个queue,则回退一条消息给生产者
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("ReturnedMessage:" + returned.toString());
                int replyCode = returned.getReplyCode();
                System.out.println("_______________________");
                System.out.println("replyCode:" + replyCode);
            }
        });
        //数据库新增一个消息记录,状态是发送,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
    }
}

5.6.Rabbitmq的消息确机制ACK讲解

背景:消费者从broker中监听消息,需要确保消息被合理的处理掉

RabbitMQ的ACK介绍

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,按摩就不会有ACK反馈,RabbitMQ回认为i这个消息没有正常消费,会将消息重新放入队列中。
  • 只有当消费者正确的发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如违背进行ACK消息确认机制,这条消息将被锁定Unacked

确认方式

  • 自动确认(默认)
  • 手动确认(manual)
spring:
  rabbitmq: 
    #开启手动确认消息,如果消息重新入队,进行重试
    listener: 
      simple: 
        acknowledge-mode: manual

5.7.编码实战

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
    /**
     * 处理器,适配器,加上@RabbitHandler注解
     * 加上Channel这个参数
     */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("message:"+message.toString());
        System.out.println("==============");
        System.out.println("消息标识tag:"+tag);
        System.out.println("消息体body:"+body);
        //第一个参数是该消息的index,第二个是是否批量操作
        channel.basicAck(tag,false);
        //第一个参数是index,第二个是否批量,第三个是失败后是否重新返回给生产者重新投递
        //channel.basicNack(msgTag,false,true);
    }
}

deliveryTage介绍

  • 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

basicNack和basicReject介绍

  • basicReject一次只能拒绝接收一个消息,可以设置是否requeue
  • basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
5月前
|
消息中间件 监控 中间件
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
167 5
|
6月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
6月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
4月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
258 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
186 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
4月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
5月前
|
消息中间件 中间件 UED
为什么需要消息队列中间件?
为什么需要消息队列中间件?
77 4
|
5月前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
5月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
114 0