♨️本篇文章记录的为RabbitMQ知识中
高级特性
和企业级项目
相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习
如果对RabbitMQ的基础认识不够了解的话
请点这里👇RabbitMQ快速上手👇
🐇🐇🐇前言:我们的RabbitMQ经常被用来做⚡秒杀类业务
⚡,所以在商城类项目中充当着一个很重要的中间件,关于它的高级特性
和企业级项目
中的一些重点问题的解决方案在这里我会进行详细的总结, 并在最后展示一部分秒杀业务中的细节性代码
@[TOC]
1. 🐇Springboot整合RabbitMQ🐇
(1) 添加依赖
在pom.xml中引入这两个依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
(2) RabbitMQ相关参数的yml
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
retry:
#发布重试,默认false
enabled: true
#重试时间 默认1000ms
initial-interval: 1000
#重试最大次数 最大3
max-attempts: 3
#重试最大间隔时间
max-interval: 10000
#重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 1
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
#消费者调用程序线程的最小数量
concurrency: 10
#消费者最大数量
max-concurrency: 10
#限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
#启动时是否默认启动容器
auto-startup: true
#被拒绝时重新进入队列
default-requeue-rejected: true
🍓2. 消息可靠性投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
rabbitmq 整个消息投递的路径为:
producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
confirm 确认模式
(1)confirm确认模式代码实现
创建maven工程,消息的生产者工程,项目模块名称:rabbitmq-producer-spring
(2) 配置交换机和绑定队列
@Configuration
public class RabbitConfig {
/**
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
//交换机名称
public static final String FANOUT_EXCHANGE = "test_exchange_confirm";
//队列名称
public static final String FANOUT_QUEUE_1 = "test_queue_confirm";
@Bean(FANOUT_EXCHANGE)
public Exchange FANOUT_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build();
}
@Bean(FANOUT_QUEUE_1)
public Queue FANOUT_QUEUE_1(){
return new Queue(FANOUT_QUEUE_1,true,false,false,null);
}
@Bean
public Binding FANOUT_QUEUE_1_FANOUT_EXCHANGE(@Qualifier(FANOUT_QUEUE_1) Queue queue,
@Qualifier(FANOUT_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
(3) 接下来我们来测试一下confirm模式
@Slf4j
@SpringBootTest
public class ConfirmTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:yml中开启publisher-confirm-type: correlated
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
//2. 定义回调 **
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
(4) 测试成功
: 测试结果如图
return 退回模式
首先我们要在yml中添加 开启退回模式
publisher-returns: true
(1) 接下来我们测试一下return模式
@Test
public void testReturn() {
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2.设置ReturnCallBack
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("return 执行了....");
log.info(returnedMessage.getMessage().toString());
log.info(String.valueOf(returnedMessage.getReplyCode()));
log.info(returnedMessage.getReplyText());
log.info(returnedMessage.getExchange());
log.info(returnedMessage.getRoutingKey());
//处理
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm1", "confirm", "message confirm....");
}
(2) 测试成功
: 测试结果如图
🍓3. Consumer ACK
ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
对应yml如下
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
#消费者调用程序线程的最小数量
concurrency: 10
#消费者最大数量
max-concurrency: 10
#限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
#启动时是否默认启动容器
auto-startup: true
#被拒绝时重新进入队列
default-requeue-rejected: true
下面我们来测试一下ack
@Component
public class AckListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_confirm")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
//int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
// 了解
//channel.basicReject(deliveryTag,true);
}
}
}
其实Consumer ACK机制并不复杂:
- 设置手动签收。acknowledge="manual"
- 让监听器类实现ChannelAwareMessageListener接口
- 如果消息成功处理,则调用channel的 basicAck()签收
- 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
🍓4.消费端限流
这里有必要说一下,在秒杀类业务中,对于突如其来的万级的请求
我们的消费者可能也会吃不消的,这里就需要对其限流处理
还有一种情况就是公司中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。如果当A系统成功启动后,默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的
可以通过MQ中的 listener-container 配置属性perfetch =1表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
🍓5. TTL
设置队列参数、交换机参数、发消息都可以用页面。
也能用代码。
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
应用场景 : 比如说现在我们抢购到了一个商品,现在到了付款的环节,订单的有效期为30min,到期未付款订单自动取消,我们就可以使用TTL
来进行处理
代码展示
//队列名称
public static final String TEST_QUEUE_TTL = "test_queue_ttl";
//交换机名称
public static final String TEST_EXCHANGE_TTL = "test_exchange_ttl";
@Bean(TEST_QUEUE_TTL)
public Queue TEST_QUEUE_TTL(){
Map<String, Object> arguments = new HashMap();
arguments.put("x-message-ttl", 10000);
return new Queue(TEST_QUEUE_TTL, true, false, false, arguments);
}
@Bean(TEST_EXCHANGE_TTL)
public Exchange TEST_EXCHANGE_TTL(){
return ExchangeBuilder.fanoutExchange(TEST_EXCHANGE_TTL).durable(true).build();
}
@Bean
public Binding QUEUE_TTL_EXCHANGE_TTL (@Qualifier(TEST_QUEUE_TTL) Queue queue,
@Qualifier(TEST_EXCHANGE_TTL) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
}
我们来测试一下
```java
@Test
public void testTtl2() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor1 = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};
MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("10000");//消息的过期时间
//2.返回该消息
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor1);
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor1);
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor2);
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor2);
}
> 测试结果图像是这个样子的
![在这里插入图片描述](https://ucc.alicdn.com/images/user-upload-01/19048ef292524ef7bf9648db14161c22.png)
**这是因为**
> `队列过期后,会将队列所有消息全部移除`。
`消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)`
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
- 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
- 如果两者都进行了设置,以时间短的为准。
### 🍓6. 死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
> **消息成为死信的三种情况:**
- `队列消息长度到达限制`;
- `消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false`;
- `原队列存在消息过期设置,消息到达超时时间未被消费`;
> **队列绑定死信交换机:**
给队列设置参数: `x-dead-letter-exchange `和 `x-dead-letter-routing-key`
```java
@Configuration
public class RabbitMQConfig {
/**业务交换机*/
public static final String BUSINESS_EXCHANGE = "business.exchange";
/**死信交换机*/
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
/**常规超时时间*/
public static Long QUEUE_EXPIRATION = 20000L;
/**生成订单队列*/
public static final String ORDER_CREATE_QUEUE = "order.create.queue";
/**生成订单死信队列*/
public static final String ORDER_CREATE_DEAD_LETTER_QUEUE = "order.create.dead.letter.queue";
/**生成订单路由键*/
public static final String ORDER_CREATE_ROUTING_KEY = "order.create.routing.key";
/**生成订单死信路由键*/
public static final String ORDER_CREATE_DEAD_LETTER_ROUTING_KEY = "order.create.dead.letter.routing.key";
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**业务交换机*/
@Bean
public Exchange getBusinessExchange(){
return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE).durable(true).build();
}
/**死信交换机*/
@Bean
public Exchange getDeadLetterExchange(){return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}
/**生成订单队列*/
@Bean
public Queue getOrderQueue(){
Map<String, Object> args = new HashMap<>();
//x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", ORDER_CREATE_DEAD_LETTER_ROUTING_KEY);
//设置过期时间
args.put("x-message-ttl", QUEUE_EXPIRATION);
return QueueBuilder.durable(ORDER_CREATE_QUEUE).withArguments(args).build();
}
/**绑定业务交换机和生成订单队列*/
@Bean
public Binding bindOrder(){
return BindingBuilder.bind(getOrderQueue()).to(getBusinessExchange()).with(ORDER_CREATE_ROUTING_KEY).noargs();
}
/**生成订单死信队列*/
@Bean
public Queue getOrderDeadLetterQueue(){return new Queue(ORDER_CREATE_DEAD_LETTER_QUEUE);}
/**绑定死信交换机和生成订单死信队列*/
@Bean
public Binding bingOrderDeadLetter(){
return BindingBuilder.bind(getOrderDeadLetterQueue()).to(getDeadLetterExchange()).with(ORDER_CREATE_DEAD_LETTER_ROUTING_KEY).noargs();
}
}
🍓7. 延迟队列-重点(订单处理)
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
提出需求:
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
实现方式:
定时器(不优雅!)
延迟队列
注意:在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
以下代码是订单业务中一部分核心的业务逻辑
@Configuration
public class RabbitMQConfig {
/**业务交换机*/
public static final String BUSINESS_EXCHANGE = "business.exchange";
/**死信交换机*/
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
/**常规超时时间*/
public static Long QUEUE_EXPIRATION = 20000L;
/**支付超时时间*/
public static Long QUEUE_PAID_EXPIRATION = 60000*30L;//30分钟
/**订单待支付队列*/
public static final String UNPAID_QUEUE = "unpaid.queue";
/**订单待支付死信队列*/
public static final String UNPAID_DEAD_LETTER_QUEUE = "unpaid.dead.letter.queue";
/**订单待支付路由键*/
public static final String UNPAID_ROUTING_KEY = "unpaid.routing.key";
/**订单待支付死信路由键*/
public static final String UNPAID_DEAD_LETTER_ROUTING_KEY = "unpaid.dead.letter.routing.key";
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**业务交换机*/
@Bean
public Exchange getBusinessExchange(){
return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE).durable(true).build();
}
/**死信交换机*/
@Bean
public Exchange getDeadLetterExchange(){
return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}
/**订单未支付队列*/
@Bean
public Queue getUnpaidQueue(){
Map<String,Object> args = new HashMap<>();
//x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", UNPAID_DEAD_LETTER_ROUTING_KEY);
//设置过期时间
args.put("x-message-ttl", QUEUE_PAID_EXPIRATION);
return QueueBuilder.durable(UNPAID_QUEUE).withArguments(args).build();
}
/**绑定业务交换机和订单未支付队列*/
@Bean
public Binding bindUnpaid(){
return BindingBuilder.bind(getUnpaidQueue()).to(getBusinessExchange()).with(UNPAID_ROUTING_KEY).noargs();
}
/**订单待支付死信队列*/
@Bean
public Queue getUnpaidDeadLetterQueue(){
return new Queue(UNPAID_DEAD_LETTER_QUEUE);}
/**绑定死信交换机和待支付死信队列*/
@Bean
public Binding bingUnpaidDeadLetter(){
return BindingBuilder.bind(getUnpaidDeadLetterQueue()).to(getDeadLetterExchange()).with(UNPAID_DEAD_LETTER_ROUTING_KEY).noargs();
}
}
@RabbitListener(queues = RabbitMQConfig.ORDER_CREATE_QUEUE)
public void createOrderListener(OrderInfoIn orderInfoIn,Message message, Channel channel) throws CommonRuntimeException, IOException {
String messageId = (String)redisTemplate.opsForValue().get(orderInfoIn.getMessageId().toString());
if(messageId == null){
//保证冥等性
try {
this.createOrder(orderInfoIn);
redisTemplate.opsForValue().set(orderInfoIn.getMessageId().toString(), "ack");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//存入订单待支付队列
orderInfoIn.setMessageId(idGenerator.getGlobalId());//雪花算法保证冥等性
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.UNPAID_ROUTING_KEY,orderInfoIn);
} catch (Exception e){
logger.error("消费创建订单消息失败【】error:"+ message.getBody());
logger.error("OrderConsumer handleMessage {} , error:",message,e);
//处理消息失败,将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
/**
* 订单未支付消息监听
* @param message
* @param channel
* @throws CommonRuntimeException
*/
@RabbitListener(queues = RabbitMQConfig.UNPAID_DEAD_LETTER_QUEUE)
public void OrderDeadLetterListener(OrderInfoIn orderInfoIn,Message message, Channel channel) throws CommonRuntimeException, IOException {
String messageId = (String)redisTemplate.opsForValue().get(orderInfoIn.getMessageId().toString());
if(messageId == null){
//保证冥等性
try {
//检测订单状态是否为已支付,否则回滚库存、订单支付超时等操作...
redisTemplate.opsForValue().set(orderInfoIn.getMessageId().toString(), "ack");
//消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e){
logger.error("消费订单未支付消息失败【】error:"+ message.getBody());
logger.error("handleMessage {} , error:",message,e);
//处理消息失败,将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}else{
//已消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
小结:
基于SpringBoot的微服务项目
里集成RabbitMQ应用就差不多搭建完成,本文结合一些网上的理论知识概念,及实例给大家介绍其中几个常用特性, 以后再给大家介绍一些RabbitMQ面试知识
如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页
💖阿千弟