RabbitMQ 的扇出(fanout)交换机会将接收到的消息广播给所有与它绑定的队列,从而实现生产者发送一条消息,可以供多个消费者消费。
我们的计划是这样的:
- 创建一个交换机 `business.test.exchange.fanout`,类型是 Fanout
- 创建两个队列 `business.test.queue1` 和 `business.test.queue2`,绑定到交换机 `business.test.exchange.fanout`
pom
<dependencies>
    <!-- RabbitMQ support (Spring AMQP starter) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring MVC starter for the HTTP endpoint -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--
        fastjson 1.2.47 is affected by publicly documented autoType
        deserialization (remote code execution) vulnerabilities.
        1.2.83 is the patched 1.x release.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
yml
# HTTP port the application listens on
server:
  port: 8080

spring:
  # RabbitMQ broker connection settings.
  # NOTE: the asterisk values below are masked placeholders from the article;
  # replace them with the real host and credentials before use.
  rabbitmq:
    host: **.***.**.***
    port: 5672
    username: **
    password: **
config
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares one fanout exchange and two queues, and binds each queue to that exchange.
 */
@Configuration
public class FanoutConfig {

    /** Name of the fanout exchange. */
    private static final String EXCHANGE_NAME = "business.test.exchange.fanout";

    /** Name of the first queue bound to the exchange. */
    private static final String FIRST_QUEUE_NAME = "business.test.queue1";

    /** Name of the second queue bound to the exchange. */
    private static final String SECOND_QUEUE_NAME = "business.test.queue2";

    /**
     * Declares the exchange.
     *
     * @return the fanout-type exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /**
     * Declares the first queue.
     *
     * @return the queue named {@code business.test.queue1}
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FIRST_QUEUE_NAME);
    }

    /**
     * Declares the second queue.
     *
     * @return the queue named {@code business.test.queue2}
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(SECOND_QUEUE_NAME);
    }

    /**
     * Binds the first queue to the fanout exchange.
     *
     * @param fanoutQueue1   the first queue
     * @param fanoutExchange the fanout exchange
     * @return the binding between the two
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        Binding firstBinding = BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        return firstBinding;
    }

    /**
     * Binds the second queue to the fanout exchange.
     *
     * @param fanoutQueue2   the second queue
     * @param fanoutExchange the fanout exchange
     * @return the binding between the two
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        Binding secondBinding = BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        return secondBinding;
    }
}
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalTime; /** * @author: Snow * @date: 2023/1/5 * ************************************************** * 修改记录(时间--修改人--修改说明): */ @RestController @RequestMapping("/fanout") public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(){ // 队列名称 String exchangeName = "business.test.exchange.fanout"; // 消息 String message = "hello, fanout! " + LocalTime.now(); rabbitTemplate.convertAndSend(exchangeName, "", message); return message; } }
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Message listeners for the two queues; each one prints the received text to standard output.
 *
 * <p>Created 2023/1/5.
 * Change log (date -- author -- description):
 *
 * @author Snow
 */
@Component
public class Consumer {

    /**
     * Handles a message taken from {@code business.test.queue1}.
     *
     * @param msg the message text
     */
    @RabbitListener(queues = "business.test.queue1")
    public void listenFanoutQueue1(String msg) {
        final String line = "消费者1接收到Fanout消息:【" + msg + "】";
        System.out.println(line);
    }

    /**
     * Handles a message taken from {@code business.test.queue2}.
     *
     * @param msg the message text
     */
    @RabbitListener(queues = "business.test.queue2")
    public void listenFanoutQueue2(String msg) {
        final String line = "消费者2接收到Fanout消息:【" + msg + "】";
        System.out.println(line);
    }
}
测试结果
访问 http://localhost:8080/fanout/send
控制台打印
消费者1接收到Fanout消息:【hello, fanout! 09:06:01.540】
消费者2接收到Fanout消息:【hello, fanout! 09:06:01.540】