在认识消息队列之前 我想有必要说明什么是异步处理
今天是女神节 现在我们的坤坤 很希望约他的女神出来
于是坤坤转念一想 天下女神千千万 何必单恋一枝花? 他提出了这样一个理论“只要舔的够多 够快 总能成功的” 于是他转换策略 他决定在列表中循环
这 就是同步处理
但他发现效率太低了 等他问到第三个的时候 隔壁老王已经把小美越走了 于是坤坤为了解决这个问题 想到了一个很好的办法 那就是异步通讯
异步通讯
异步同学 顾名思义 “异”就是不同 不同的步骤去执行 就不是一个线上的 他不必等待上一位女神给他回复 或者他不必完成上一次的任务(可以理解为群发) 就能够直接执行下一步 这 就是异步处理(异步通讯)
但是 要怎么样实现这个模型? 就引入到了这里的主题 消息队列
消息队列 --RabbitMQ
RabbitMQ是一个消息队列中间件,用于实现应用程序的异步和解耦,同时也能起到消息缓冲和消息分发的作用。它是基于AMQP(高级消息队列协议)的一种消息中间件,最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ具有高可用性、高性能和灵活性等特点,因此在互联网公司和分布式系统中得到广泛应用
我们来解析其架构 就会发现其实这是一个很简单的东西
无非就是 发布(也可以说是生产)和消费
生产者:
生产者是消息队列中的消息发送方。它负责创建并发送消息到消息队列中,供消费者进行消费。生产者通常与特定的业务逻辑相关联,根据业务需求生成消息并将其发送到消息队列。生产者将消息发送到特定的队列或主题,然后消息队列会将消息传递给一个或多个消费者进行处理。
生产者的主要职责包括:
- 创建消息并设置相关的属性(如消息内容、优先级、过期时间等)。
- 将消息发送到消息队列中。
- 处理发送消息过程中可能出现的异常情况。
消费者:
消费者是消息队列中的消息接收方。它负责从消息队列中获取消息并进行处理。消费者通常与特定的业务逻辑相关联,负责处理接收到的消息,执行相应的操作,可能是业务逻辑的处理、数据存储、日志记录等。
消费者的主要职责包括:
- 从消息队列中获取消息。
- 处理接收到的消息,执行相应的操作。
- 确认消息的消费状态(如消息确认、消息拒绝、消息重试等)。
- 处理消费消息过程中可能出现的异常情况。
生产者和消费者的协作可以实现解耦和异步通信的优势。生产者可以独立于消费者的处理速度和状态,将消息发送到消息队列中,而消费者可以根据自己的处理能力和需求从消息队列中获取并处理消息。这种解耦和异步通信的方式可以提高系统的可伸缩性、可靠性和灵活性。
深入原理
他由下面四个部分组成:
- 生产者(Producer):
生产者负责创建并发送消息到RabbitMQ的交换器。生产者将消息发布到特定的交换器,并可以指定消息的路由键(Routing Key)。生产者可以根据业务需求生成消息,并选择将消息发送到特定的交换器中。
- 交换器(Exchange):
交换器是消息的分发中心,它接收来自生产者的消息,并根据消息的路由键将消息路由到一个或多个绑定的队列中。交换器根据事先定义的规则(Exchange Type)来决定如何路由消息。RabbitMQ提供了几种常见的交换器类型,包括直连交换器(Direct Exchange)、主题交换器(Topic Exchange)、广播交换器(Fanout Exchange)和首部交换器(Headers Exchange)。
- 队列(Queue):
队列是消息的存储和传递载体。它是消息的终点,消费者通过订阅队列来接收消息。每个消息都被发送到一个特定的队列中,消费者从队列中获取消息并进行处理。队列具有先进先出的特性,保证了消息的顺序性。
- 消费者(Consumer):
消费者从队列中获取消息并进行处理。消费者可以根据自身的需求订阅一个或多个队列,以接收相应的消息。消费者可以在不同的节点或者不同的系统中部署,实现分布式的消息处理。
具体使用
那么我们明白了他的构成 就来看如何进行使用
引入Spring RabbitMQ依赖:
在项目的构建文件(如Maven的pom.xml)中添加Spring RabbitMQ的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置RabbitMQ连接:
在Spring Boot的配置文件(如application.properties或application.yml)中添加RabbitMQ的连接配置:
spring.rabbitmq.host=your-rabbitmq-host spring.rabbitmq.port=5672 spring.rabbitmq.username=your-username spring.rabbitmq.password=your-password
- 创建消息发送者:
创建一个消息发送者(Producer)的类,使用Spring RabbitMQ提供的RabbitTemplate
来发送消息。在发送消息之前,需要注入RabbitTemplate
并配置交换器和路由键:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("exchange-name", "routing-key", message); } }
- 创建消息接收者:
创建一个消息接收者(Consumer)的类,使用Spring RabbitMQ提供的@RabbitListener
注解来监听队列并处理接收到的消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageReceiver { @RabbitListener(queues = "queue-name") public void receiveMessage(String message) { System.out.println("消费者接收到消息: " + "【"+message+"】"); // 处理接收到的消息逻辑 } }
消息发送与接收:
在需要发送消息的地方,注入MessageSender
并调用sendMessage
方法发送消息:
@Autowired private MessageSender messageSender; public void send() { for(int i=0;i<100;i+=2){ messageSender.sendMessage("hello, message_"+i); } }
运行效果
消息的可靠性投递:
为了实现消息的可靠性投递,可以使用以下方法:
- 消息持久化:在发送消息时,将消息设置为持久化。通过
MessageProperties
中的setDeliveryMode
方法将消息的传递模式设置为2(持久化)。
rabbitTemplate.convertAndSend("exchange-name", "routing-key", message, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; });
- 发送者确认模式:在发送消息时,启用发送者确认模式,确保消息成功发送到RabbitMQ。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // 消息发送成功 } else { // 消息发送失败,进行处理 } });
- 消费者确认模式:在消费者处理消息完成后,手动确认消息的消费状态。
@RabbitListener(queues = "queue-name") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { try { // 处理接收到的消息逻辑 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 处理消息消费失败的情况 channel.basicNack(deliveryTag, false, true); } }
在具体的业务中 我们可以把消息队列作为一个消息的传递,例如订单完成以后 就去通知发货系统跟售后系统去执行 能够解除系统之间的耦合 达到更高效的工作效率