(3).消费者
1. 设置监听的那个队列? 2. 监听到的数据,输出在哪?
package com.jsxs.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:45 * @PackageName:com.jsxs.service.faout * @ClassName: EmailConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"email.direct.queue"}) // 这个客户端的队列是哪个? public class EmailConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("email接收到的信息是:->"+message); } }
message
1. 设置监听的那个队列? 2. 监听到的数据,输出在哪?
package com.jsxs.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: MessageConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"message.direct.queue"}) // 这个客户端的队列是哪个? public class MessageConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("Message接收到的信息是:->"+message); } }
sms
1. 设置监听的那个队列? 2. 监听到的数据,输出在哪?
package com.jsxs.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: SmsConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"sms.direct.queue"}) // 这个客户端的队列是哪个? public class SmsConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("sms接收到的信息是:->"+message); } }
1. 设置监听的那个队列? 2. 监听到的数据,输出在哪?
package com.jsxs.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: WechatConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"wechat.direct.queue"}) // 这个客户端的队列是哪个? public class WechatConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("wechat接收到的信息是:->"+message); } }
服务端先提供消息
客户端接收消息: 接受各自的消息,通过路由key进行区分的
4.主题模式 (Topic)
所有的模式都可以使用注解配置和配置类配置,这里我们用注解进行配置
(1).生产者
package com.jsxs.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.UUID; /** * @Author Jsxs * @Date 2023/4/2 11:24 * @PackageName:com.jsxs.service * @ClassName: OrderService * @Description: TODO : 分别给对应的路由Key发送消息. * @Version 1.0 */ @Service public class OrderService { @Resource // 获取rabbitMQ的服务 private RabbitTemplate rabbitTemplate; /** * * @param userId * @param productID * @param num */ public void makeOrder(String userId,String productID,int num){ //1. 生成订单 String orderID = UUID.randomUUID().toString().replace("-",""); System.out.println("订单号已经生产成功-"+orderID); //2. 设置交换机名字和路由 String exchangeName="topic_order_producer"; //3. 发送消息 // 参数: (交换机、路由key或队列名、消息内容) rabbitTemplate.convertAndSend(exchangeName,"sms","1"); rabbitTemplate.convertAndSend(exchangeName,"sms","2"); rabbitTemplate.convertAndSend(exchangeName,"sms","3"); rabbitTemplate.convertAndSend(exchangeName,"sms","4"); } }
(2).消费者
1. 绑定 2. 生命队列 3. 声明交换机 4. 路由key
package com.jsxs.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:45 * @PackageName:com.jsxs.service.faout * @ClassName: EmailConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(bindings = @QueueBinding( // 利用注解声明队列 value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"), // 利用注解声明交换机 exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC), // 路由key是 "#.sms.#" key = "#.sms.#" )) public class EmailConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("email接收到的信息是:->"+message); } }
message
package com.jsxs.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: MessageConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(bindings = @QueueBinding( // 利用注解声明队列 value = @Queue(value = "message.topic.queue",durable = "true",autoDelete = "false"), // 利用注解声明交换机 exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC), // 路由key是 "#.sms.#" key = "#.sms.#" )) public class MessageConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("Message接收到的信息是:->"+message); } }
sms
package com.jsxs.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: SmsConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(bindings = @QueueBinding( // 利用注解声明队列 value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"), // 利用注解声明交换机 exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC), // 路由key是 "#.sms.#" key = "#.sms.#" )) public class SmsConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("sms接收到的信息是:->"+message); } }
package com.jsxs.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: WechatConsumer * @Description: TODO * @Version 1.0 */ @Service // 监听的同时 我们对其进行绑定 @RabbitListener(bindings = @QueueBinding( // 利用注解声明队列 value = @Queue(value = "wechat.topic.queue",durable = "true",autoDelete = "false"), // 利用注解声明交换机 exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC), // 路由key是 "#.sms.#" key = "#.sms.#" )) public class WechatConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("wechat接收到的信息是:->"+message); } }
我们模糊查询的是只要包含的有 sms ,就发送信息,
(五)、RabbitMQ高级
1.过期时间TTL (队列)
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置 x-message-ttl
- 第一种方法是通过
队列属性
设置,队列中所有消息都有相同的过期时间 - 第二种方法是对
消息进行
单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息.
1. 设置队列TTL
(1).生产者
package com.jsxs.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.UUID; /** * @Author Jsxs * @Date 2023/4/2 11:24 * @PackageName:com.jsxs.service * @ClassName: OrderService * @Description: TODO : 分别给对应的路由Key发送消息. * @Version 1.0 */ @Service public class OrderService { @Resource // 获取rabbitMQ的服务 private RabbitTemplate rabbitTemplate; /** * * @param userId * @param productID * @param num */ public void makeOrder(String userId,String productID,int num){ //1. 生成订单 String orderID = UUID.randomUUID().toString().replace("-",""); System.out.println("订单号已经生产成功-"+orderID); //2. 设置交换机名字和路由 String exchangeName="ttl_order_producer"; //3. 发送消息 // 参数: (交换机、路由key或队列名、消息内容) rabbitTemplate.convertAndSend(exchangeName,"ttl","1"); rabbitTemplate.convertAndSend(exchangeName,"ttl","2"); rabbitTemplate.convertAndSend(exchangeName,"ttl","3"); rabbitTemplate.convertAndSend(exchangeName,"ttl","4"); } }
(2).配置文件
1. 声明交换机: (假如交换机已经被定义了,我们通过代码对其进行修改属性,那么我们的代码一定会报错的。) 2. 我们声明队列的同时: 要通过HashMap定义他的过期时间和传参 (队列名、是否持久化、是否自动删除、是否、参数) 3. 将队列绑定我们的交换机 4. "x-message-ttl 这是key值
package com.jsxs.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author Jsxs * @Date 2023/4/3 16:16 * @PackageName:com.jsxs.config * @ClassName: TTLRabbitMQConfig * @Description: TODO * @Version 1.0 */ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import java.util.HashMap; @Configuration public class TTLRabbitMQConfig { // 1. 声明注册direct模式的交换机 @Bean public DirectExchange directExchange(){ // (交换机的名字、是否持久化。是否自动删除) return new DirectExchange("ttl_order_producer",true,false); } // 2. 声明队列: 以及过期时间 @Bean public Queue SmsQueue(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("sms.ttl.queue",true,false,false,args); } @Bean public Queue MessageQueue(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("message.ttl.queue",true,false,false,args); } @Bean public Queue EmailQueue(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("email.ttl.queue",true,false,false,args); } @Bean public Queue WeChatQueue(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("wechat.ttl.queue",true,false,false,args); } // 3. 将队列与交换机进行绑定的操作 @Bean public Binding SmsBind(){ return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("ttl"); } @Bean public Binding MessageBind(){ return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("ttl"); } @Bean public Binding EmailBind(){ return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("ttl"); } @Bean public Binding WechatBind(){ return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("ttl"); } }
(3).消费者
1 .email
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:45 * @PackageName:com.jsxs.service.faout * @ClassName: EmailConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"email.ttl.queue"}) // 这个客户端的队列是哪个? public class EmailConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("email接收到的信息是:->"+message); } }
- message
package com.jsxs.service.ttl; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author Jsxs * @Date 2023/4/2 13:44 * @PackageName:com.jsxs.service.faout * @ClassName: MessageConsumer * @Description: TODO * @Version 1.0 */ @Service @RabbitListener(queues = {"message.ttl.queue"}) // 这个客户端的队列是哪个? public class MessageConsumer { @RabbitHandler // 接收到的消息放在这 public void receiveMessage(String message){ System.out.println("Message接收到的信息是:->"+message); } }