依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency>
简单模式
生产者
package com.imooc.mq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 构建简单模式的生产者,发送消息 */ public class FooProducer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂以及相关的参数配置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 2. 通过工程创建连接 Connection Connection connection = factory.newConnection(); // 3. 创建管道 Channel Channel channel = connection.createChannel(); // 4. 创建队列 Queue(简单模式不需要交换机Exchange) /** * queue: 队列名 * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在 * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列 * arguments: map类型的其他参数 */ channel.queueDeclare("hello", true, false, false, null); // 5. 向队列发送消息 /** * exchange: 交换机的名称,简单模式下没有,所以直接设置为 "" * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致 * props: 配置参数 * body: 消息数据 */ String msg = "Hello 慕课网~~~"; channel.basicPublish("", "hello", null, msg.getBytes()); // 6. 释放资源 channel.close(); connection.close(); } }
消费者
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建简单模式的消费者,监听消费消息 */ public class FooConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag = " + consumerTag); System.out.println("envelope = " + envelope.toString()); System.out.println("BasicProperties = " + properties.toString()); System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume("hello", true, consumer); } }
工作队列模式
生产者
package com.imooc.mq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 构建工作队列的生产者,发送消息 */ public class WorkQueuesProducer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂以及相关的参数配置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 2. 通过工程创建连接 Connection Connection connection = factory.newConnection(); // 3. 创建管道 Channel Channel channel = connection.createChannel(); // 4. 创建队列 Queue(简单模式不需要交换机Exchange) /** * queue: 队列名 * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在 * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列 * arguments: map类型的其他参数 */ channel.queueDeclare("work_queue", true, false, false, null); // 5. 向队列发送消息 /** * exchange: 交换机的名称,简单模式下没有,所以直接设置为 "" * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致 * props: 配置参数 * body: 消息数据 */ for (int i = 0 ; i < 10 ; i ++) { String task = "开始上班,搬砖喽~ 开启任务[" + i + "]"; channel.basicPublish("", "work_queue", null, task.getBytes()); } // 6. 释放资源 channel.close(); connection.close(); } }
消费者A
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建工作队列模式的消费者,监听消费消息 */ public class WorkQueuesConsumerA { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue", true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume("work_queue", true, consumer); } }
消费者B
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建工作队列模式的消费者,监听消费消息 */ public class WorkQueuesConsumerB { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue", true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume("work_queue", true, consumer); } }
发布订阅模式
生产者
package com.imooc.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 构建发布订阅模式的生产者,发送消息 */ public class PubSubProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 Exchange /** * exchange: 交换机的名称 * type: 交换机的类型 * FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列 * DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列 * TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列 * HEADERS("headers"): 使用率不多,参数匹配 * durable: 是否持久化 * autoDelete: 是否自动删除 * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可 * arguments: map类型的参数 */ String fanout_exchange = "fanout_exchange"; channel.exchangeDeclare(fanout_exchange, BuiltinExchangeType.FANOUT, true, false, false, null); // 定义两个队列 String fanout_queue_a = "fanout_queue_a"; String fanout_queue_b = "fanout_queue_b"; channel.queueDeclare(fanout_queue_a, true, false, false, null); channel.queueDeclare(fanout_queue_b, true, false, false, null); // 绑定交换机和队列 channel.queueBind(fanout_queue_a, fanout_exchange, ""); channel.queueBind(fanout_queue_b, fanout_exchange, ""); for (int i = 0 ; i < 10 ; i ++) { String task = "开始上班,搬砖喽~ 开启任务[" + i + "]"; channel.basicPublish(fanout_exchange, "", null, task.getBytes()); } channel.close(); connection.close(); } }
消费者A
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建发布订阅模式的消费者,监听消费消息 */ public class PubSubConsumerA { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String fanout_queue_a = "fanout_queue_a"; channel.queueDeclare(fanout_queue_a, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(fanout_queue_a, true, consumer); } }
消费者B
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建发布订阅模式的消费者,监听消费消息 */ public class PubSubConsumerB { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String fanout_queue_b = "fanout_queue_b"; channel.queueDeclare(fanout_queue_b, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(fanout_queue_b, true, consumer); } }
路由(定向)模式
生产者
package com.imooc.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 构建路由(定向)模式的生产者,发送消息 */ public class RoutingProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 Exchange /** * exchange: 交换机的名称 * type: 交换机的类型 * FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列 * DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列 * TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列 * HEADERS("headers"): 使用率不多,参数匹配 * durable: 是否持久化 * autoDelete: 是否自动删除 * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可 * arguments: map类型的参数 */ String routing_exchange = "routing_exchange"; channel.exchangeDeclare(routing_exchange, BuiltinExchangeType.DIRECT, true, false, false, null); // 定义两个队列 String routing_queue_order = "routing_queue_order"; String routing_queue_pay = "routing_queue_pay"; channel.queueDeclare(routing_queue_order, true, false, false, null); channel.queueDeclare(routing_queue_pay, true, false, false, null); // 绑定交换机和队列 channel.queueBind(routing_queue_order, routing_exchange, "order_create"); channel.queueBind(routing_queue_order, routing_exchange, "order_delete"); channel.queueBind(routing_queue_order, routing_exchange, "order_update"); channel.queueBind(routing_queue_pay, routing_exchange, "order_pay"); String msg1 = "创建订单A"; String msg2 = "创建订单B"; String msg3 = "删除订单C"; String msg4 = "修改订单D"; String msg5 = "支付订单E"; String msg6 = "支付订单F"; channel.basicPublish(routing_exchange, "order_create", null, msg1.getBytes()); channel.basicPublish(routing_exchange, "order_create", null, msg2.getBytes()); channel.basicPublish(routing_exchange, "order_delete", null, msg3.getBytes()); channel.basicPublish(routing_exchange, "order_update", null, msg4.getBytes()); channel.basicPublish(routing_exchange, "order_pay", null, msg5.getBytes()); channel.basicPublish(routing_exchange, "order_pay", null, msg6.getBytes()); channel.close(); connection.close(); } }
消费者A
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建路由模式的消费者,监听消费消息 */ public class RoutingOrderConsumerA { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String routing_queue_order = "routing_queue_order"; channel.queueDeclare(routing_queue_order, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(routing_queue_order, true, consumer); } }
消费者B
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建路由模式的消费者,监听消费消息 */ public class RoutingPayConsumerB { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String routing_queue_pay = "routing_queue_pay"; channel.queueDeclare(routing_queue_pay, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(routing_queue_pay, true, consumer); } }
通配符匹配模式 topic
生产者
package com.imooc.mq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 构建通配符模式的生产者,发送消息 */ public class TopicsProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建交换机 Exchange /** * exchange: 交换机的名称 * type: 交换机的类型 * FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列 * DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列 * TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列 * HEADERS("headers"): 使用率不多,参数匹配 * durable: 是否持久化 * autoDelete: 是否自动删除 * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可 * arguments: map类型的参数 */ String topics_exchange = "topics_exchange"; channel.exchangeDeclare(topics_exchange, BuiltinExchangeType.TOPIC, true, false, false, null); // 定义两个队列 String topics_queue_order = "topics_queue_order"; String topics_queue_pay = "topics_queue_pay"; channel.queueDeclare(topics_queue_order, true, false, false, null); channel.queueDeclare(topics_queue_pay, true, false, false, null); // 绑定交换机和队列 *表示通配一个词,#表示通配一个或多个词 channel.queueBind(topics_queue_order, topics_exchange, "order.*"); channel.queueBind(topics_queue_pay, topics_exchange, "*.pay.#"); String msg1 = "创建订单A"; String msg2 = "创建订单B"; String msg3 = "删除订单C"; String msg4 = "修改订单D"; String msg5 = "支付订单E"; String msg6 = "超市订单F"; String msg7 = "慕课订单G"; channel.basicPublish(topics_exchange, "order.create", null, msg1.getBytes()); channel.basicPublish(topics_exchange, "order.create", null, msg2.getBytes()); channel.basicPublish(topics_exchange, "order.delete", null, msg3.getBytes()); channel.basicPublish(topics_exchange, "order.update", null, msg4.getBytes()); // order.pay 能匹配到两个队列,注意看两个队列是否都能消费到 channel.basicPublish(topics_exchange, "order.pay", null, msg5.getBytes()); channel.basicPublish(topics_exchange, "imooc.pay.super.market", null, msg6.getBytes()); channel.basicPublish(topics_exchange, "imooc.pay.course", null, msg7.getBytes()); channel.close(); connection.close(); } }
消费者A
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建通配符模式的消费者,监听消费消息 */ public class TopicsOrderConsumerA { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String topics_queue_order = "topics_queue_order"; channel.queueDeclare(topics_queue_order, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(topics_queue_order, true, consumer); } }
消费者B
package com.imooc.mq; import com.rabbitmq.client.*; import java.io.IOException; /** * 构建通配符模式的消费者,监听消费消息 */ public class TopicsPayConsumerB { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String topics_queue_pay = "topics_queue_pay"; channel.queueDeclare(topics_queue_pay, true, false, false, null); Consumer consumer = new DefaultConsumer(channel){ /** * 重写消息配送方法 * @param consumerTag 消息的标签(标识) * @param envelope 信封(一些信息,比如交换机路由等等信息) * @param properties 配置信息 * @param body 收到的消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; /** * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听到的消息 */ channel.basicConsume(topics_queue_pay, true, consumer); } }
SpringBoot集成RabbitMQ
依赖(生产和消费)
<!-- SpringBoot 整合RabbitMQ 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置类 (生产者)
package com.imooc.api.mq; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 的配置类 */ @Configuration public class RabbitMQSMSConfig { // 定义交换机的名称 public static final String SMS_EXCHANGE = "sms_exchange"; // 定义队列的名称 public static final String SMS_QUEUE = "sms_queue"; // 统一定义路由key public static final String ROUTING_KEY_SMS_SEND_LOGIN = "imooc.sms.send.login"; // 创建交换机 @Bean(SMS_EXCHANGE) public Exchange exchange() { return ExchangeBuilder .topicExchange(SMS_EXCHANGE) .durable(true) .build(); } // 创建队列 @Bean(SMS_QUEUE) public Queue queue() { // return new Queue(SMS_QUEUE); return QueueBuilder .durable(SMS_QUEUE) .build(); } // 创建绑定关系 @Bean public Binding smsBinding(@Qualifier(SMS_EXCHANGE) Exchange exchange, @Qualifier(SMS_QUEUE) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("imooc.sms.#") .noargs(); } }
yml配置(生产和消费)
spring: rabbitmq: host: 192.168.233.128 port: 5672 virtual-host: / username: guest password: guest
生产者
@Autowired private RabbitTemplate rabbitTemplate; @PostMapping("getSMSCode") public GraceJSONResult getSMSCode(String mobile, HttpServletRequest request) throws Exception { if (StringUtils.isBlank(mobile)) { return GraceJSONResult.error(); } // 获得用户ip String userIp = IPUtil.getRequestIp(request); // 限制用户只能在60s以内获得一次验证码 redis.setnx60s(MOBILE_SMSCODE + ":" + userIp, mobile); String code = (int)((Math.random() * 9 + 1) * 100000) + ""; // 使用消息队列异步解耦发送短信 SMSContentQO contentQO = new SMSContentQO(); contentQO.setMobile(mobile); contentQO.setContent(code); rabbitTemplate.convertAndSend( "sms_exchange", "imooc.sms.send.login", GsonUtils.object2String(contentQO) );
GsonUtils
package com.imooc.utils; import com.google.gson.*; import com.google.gson.reflect.TypeToken; import java.util.ArrayList; import java.util.List; import java.util.Map; public class GsonUtils { /** * 不用创建对象,直接使用Gson.就可以调用方法 */ private static Gson gson = null; private static JsonParser jsonParser = null; /** * 判断gson对象是否存在了,不存在则创建对象 */ static { if (gson == null) { //gson = new Gson(); // 当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的 gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); } if(jsonParser == null ){ jsonParser = new JsonParser(); } } private GsonUtils() {} /** * json 转对象 * @param strJson * @return */ public static JsonObject string2Object(String strJson) { return jsonParser.parse(strJson).getAsJsonObject(); } /** * 将对象转成json格式 * @param object * @return String */ public static String object2String(Object object) { String gsonString = null; if (gson != null) { gsonString = gson.toJson(object); } return gsonString; } /** * 将json转成特定的cls的对象 * @param gsonString * @param cls * @return */ public static <T> T stringToBean(String gsonString, Class<T> cls) { T t = null; if (gson != null) { //传入json对象和对象类型,将json转成对象 t = gson.fromJson(gsonString, cls); } return t; } // public static <T> T stringToBean2(String gsonString, Class<T> cls) { // // JsonParser jsonParser = new JsonParser(); // JsonObject jsonObject = jsonParser.parse(gsonString).getAsJsonObject(); // // T t = null; // if (gson != null) { // //传入json对象和对象类型,将json转成对象 // t = gson.fromJson(jsonObject, cls); // } // return t; // } /** * json字符串转成list * @param gsonString * @param cls * @return */ public static <T> List<T> stringToList(String gsonString, Class<T> cls) { List<T> list = null; if (gson != null) { //根据泛型返回解析指定的类型,TypeToken<List<T>>{}.getType()获取返回类型 list = gson.fromJson(gsonString, new TypeToken<List<T>>() { }.getType()); } return list; } public static <T> List<T> stringToListAnother(String gsonString, Class<T> cls) { List<T> list = new ArrayList<>(); JsonArray jsonArray = new JsonParser().parse(gsonString).getAsJsonArray(); Gson gson = new Gson(); for (JsonElement jsonElement : jsonArray) { list.add(gson.fromJson(jsonElement,cls)); } return list; } /** * json字符串转成list中有map的 * @param gsonString * @return */ public static <T> List<Map<String, T>> stringToListMaps(String gsonString) { List<Map<String, T>> list = null; if (gson != null) { list = gson.fromJson(gsonString, new TypeToken<List<Map<String, T>>>() { }.getType()); } return list; } /** * json字符串转成map的 * @param gsonString * @return */ public static <T> Map<String, T> stringToMaps(String gsonString) { Map<String, T> map = null; if (gson != null) { map = gson.fromJson(gsonString, new TypeToken<Map<String, T>>() { }.getType()); } return map; } public static String jsonElementAsString(JsonElement jsonElement) { return jsonElement == null ? null : jsonElement.getAsString(); } public static Integer jsonElementAsInt(JsonElement jsonElement) { return jsonElement == null ? null : jsonElement.getAsInt(); } }
消费者
/** * 监听队列,并且处理消息 * @param payload * @param message */ @RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE}) public void watchQueue(String payload, Message message) throws Exception { log.info("payload = " + payload); String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info("routingKey = " + routingKey); String msg = payload; log.info("msg = " + msg); if (routingKey.equalsIgnoreCase(RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN)) { // 此处为短信发送的消息消费处理 SMSContentQO contentQO = GsonUtils.stringToBean(msg, SMSContentQO.class); // smsUtils.sendSMS(contentQO.getMobile(), contentQO.getContent()); } }
生产端消息可靠性机制
setConfirmCallback交换机确认接收到消息机制
配置文件:
rabbitmq: host: 192.168.233.128 port: 5672 virtual-host: / username: guest password: guest publisher-confirm-type: correlated
生产者实现
rabbitTemplate.convertAndSend( "sms_exchange", "imooc.sms.send.login", GsonUtils.object2String(contentQO), new CorrelationData(UUID.randomUUID().toString()) ); // 定义confirm回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 回调函数 * @param correlationData 相关性数据 * @param ack 交换机是否成功接收到消息,true:成功 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("进入confirm"); log.info("correlationData:{}", correlationData.getId()); if (ack) { log.info("交换机成功接收到消息~~ {}", cause); } else { log.info("交换机接收消息失败~~失败原因: {}", cause); } } });
setReturnsCallback队列未接受到消息回退机制
配置文件
rabbitmq: host: 192.168.233.128 port: 5672 virtual-host: / username: guest password: guest publisher-confirm-type: correlated publisher-returns: true
生产者实现
rabbitTemplate.convertAndSend( "sms_exchange", "123imooc.sms.send.login", GsonUtils.object2String(contentQO), new CorrelationData(UUID.randomUUID().toString()) ); // 定义路由匹配不到时候 return回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.info("进入return"); log.info(returned.toString()); } });
这里把路由故意写为"123imooc.sms.send.login" 这匹配不到RabbitMQSMSConfig类中定义的路由,所以从交换机发布到对应的队列中
所以就会进入到setReturnsCallback方法中
消费端消息可靠性
ACK确认机制
none 默认自动确认 manual 手动确认(业务处理完成后手动确认,最常用) auto 异常类型确认
配置文件
rabbitmq: host: 192.168.233.128 port: 5672 virtual-host: / username: guest password: guest listener: simple: acknowledge-mode: manual # 手动ack确认
消费者实现
@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE}) public void watchQueue(Message message, Channel channel) throws Exception { try { String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info("routingKey = " + routingKey); int a = 1/0; // 模拟异常 String msg = new String(message.getBody()); log.info("msg = " + msg); /** * long deliveryTag: 消息投递的标签 * boolean multiple: 批量确认所有消费者获得的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e) { e.printStackTrace(); /** * long deliveryTag * boolean multiple * boolean requeue: true:重回队列 ;false:丢弃消息 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false); } }
消费端消息拉取限流
配置文件
rabbitmq: host: 192.168.233.128 port: 5672 virtual-host: / username: guest password: guest listener: simple: acknowledge-mode: manual # 手动ack确认 prefetch: 2 # 每次每个消费者从mq中拉去的消息的数量,直到手动ack确认之后,才会拉取下一个消息
模拟场景:
生产者连续发10条消息
for (int i = 0 ; i < 10 ; i ++) { rabbitTemplate.convertAndSend(RabbitMQSMSConfig.SMS_EXCHANGE, RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN, GsonUtils.object2String(contentQO), new CorrelationData(UUID.randomUUID().toString())); }
消费者在手动ack的地方断点
查看RabbitMQ管理平台,每次确实只从队列中拉取2条消息
设置消息过期失效(移除消息)
在RabbitMQSMSConfig配置类中
// 创建队列 @Bean(SMS_QUEUE) public Queue queue() { return QueueBuilder .durable(SMS_QUEUE) .withArgument("x-message-ttl", 30*1000) .build(); }
.withArgument(“x-message-ttl”, 30*1000) 表示30秒后,消息自动失效
注意:只能对新的队列生效,对老的队列需要先在RabbitMQ管理平台中删除老队列
测试方法
可以先将消费者注释掉,或者不启动消费者
生产者生产的消息,过一段时间在RabbitMQ管理平台看看是否自动消失
死信队列
以下3中消息都会进入死信队列
1、 ttl超时被丢弃的消息
2、超过队列长度被丢弃的消息
3、手动nack或者reject,并且requeue为false的消息
死信队列配置类
package com.imooc.api.mq; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 的配置类(用于死信队列的配置) */ @Configuration public class RabbitMQSMSConfig_Dead { // 定义交换机的名称 public static final String SMS_EXCHANGE_DEAD = "sms_exchange_dead"; // 定义队列的名称 public static final String SMS_QUEUE_DEAD = "sms_queue_dead"; // 统一定义路由key public static final String ROUTING_KEY_SMS_DEAD = "dead.sms.display"; // 创建交换机 @Bean(SMS_EXCHANGE_DEAD) public Exchange exchange() { return ExchangeBuilder .topicExchange(SMS_EXCHANGE_DEAD) .durable(true) .build(); } // 创建队列 @Bean(SMS_QUEUE_DEAD) public Queue queue() { return QueueBuilder .durable(SMS_QUEUE_DEAD) .build(); } // 创建绑定关系 @Bean public Binding smsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD) Exchange exchange, @Qualifier(SMS_QUEUE_DEAD) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("dead.sms.*") .noargs(); } }
正常队列配置类中添加死信队列配置
package com.imooc.api.mq; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 的配置类 */ @Configuration public class RabbitMQSMSConfig { // 定义交换机的名称 public static final String SMS_EXCHANGE = "sms_exchange"; // 定义队列的名称 public static final String SMS_QUEUE = "sms_queue"; // 统一定义路由key public static final String ROUTING_KEY_SMS_SEND_LOGIN = "imooc.sms.send.login"; // 创建交换机 @Bean(SMS_EXCHANGE) public Exchange exchange() { return ExchangeBuilder .topicExchange(SMS_EXCHANGE) .durable(true) .build(); } // 创建队列 @Bean(SMS_QUEUE) public Queue queue() { return QueueBuilder .durable(SMS_QUEUE) .withArgument("x-message-ttl", 30*1000) // 队列中消息过期时间,过期进入死信队列 .withArgument("x-dead-letter-exchange", RabbitMQSMSConfig_Dead.SMS_EXCHANGE_DEAD) // 死信队列交换机 .withArgument("x-dead-letter-routing-key", RabbitMQSMSConfig_Dead.ROUTING_KEY_SMS_DEAD) // 死信队列路由 .withArgument("x-max-length", 6) // 队列消息个数最大值,超出的进入死信队列 .build(); } // 创建绑定关系 @Bean public Binding smsBinding(@Qualifier(SMS_EXCHANGE) Exchange exchange, @Qualifier(SMS_QUEUE) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("imooc.sms.#") .noargs(); } }
死信队列消费者
package com.imooc.mq; import com.imooc.api.mq.RabbitMQSMSConfig_Dead; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 死信队列监听消费者 */ @Slf4j @Component public class RabbitMQSMSConsumer_Dead { /** * * @param message * @param channel * @throws Exception */ @RabbitListener(queues = {RabbitMQSMSConfig_Dead.SMS_QUEUE_DEAD}) public void watchQueue(Message message, Channel channel) throws Exception { log.info("++++++++++++++++++++++++++++++"); String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info("routingKey = " + routingKey); String msg = new String(message.getBody()); log.info("msg = " + msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("++++++++++++++++++++++++++++++"); } }
测试方法:正常的消费者关闭,设置队列长度为6,然后发10条消息,会发现,首先有4条超过队列长度的消息,进入死信队列消费者,然后,到了消息过期时间后,又有6条消息进入死信队列中被死信消费者消费