【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

简介: 【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

在认识消息队列之前 我想有必要说明什么是异步处理


今天是女神节 现在我们的坤坤 很希望约他的女神出来


当然结果不尽人意


于是坤坤转念一想 天下女神千千万 何必单恋一枝花?  他提出了这样一个理论“只要舔的够多 够快 总能成功的”  于是他转换策略  他决定在列表中循环


问完一个女神 再问候下一个


这 就是同步处理


但他发现效率太低了  等他问到第三个的时候 隔壁老王已经把小美越走了  于是坤坤为了解决这个问题   想到了一个很好的办法 那就是异步通讯


异步通讯


异步同学 顾名思义 “异”就是不同  不同的步骤去执行 就不是一个线上的  他不必等待上一位女神给他回复 或者他不必完成上一次的任务(可以理解为群发) 就能够直接执行下一步  这 就是异步处理(异步通讯)


但是 要怎么样实现这个模型?  就引入到了这里的主题 消息队列


消息队列 --RabbitMQ


RabbitMQ是一个消息队列中间件,用于实现应用程序的异步和解耦,同时也能起到消息缓冲和消息分发的作用。它是基于AMQP(高级消息队列协议)的一种消息中间件,最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ具有高可用性、高性能和灵活性等特点,因此在互联网公司和分布式系统中得到广泛应用

我们来解析其架构 就会发现其实这是一个很简单的东西


无非就是 发布(也可以说是生产)和消费


生产者:


生产者是消息队列中的消息发送方。它负责创建并发送消息到消息队列中,供消费者进行消费。生产者通常与特定的业务逻辑相关联,根据业务需求生成消息并将其发送到消息队列。生产者将消息发送到特定的队列或主题,然后消息队列会将消息传递给一个或多个消费者进行处理。


生产者的主要职责包括:


  • 创建消息并设置相关的属性(如消息内容、优先级、过期时间等)。
  • 将消息发送到消息队列中。
  • 处理发送消息过程中可能出现的异常情况。


消费者:


消费者是消息队列中的消息接收方。它负责从消息队列中获取消息并进行处理。消费者通常与特定的业务逻辑相关联,负责处理接收到的消息,执行相应的操作,可能是业务逻辑的处理、数据存储、日志记录等。


消费者的主要职责包括:


  • 从消息队列中获取消息。
  • 处理接收到的消息,执行相应的操作。
  • 确认消息的消费状态(如消息确认、消息拒绝、消息重试等)。
  • 处理消费消息过程中可能出现的异常情况。


生产者和消费者的协作可以实现解耦和异步通信的优势。生产者可以独立于消费者的处理速度和状态,将消息发送到消息队列中,而消费者可以根据自己的处理能力和需求从消息队列中获取并处理消息。这种解耦和异步通信的方式可以提高系统的可伸缩性、可靠性和灵活性。


深入原理

他由下面四个部分组成:


  1. 生产者(Producer):

生产者负责创建并发送消息到RabbitMQ的交换器。生产者将消息发布到特定的交换器,并可以指定消息的路由键(Routing Key)。生产者可以根据业务需求生成消息,并选择将消息发送到特定的交换器中。


  1. 交换器(Exchange):

交换器是消息的分发中心,它接收来自生产者的消息,并根据消息的路由键将消息路由到一个或多个绑定的队列中。交换器根据事先定义的规则(Exchange Type)来决定如何路由消息。RabbitMQ提供了几种常见的交换器类型,包括直连交换器(Direct Exchange)、主题交换器(Topic Exchange)、广播交换器(Fanout Exchange)和首部交换器(Headers Exchange)。


  1. 队列(Queue):

队列是消息的存储和传递载体。它是消息的终点,消费者通过订阅队列来接收消息。每个消息都被发送到一个特定的队列中,消费者从队列中获取消息并进行处理。队列具有先进先出的特性,保证了消息的顺序性。


  1. 消费者(Consumer):

消费者从队列中获取消息并进行处理。消费者可以根据自身的需求订阅一个或多个队列,以接收相应的消息。消费者可以在不同的节点或者不同的系统中部署,实现分布式的消息处理。


