Topic交换机
在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.
在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为 blackgreen.和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。
Topic要求
发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse” ,“nyse.xvmw”,
“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#|(井号)可以替代零个或多个单词
Topic交换机(消费者)
public class ReceiveLogsTopic01 { //交换机的名称 public static final String EXCHANGE_NAME = "topic_logs"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //声明队列 String queueName = "Q1"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("接收消息"); //接收消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(queueName,true,deliverCallback,consumeTag->{}); } }
public class ReceiveLogsTopic02 { //交换机的名称 public static final String EXCHANGE_NAME = "topic_logs"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //声明队列 String queueName = "Q2"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#"); System.out.println("接收消息"); //接收消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(queueName,true,deliverCallback,consumeTag->{}); } }
Topic交换机(生产者)
public class EmitLogTopic { //交换机的名称 public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Map<String,String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","1"); bindingKeyMap.put("lazy.orange.elephant","2"); bindingKeyMap.put("quick.orange.fox","3"); bindingKeyMap.put("lazy.brown.fox","4"); bindingKeyMap.put("lazy.pink.rabbit","5"); bindingKeyMap.put("quick.brown.fox","6"); bindingKeyMap.put("quick.orange.male.rabbit","7"); bindingKeyMap.put("lazy.orange.male.rabbit","8"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("生产者发出消息"+message); } } }
运行结果:
死信队列
某些时候由于特定的原因导致queue中的某些信息无法被消费,但这样的消息如果没有后续的处理,就变成死信,有死信自然就有了死信队列
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq.中)
消息被拒绝(basic.reject或 basic.nack)并且requeue=false
死信实战
死信实战(消费者1)
public class Consumer01 { //普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机的名称 public static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列的名称 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机,类型为direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //声明死信队列和普通队列 Map<String ,Object> arguments = new HashMap<>(); //过期时间 //正常队列设置死信交换机 //过期时间 10s-10000ms //arguments.put("x-message-ttl",1000000); arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信RoutineKey arguments.put("x-dead-letter-routing-key","lisi"); channel.queueDeclare(NORMAL_QUEUE,false,false,false,null); channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //绑定普通的交换机与队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); //绑定死信的交换机与队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("Consumer01接收的消息是"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{}); } }
死信实战(生产者)
public class Producer { //普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //死信消息,设置TTL时间 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for(int i=1;i<11;i++){ String message = "info"+i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes()); } } }
死信实战(消费者2)
public class Consumer02 { //死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("Consumer02接收的消息是"+new String(message.getBody(),"UTF-8")); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{}); } }
延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景
1.订单在十分钟之内未支付则自动取消
⒉.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
3.用户注册成功后,如果三天内没有登{陆则进行短信提醒
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
整合SpringBoot
第一步:导入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.10</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.yc.springbootrabbitmq</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--RabbitMQ依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
第二步:导入Swagger配置类
@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(){ return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("rabbitmq接口文档") .description("文档描述了rabbitmq微服务接口定义") .version("1.0") .contact(new Contact("enjoy6288","http://atguigu.com","2439317465@qq.com")) .build(); } }
队列TTL
第一步:配置文件类代码
//Ttl队列 配置文件类代码 @Configuration public class TtlQueueConfig { //普通交换机的名称 public static final String X_EXCHANGE = "X"; //死信交换机的名称 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列的名称 public static final String QUEUE_A="QA"; public static final String QUEUE_B="QB"; //死信队列的名称 public static final String DEAD_LETTER_QUEUE="QD"; //普通队列的名称 public static final String QUEUE_C="QC"; @Bean("queueC") public Queue queueC(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutineKey arguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } //声明xExchange 别名 @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //声明yExchange 别名 @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明普通队列 TTL 为 10s @Bean("queueA") public Queue queueA(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } //声明普通队列 TTL 为 10s @Bean("queueB") public Queue queueB(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } //死信队列 @Bean("queueD") public Queue queueD(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } //绑定 @Bean public Binding queueABingdingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //绑定 @Bean public Binding queueBBingdingY(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //绑定 @Bean public Binding queueDBingdingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } }
第二步:配置生产者
@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message); rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message); } //开始发消息 消息 TTL @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg->{ //发送消息的时候 延迟时长 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } }
第三步:配置消费者
@Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws Exception{ String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg); } }
基于死信存在的问题
一旦发出两条以上消息,看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
Rabbitmq插件实现延迟队列
安装延时队列插件
在官网上下载https://www.rabbitmg.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange插件,然后解压放置到 RabbitMQ的插件目录。进入RabbitMQ.的安装目录下的plgins.目录,执行下面命令让该插件生效,然后重启RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq _server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之前:
现在:
示例代码:
第一步:创建配置类
@Configuration public class DelayedQueueConfig { //交换机 public static final String DELAYED_EXCHANGE_NAME="delayed.exchange"; //队列 public static final String DELAYED_QUEUE_NAME="delayed.queue"; //routingKey public static final String DELAYED_ROUTING_KEY="delayed.routingkey"; @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); } //声明交换机 @Bean public CustomExchange delayedExchange(){ Map<String,Object> arguments = new HashMap<>(); arguments.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false); } //绑定 public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue ,@Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
第三步:创建消费者
//基于插件的延迟消息 @Slf4j @Component public class DelayQueueConsumer { //监听消息 @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayQueue(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的信息:{}",new Date().toString(),msg); } }
运行结果: