依赖、配置
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置:
spring:
rabbitmq:
host: 192.168.31.10
port: 5672 #通过控制台可以查看
username: guest
password: guest
virtual-host: /vhost_sys_logs #可以不配置,会使用的是默认virtual-host
简单队列
模型
简单队列,consumer和producer通过队列直连。
代码示例
配置类:
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public Queue queue() { //name,名字;durable,是否开启持久化 return new Queue("logs",false); } }
生产者:
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes=PrivilegeSystemMain.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void simpleTest() { rabbitTemplate.convertAndSend("logs","helo world!"); } }
消费者:
@Component @Slf4j public class ConsumeBean { @RabbitListener(queues={"logs"}) public void getMsg(String message){ log.info(message); } }
工作队列
模型
工作队列(work queue),让多个消费者去消费同一个消息队列中的消息,支持轮询分发(默认)、公平分发两种分发模式。
代码示例
配置类:
@Configuration public class RabbitMQConfig { @Bean public Queue queue() { //name,名字;durable,是否开启持久化 return new Queue("logs",false); } }
生产者:
@SpringBootTest(classes=PrivilegeSystemMain.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void simpleTest() { rabbitTemplate.convertAndSend("logs","helo world 01!"); rabbitTemplate.convertAndSend("logs","helo world 02!"); } }
消费者:
@Component @Slf4j public class ConsumeBean { @RabbitListener(queues={"logs"}) public void consumer_01(String message){ log.info("consumer_01 get message "+message); } @RabbitListener(queues={"logs"}) public void consumer_02(String message){ log.info("consumer_02 get message "+message); } }
订阅模式
模型
订阅模式(fanout),也叫广播模式,见名知意,其特点是将消息广播出去。通过交换机将生产者生产的消息分发到多个队列中去,从而支持生产者生产的一个消息被多个消费者消费。
代码示例
配置类:
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public Queue queue_01() { //name,名字;durable,是否开启持久化 return new Queue("queue_01",false); } @Bean public Queue queue_02(){ return new Queue("queue_02",false); } //订阅模式的交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout_exchange",false,false); } //将队列绑定到交换机上 @Bean public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logsAccess).to(fanoutExchange); } @Bean public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logsError).to(fanoutExchange); } }
生产者:
@SpringBootTest(classes=PrivilegeSystemMain.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void simpleTest() { rabbitTemplate.convertAndSend("fanout_exchange","","helo world 01!"); } }
消费者:
@Component @Slf4j public class ConsumeBean { @RabbitListener(queues={"queue_01"}) public void consumer_01(String message){ log.info("consumer_01 get message "+message); } @RabbitListener(queues={"queue_02"}) public void consumer_02(String message){ log.info("consumer_02 get message "+message); } }
路由模式
模型
路由模式(direct),在订阅模式支持一条消息被多个消费者消费的特性上增加了分类投递的特性,通过交换机,支持消息以类别(routing key)的方式投送到不同的消息队列中去。
代码示例
配置类:
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public Queue queue_01() { //durable,是否开启持久化 return new Queue("queue_01",false); } @Bean public Queue queue_02(){ return new Queue("queue_02",false); } //路由模式的交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("direct_exchange",false,false); } //将队列绑定到交换机上 @Bean public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, DirectExchange directExchange) { return BindingBuilder.bind(logsAccess).to(directExchange).with("routing_key_01"); } @Bean public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, DirectExchange directExchange) { return BindingBuilder.bind(logsError).to(directExchange).with("routing_key_02"); } }
生产者:
@SpringBootTest(classes=PrivilegeSystemMain.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void simpleTest() { rabbitTemplate.convertAndSend("direct_exchange","routing_key_01","helo world!"); } }
消费者:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConsumeBean { @RabbitListener(queues={"queue_01"}) public void consumer_01(String message){ log.info("consumer_01 get message "+message); } @RabbitListener(queues={"queue_02"}) public void consumer_02(String message){ log.info("consumer_02 get message "+message); } }
主题模式
模型
主题模式,也叫通配符模式,在路由模式以类别进行消息投送的基础上增加了对通配符的支持,这样就可以使用通配符将多个类别聚合成一个主题。
代码示例
配置类:
@Configuration public class RabbitMQConfig { @Bean public Queue queue_01() { //durable,是否开启持久化 return new Queue("queue_01",false); } @Bean public Queue queue_02(){ return new Queue("queue_02",false); } //主题模式的交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topic_exchange",false,false); } //将队列绑定到交换机上 @Bean public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, TopicExchange topicExchange) { return BindingBuilder.bind(logsAccess).to(topicExchange).with("#.error.#"); } @Bean public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, TopicExchange topicExchange) { return BindingBuilder.bind(logsError).to(topicExchange).with("#.info.#"); } }
生产者:
@SpringBootTest(classes=PrivilegeSystemMain.class) public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void simpleTest() { rabbitTemplate.convertAndSend("topic_exchange","test.error.test","helo world!"); } }
消费者:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConsumeBean { @RabbitListener(queues={"queue_01"}) public void consumer_01(String message){ log.info("consumer_01 get message "+message); } @RabbitListener(queues={"queue_02"}) public void consumer_02(String message){ log.info("consumer_02 get message "+message); } }
RPC
几乎不会使用,略......