具体使用


那么我们明白了他的构成 就来看如何进行使用


引入Spring RabbitMQ依赖:


在项目的构建文件(如Maven的pom.xml)中添加Spring RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


配置RabbitMQ连接:


在Spring Boot的配置文件(如application.properties或application.yml)中添加RabbitMQ的连接配置:

spring.rabbitmq.host=your-rabbitmq-host
spring.rabbitmq.port=5672
spring.rabbitmq.username=your-username
spring.rabbitmq.password=your-password


  1. 创建消息发送者:


创建一个消息发送者(Producer)的类,使用Spring RabbitMQ提供的RabbitTemplate来发送消息。在发送消息之前,需要注入RabbitTemplate并配置交换器和路由键:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
public class MessageSender {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
    }
}


  1. 创建消息接收者:


创建一个消息接收者(Consumer)的类,使用Spring RabbitMQ提供的@RabbitListener注解来监听队列并处理接收到的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class MessageReceiver {
 
    @RabbitListener(queues = "queue-name")
    public void receiveMessage(String message) {
        System.out.println("消费者接收到消息: " + "【"+message+"】");
        // 处理接收到的消息逻辑
    }
}


消息发送与接收:


在需要发送消息的地方,注入MessageSender并调用sendMessage方法发送消息:

@Autowired
private MessageSender messageSender;
 
public void send() {
    for(int i=0;i<100;i+=2){
    messageSender.sendMessage("hello, message_"+i);
    }
}


运行效果


消息的可靠性投递:


为了实现消息的可靠性投递,可以使用以下方法:


  • 消息持久化:在发送消息时,将消息设置为持久化。通过MessageProperties中的setDeliveryMode方法将消息的传递模式设置为2(持久化)。
rabbitTemplate.convertAndSend("exchange-name", "routing-key", message, message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
});


  • 发送者确认模式:在发送消息时,启用发送者确认模式,确保消息成功发送到RabbitMQ。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        // 消息发送成功
    } else {
        // 消息发送失败,进行处理
    }
});


  • 消费者确认模式:在消费者处理消息完成后,手动确认消息的消费状态。
@RabbitListener(queues = "queue-name")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
    try {
        // 处理接收到的消息逻辑
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // 处理消息消费失败的情况
        channel.basicNack(deliveryTag, false, true);
    }
}


在具体的业务中 我们可以把消息队列作为一个消息的传递,例如订单完成以后 就去通知发货系统跟售后系统去执行 能够解除系统之间的耦合 达到更高效的工作效率

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
371 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
5月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
318 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
10月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
10月前
|
消息中间件 人工智能 运维
乐言科技:云原生加速电商行业赋能,云消息队列助力降本 37%
乐言科技依托云原生架构及阿里云云原生产品体系,实现基础设施与业务解耦以及弹性调度,在提升业务稳定性的同时,显著增加研发效能并降低运维成本,加速电商客户定制化需求交付,推动云计算与 AI 技术在电商领域的深度融合。
632 101
|
6月前
|
消息中间件 人工智能 Cloud Native
【云故事探索 | NO.14】:乐言科技——云原生加速电商行业赋能,云消息队列助力降本 37%
【云故事探索 | NO.14】:乐言科技——云原生加速电商行业赋能,云消息队列助力降本 37%
|
9月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
303 32
|
10月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
3131 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
10月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
446 11
RocketMQ原理—3.源码设计简单分析下
|
10月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
10月前
|
消息中间件 人工智能 Kafka
【云故事探索】NO.14:乐言科技——云原生加速电商行业赋能,云消息队列助力降本 37%
上海乐言科技股份有限公司专注于AI技术,提供电商、金融等领域的整体解决方案。其核心产品“乐语助人”智能客服机器人日均服务超千万人次,助力六万余家电商客户数智化转型。为解决自建消息队列痛点,乐言科技采用阿里云消息队列RocketMQ版Serverless系列,实现业务稳定、开发成本降低、运维效率提升及资源弹性降本37%。通过云原生架构,乐言科技推动AI与电商深度融合,助力行业创新突破。