RabbitMQ Tutorial by Java(2)https://developer.aliyun.com/article/1517445
SpringBoot集成RabbitMQ
如何声明
首先创建SpringBoot项目, 然后引入依赖:
<!-- rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
写入配置文件:
spring: rabbitmq: host: your-rabbitMQ-host port: your-port username: username password: password
声明配置类:
package com.example.rabbitmqtest.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 确定交换机名 public static final String EXCHANGE_NAME = "TEST_EXCHANGE"; // 确定队列名 public static final String QUEUE_NAME = "QUEUE_NAME"; // 确定RoutingKey public static final String ROUTING_KEY = "ROUTING_KEY"; // 声明交换机 @Bean(EXCHANGE_NAME) public DirectExchange getExchange() { return new DirectExchange(EXCHANGE_NAME); } // 声明队列 @Bean(QUEUE_NAME) public Queue getQueue() { return new Queue(QUEUE_NAME); } // 绑定交换机和队列 @Bean public Binding queueBindingExchange( @Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
建立controller接口接收消息, 并将消息以生产者的身份发送给交换机:
@ResponseBody @Controller @RequestMapping("/your-path") public class SendMsgController { // 使用这个模板类来来对RabbitMQ进行操作 @Resource private RabbitTemplate rabbitTemplate; // 添加路径 @GetMapping("/child-path/{message}") public void sendMSG(@PathVariable String message) { System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}"); // 发送消息 rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message); } }
声明消费者:
package com.example.rabbitmqtest.consumer; import com.example.rabbitmqtest.config.TTLQueueConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Date; /** * 队列ttl 的消费者 */ @Component public class DeadLetterQueueConsumer { // 接收消息, 添加监听器, 监听对应queue中的消息, 可以包含多个queue, 多个queue之间使用逗号隔开 @RabbitListener(queues = {"queue1","queue2","queue3"}) public void recieveD(Message message, Channel channel) throws Exception { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}"); } }
发布确认
此时如果消息 发布出去, 但是由于某种原因接受失败:
如果生产者发的, 交换机没有回应, 那么就应该调用回调接口来确认消息是否发送失败. 如果消费者发的消息没有发送过去, 那么就会触发回调接口, 让消息被缓存在内存中.
@FunctionalInterface public interface ConfirmCallback { void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3); }
参数:
- var1 是表示什么内容发送失败了.
- var2 表示内容是否发送成功, 如果为true, 则表示发送成哥, 反之则为失败.
- var3 表示发送失败的原因是什么
实现类:
package com.example.rabbitmqtest.config; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); } /** * 交换机确认回调方法 * @param correlationData 保存回调函数的消息的id以及其相关信息 * @param b 交换机收到消息为true, 否则为false * @param s 失败的原因 * */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if(b) { System.out.println("success: 已接收到的消息的id>" + (correlationData!=null ? correlationData.getId() : "null")); } else { System.out.println("false:" + s); } } }
@PostConstruct
注解标记的方法会在依赖注入完成后,自动被Spring框架调用。因此,在init
方法中,您可以安全地访问rabbitTemplate
字段,并将其确认回调设置为当前实例(this
)。这样,当RabbitMQ确认消息时,就会调用MyCallBack
中的confirm
方法。
关联给生产者:
@ResponseBody @Controller @RequestMapping("/your-path") public class SendMsgController { // 使用这个模板类来来对RabbitMQ进行操作 @Resource private RabbitTemplate rabbitTemplate; // 添加路径 @GetMapping("/child-path/{message}") public void sendMSG(@PathVariable String message) { CorrelationData correlationData = new CorrelationData(); System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}"); // 发送消息 rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message,correlationData ); } }
配置文件 启动确认机制
spring: rabbitmq: publisher-confirm-type: correlated
同时, 不只是correlated, 还有:
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
单个确认
消息回退
CallBack接口, 如果RoutingKey是正确的, 可以路由到对应的队列, 那么当消息正确被接受的时候, RabbitMQ调用回调并返回true, 失败则返回false, 但是如果指定的RoutingKey不存在, 那么消息就会被直接丢弃. 但是生产者是不知道的, 所以 我们应该想办法处理这个被丢弃的消息.
package com.example.rabbitmqtest.config; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(this); } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("被回退的消息内容为: " + new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8)); } }
在上述代码中, 首先需要让一个类实现RabbitTemplate.ReturnsCallback这个接口, 然后重写其中的returnedMessage方法, 如下, 消息中接受一个ReturnedMessage类型的参数, 它的具体源码如下:
package org.springframework.amqp.core; public class ReturnedMessage { private final Message message; private final int replyCode; private final String replyText; private final String exchange; private final String routingKey; // 下面是一些构造方法 和 其对应的get方法, 以及toString方法, 这里省略 }
这个类是Spring AMQP中的一个类, 用于封装RabbitMQ返回给生产者的消息信息. 当RabbitMQ的交换机无法将路由器路由到任何队列的时候, 它就会将消息返回给生产者. 并附带一些额外的信息. 例如上图所示的源码. 下面是对该类中的每一个成员变量的一个解释:
Message message
:这是原始的消息对象,包含了消息的体和其他属性,比如消息头、内容类型等。int replyCode
:这是RabbitMQ返回的回复码。它是一个整数,通常用于指示返回的原因或状态。具体的值可能依赖于RabbitMQ的配置或文档。String replyText
:这是RabbitMQ返回的回复文本。它通常是一个描述性的字符串,用于解释为什么消息被返回。String exchange
:这是发送消息时使用的交换器名称。当消息返回时,这个交换器名称可以帮助你定位问题,了解是哪个交换器未能正确路由消息。String routingKey
:这是发送消息时使用的路由键。路由键用于决定消息应该被路由到哪个队列。当消息返回时,路由键可以帮助你理解为何消息没有被路由到预期的队列。
备份交换机
上述讲述了, 如何接收到被回退的消息, 然后手动做出相关处理, 但是如果消息内容多了,复杂了,手动处理的成本也会很高, 所以这个时候, 我们就需要另外一种方法来自动处理没有被正确路由并被返回的消息.
可以使用备份交换机,在正常交换机无法正确路由某个消息的时候, 这个交换机就会将这个消息发送给备用交换机, 备用交换机连接着用来处理这些被回退的消息的队列和消费者(你也可以理解为一种监察者, 也就是warning consumer or backup consumer).
修改配置类
package com.example.rabbitmqtest.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; // 声明确认队列 // 声明确认队列 @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("key1"); } //声明备份Exchange @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //声明确认Exchange交换机的备份交换机 @Bean("confirmExchange") public DirectExchange confirmExchange() { ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) // 设置备份交换机 .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); return (DirectExchange) exchangeBuilder.build(); } // 声明警告队列 @Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } // 声明报警队列绑定关系 @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(queue).to(backupExchange); } // 声明备份队列 @Bean("backQueue") public Queue backQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } // 声明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(queue).to(backupExchange); } }
然后添加一个报警消费者即可
@Component @Slf4j // 这里你也可以不使用slf4j public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning.queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message) { String msg = new String(message.getBody()); log.error("报警发现不可路由消息:{}", msg); } }
队列 TTL
实现
思路图
创建两个队列 QA和QB, QA的消息过期时间设置为10s, QB为40s, 然后创建一个直接交换机X, 和死信交换机Y, 也是direct类型, 然后创建一个死信队列QD, c为死信消费者.
按照上面的模式, 进行书写配置类:
package com.example.rabbitmqtest.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Objects; /** * 声明一个ttl队列 */ @Configuration public class TTLQueueConfig { // 普通交换机X public static final String X_EXCHANGE = "X"; // 死信交换机Y public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; // 普通队列QA, QB public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; // 死信队列QD public static final String DEAD_LETTER_QUEUE_QD = "QD"; // 创建普通交换机 @Bean(X_EXCHANGE) public DirectExchange x_exchage() { return new DirectExchange(X_EXCHANGE); } @Bean(Y_DEAD_LETTER_EXCHANGE) public DirectExchange Y_DEAD_LETTER_EXCHANGE() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } // 声明普通队列 // QA @Bean(QUEUE_A) public Queue queue_a() { Map<String, Object> map = new HashMap<>(); // 设置死信交换机 map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key", "YD"); // 设置ttl 单位是ms map.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(map).build(); } // QB @Bean(QUEUE_B) public Queue queue_b() { Map<String, Object> map = new HashMap<>(); // 设置死信交换机 map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key", "YD"); // 设置ttl 单位是ms map.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(map).build(); } // 死信队列 @Bean(DEAD_LETTER_QUEUE_QD) public Queue queue_d () { return QueueBuilder.durable(DEAD_LETTER_QUEUE_QD).build(); } // 绑定交换机 // QA QB 绑定普通交换机X @Bean // X板顶QA, RoutingKey为XA public Binding queueABindingToX( @Qualifier(QUEUE_A) Queue QA, @Qualifier(X_EXCHANGE) DirectExchange X) { return BindingBuilder.bind(QA).to(X).with("XA"); } @Bean // X板顶QB, RoutingKey为XB public Binding queueBBindingToX( @Qualifier(QUEUE_B) Queue QB, @Qualifier(X_EXCHANGE) DirectExchange X) { return BindingBuilder.bind(QB).to(X).with("XB"); } }
生产者
首先我们需要接收来自接口的消息, 然后将消息通过交换机X发送给队列QB和QA :
/** * 发送延迟消息 * * 发送消息至 localhost ..... /ttl/sendMSG/test */ @ResponseBody @Controller @RequestMapping("/ttl") public class SendMsgController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendMSG/{message}") public void sendMSG(@PathVariable String message) { System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}"); rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XA","消息来则ttl为10s的队列" + message); rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XB","消息来则ttl为40s的队列" + message); } }
消费者
package com.example.rabbitmqtest.consumer; import com.example.rabbitmqtest.config.TTLQueueConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Date; /** * 队列ttl 的消费者 */ @Component public class DeadLetterQueueConsumer { // 接收消息 @RabbitListener(queues = {TTLQueueConfig.DEAD_LETTER_QUEUE_QD}) public void recieveD(Message message, Channel channel) throws Exception { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}"); } }
展示:
查看后端:
延迟队列优化
如图:
此处新增了一个Queue, 名为QC, 该队列不需要设置ttl时间.
存在的问题
我们首先发送一条过期时间为20000ms的消息给QC, 然后发送一个2000ms的消息给QC,会发现:
消息2虽然过期时间段, 但是它并没有优先发送给死信队列, 反而是过期时间长的消息1先发送给死信队列处理..
因为RabbitMQ只会检查第一个消息是否过期, 如果过期则丢弃到死信队列. 然后再去检查第二个消息.
优先级队列
场景
想象一下, 一个队列可以存储转发很多个消息来让特定的消费者消费,但是在复杂的情况中,消息也是分优先级的,比如说, 加入天猫是我创建的后台也是我写的, 那么它里面有很多商户,在收到顾客的消费请求的时候会创建很多订单, 但是不同的商家订单是不一样的. 我们必须对这些商家做出区分, 比如我们优先处理那些订单量大的商家的订单,给他们做一个优先处理.
考虑到使用redis来做消息队列,但是redis只能使用一个list作为一个简单的消息队列.并不能实现一个优先级的场景, 所以考虑到我们RabbitMQ的优先级队列.
使用web插件添加优先级队列
登录到web插件:
添加新队列:
这里的10意味着,队列接受的消息的优先级的范围是0~10,包括0和10也就是[0,10],你发送消息的时候可以将发送到该优先级队列的消息设置一个0~10的优先级.
RabbitMQ会确保具有更高级别的消息优先于较低等级的消息被消费,如果有多个消费者,并且他们都是空闲的,那么具有最高优先级的消息将被传递给其中一个消费者.
但是需要主义的是,如果你不设置x-max-priority,那么默认的最大优先级就是0,这就意味着所有的消息都具有相同的优先级,那么就和普通队列一样了.设置优先级的时候确保在优先级范围内,否则RabbitMQ会拒绝该消息.
使用java代码声明优先级队列
package PriorityQueue; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Producer { public static void main(String[] args) throws Exception{ // 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 工厂的ip, 链接rabbit队列 connectionFactory.setHost("106.14.165.91"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("***"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 构造消息 for (int i = 0; i <= 10; i++) { String msg = "消息:" + i; if (i % 2 == 0) { // 设置优先级的属性 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i).build(); channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes(StandardCharsets.UTF_8)); } else { channel.basicPublish("exchangeName","routingKey",null,msg.getBytes(StandardCharsets.UTF_8)); } } System.out.println("消息发送完毕"); } }
消费者
package PriorityQueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; public class Consumer { public static void main(String[] args) throws Exception{ // 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 工厂的ip, 链接rabbit队列 connectionFactory.setHost("106.14.165.91"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("***"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 设置优先级属性 Map<String,Object> params = new HashMap<>(); params.put("x-max-priority",10); // 声明一个优先级队列 channel.queueDeclare("queueName",true,false,false,params); System.out.println("启动消费者"); // 确认回调函数 DeliverCallback deliverCallback = (consumerTag,delivery) -> { String recieveMsg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到消息:" + recieveMsg); }; channel.basicConsume("queueName",deliverCallback,(consumerTag)-> { System.out.println("消费者无法消费此消息时被调用"); }); } }
惰性队列
懒惰队列是一个classic类型的队列, 但是这个队列是以lazy模式运行的. 当你将某个队列设置为lazy模式, 那么在队列里面的消息就会被尽早的存如硬盘, 这些存入硬盘的消息会在他们被消费者请求消费的时候加载到内存中.
这样设计的原因就是希望队列能支持一个更多数量的消息的存储. 消费者如果由于各种原因下线,那么队列中的元素就会堆积在内存, 导致内存溢出, 此时需要处理这些消息, 懒惰队列, 也可以称之为惰性队列, 它的处理方法就是将其持久化到硬盘, 然后要使用或者消费的时候就拿出来.
对比消息的持久化, broker会将消息写入磁盘的时候, 也会给内存中进行一个备份, RabbitMQ释放内存的时候, 将消息持久化到硬盘是一个比较消耗资源的操作, 同时也会阻塞队列, 进而无法接受新的消息.
队列的两种模式:
- default 默认模式
- lazy 惰性模式
如何创建一个lazy queue?
- channel.queueDeclare的时候传入对应的参数
1. Map<String,Object> params = new HashMap<>(); 2. params.put("x-queue-mode","lazy"); 3. channel.queueDeclare("queueName",true,false,false,params);
- 通过policy的方式设置
如果同时设置了这两种的话, 那么policy优先生效
内存对比
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB👍
集群
最后一个环节就是集群, 之前学过redis的同学都知道为什么要建立一个集群. 我们一台主机上有一个RabbitMQ服务器, 但是如果这个服务器挂了宕机了(
- 交不起电费了
- 地震了电线断了
- 火灾把机器烧了
- 内存崩溃导致RabbitMQ崩溃了
- 主机进水了
- 网站被压测导致内存崩溃, 后台杀死了RabbitMQ进程
), 导致不可用了, 那么生产者的消息就会找不到服务器而被直接丢弃, 造成的影响是毁灭性的
此外单台RabbitMQ的服务器可以满足1000 per s 的消息吞吐量, 但是如果服务器需要100000 per s的吞吐量, 那么要想要买多台服务器, 那就显得有点力不从心了. 但是我们可以在单机上部署多个RabbitMQ服务器. 也就是建立一个RabbitMQ集群.
首先你需要是三个主机, 分别为为node1,node2和node3. 但是主机名并不一定就是node1~3, 需要你去手动修改hostName, 如下:
vim /etc/hostname
然后配置各个节点服务器的hosts文件, 让其能够相互识别:
vim /etc/hosts
node1,node2,node3都要修改
然后将node1上面的cookie给node2和node3:
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行以下命令) :
rabbitmq-server -detached
在节点2执行:
rabbitmqctl stop_app
(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)
节点3执行:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
集群状态:
rabbitmqctl cluster_status
需要重新设置用户:
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
解除集群节点(node2和node3机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行)