Spring Boot 整合RabbitMQ
搭建环境
创建测试项目:test_rabbitmq_boot
添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent project this module inherits from; no versions are declared on the dependencies below -->
    <parent>
        <groupId>com.czxy.xuecheng</groupId>
        <artifactId>xc_test_parent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>test_rabbitmq_boot</artifactId>

    <dependencies>
        <!-- Spring AMQP / RabbitMQ integration (RabbitTemplate, @RabbitListener) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Spring Boot test support used by the producer test class -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>
添加yml文件
# HTTP port of the demo application
server:
  port: 8090
spring:
  application:
    name: test_rabbitmq_producer
  # RabbitMQ broker connection settings (spring.rabbitmq.*)
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    # Key was misspelled as "passowrd", so the value was never bound
    password: guest
    virtual-host: /
创建启动类:TestRabbitMQBootApplication
package com.czxy.xuecheng; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /> * Created by liangtong. */ @SpringBootApplication public class TestRabbitMQBootApplication { public static void main(String[] args) { SpringApplication.run(TestRabbitMQBootApplication.class, args); } }
配置类
package com.czxy.xuecheng.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /> * Created by liangtong. */ @Configuration public class RabbitConfig { // 交换机名称 public static final String EXCHANGE_TOPIC_INFORM = "inform_exchange_topic"; //队列名称 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /> 交换机配置 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 * channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC); * @return */ @Bean(EXCHANGE_TOPIC_INFORM) public Exchange exchange_topic() { //durable(true)持久化,消息队列重启后交换机仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build(); } /> * 声明队列 * channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); * @return */ @Bean(QUEUE_INFORM_SMS) public Queue queue_inform_sms(){ return new Queue(QUEUE_INFORM_SMS); } @Bean(QUEUE_INFORM_EMAIL) public Queue queue_inform_email(){ return new Queue(QUEUE_INFORM_EMAIL,true,false,false); } /> * 绑定队列到交换机 * channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, "inform.#.email.#"); * @param queue * @param exchange * @return */ @Bean public Binding binding_queue_inform_sms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); } @Bean public Binding binding_queue_inform_email(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); } }
生产者
package com.czxy.xuecheng; import com.czxy.xuecheng.config.RabbitConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; /> * Created by liangtong. */ @RunWith(SpringRunner.class) @SpringBootTest(classes = TestRabbitMQBootApplication.class) public class Producer05Topic { @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendEmail() { //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.email", null, message.getBytes()); for(int i = 0 ; i < 5 ; i ++) { String message = "email inform to user" + i; rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.email",message); System.out.println("Send Message is:'" + message + "'"); } } @Test public void testSendSms() { //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms", null, message.getBytes()); for(int i = 0 ; i < 5 ; i ++) { String message = "sms inform to user" + i; rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms",message); System.out.println("Send Message is:'" + message + "'"); } } @Test public void testSendSmsAndEmail() { //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms.email", null, message.getBytes()); for(int i = 0 ; i < 5 ; i ++) { String message = "sms and email inform to user" + i; rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms.email",message); System.out.println("Send Message is:'" + message + "'"); } } }
消费者
package com.czxy.xuecheng.listener; import com.czxy.xuecheng.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /> * Created by liangtong. */ @Component public class Consumer05Topic { @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_EMAIL) public void receiveEmail(String msg , Message message){ System.out.println("receive message is:" + msg); } /* @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_SMS) public void receiveSmS(String msg , Message message){ System.out.println("receive message is:" + msg); } */ }