直接(direct)交换机:队列通过路由键(routingKey)与交换机绑定,交换机接收到生产者发来的消息后,会把消息投递给绑定键与该消息路由键完全一致的队列,从而让指定的消费者进行消费。
案例需求如下:
- 利用 @RabbitListener 声明 Exchange、Queue、RoutingKey
- 在 consumer 服务中,编写两个消费者方法,分别监听
business.test.direct.queue1
和business.test.direct.queue2
- 在publisher中编写测试方法,向
business.test.exchange.direct
交换机发送消息
pom
<dependencies>
    <!-- RabbitMQ (Spring AMQP) starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring MVC, needed for the REST endpoint that publishes messages -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--
      fastjson releases up to and including 1.2.47 are affected by publicly known
      autoType deserialization remote-code-execution issues; 1.2.83 is the final
      1.x security release and is API-compatible with earlier 1.2.x versions.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
yml
# HTTP port of the embedded web server
server:
  port: 8080
spring:
  # Connection settings for the RabbitMQ broker
  rabbitmq:
    # The asterisks below are masked placeholders; replace them with real values
    host: **.***.**.***
    port: 5672
    username: **
    password: **
基于注解声明队列和交换机
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author: Snow * @date: 2023/1/5 * ************************************************** * 修改记录(时间--修改人--修改说明): */ @RestController @RequestMapping("/direct") public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/send/{message}") public String send(@PathVariable("message") String message){ // 交换机名称 String exchangeName = "business.test.exchange.direct"; if(message.contains("red")){ rabbitTemplate.convertAndSend(exchangeName, "red", message); } if(message.contains("blue")){ rabbitTemplate.convertAndSend(exchangeName, "blue", message); } if(message.contains("yellow")){ rabbitTemplate.convertAndSend(exchangeName, "yellow", message); } return message; } }
消费者
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author: Snow * @date: 2023/1/5 * ************************************************** * 修改记录(时间--修改人--修改说明): */ @Component public class Consumer { // 声明和绑定在一起的 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "business.test.direct.queue1"), exchange = @Exchange(name = "business.test.exchange.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "business.test.direct.queue2"), exchange = @Exchange(name = "business.test.exchange.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到queue2的消息:【" + msg + "】"); } }