前言
SpringBoot 中使用消息队列无非就是集成消息队列的客户端,这里主要有有两种方式,一种是使用消息队列提供的原生API,但是需要手动经历创建连接工厂、创建连接、创建信道,以及配置主机、配置端口等等一系列比较繁琐的过程,使用起来非常麻烦。
这里更推荐的一种方式是使用 SpringAMQP,SpringAMQP 是对 java 原生客户端进行了一层封装,使用成本大大降低了。其中,AMQP 是一种高级消息队列协议,用于在应用程序之间传递业务消息的开放标准,在 Spring 中它的底层默认实现使用的是 RabbitMQ。
下面我们就学习一下如何集成使用 SpringAMQP,关于消息队列的基础知识和核心概念可以参考:RabbitMQ 消息队列
一、添加 SpringAMQP 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、添加配置信息
spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: yourHostName username: yourName password: yourPassword
三、声明队列、交换机、绑定
1、使用类声明
SpringAMQP 中提供了 Queue、Exchange、Binding 这样几个类用于我们声明队列、交换机、绑定。
@Configuration public class DirectConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("simple.direct1"); } @Bean public Queue directQueue() { return new Queue("simple.queue1"); } // 绑定队列simple.queue1和交换机simple.direct1 @Bean public Binding errorBinding(Queue directQueue,DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("testKey"); } // 相同的方式,声明第二个队列、交换机、绑定。略…… }
2、使用注解声明
@Component public class MqListener { // Direct交换机示例 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "simple.queue1"), exchange = @Exchange(name = "simple.direct1",type = ExchangeTypes.DIRECT), key = {"testKey1","testKey2"} )) public void listenDirectQueue1(String msg) { System.out.println("消费者:收到的消息:"+ msg); } // 相同的方式,声明第二个队列、交换机、绑定。略…… }
这里更推荐使用注解的方式,因为使用注解更简单、更灵活。就以一个对列绑定队列时指定多个key为例,使用类的方式需要重复写两次Binding类,使用注解只需要在key属性后添加即可。
四、实现消息的基本收发
1、发送消息
@Component public class TestSendMsg { @Autowired private RabbitTemplate rabbitTemplate; public void testSendMessageToQueue() { String exchangeName = "simple.direct1"; String routingKey = "testKey"; // 发消息到指定队列 Map<String,Object> msg = new HashMap<>(); msg.put("name","jack"); msg.put("age",22); rabbitTemplate.convertAndSend(exchangeName,routingKey,msg); } }
发送消息需要借助 SpringAMQP 提供的 RabbitTemplate 工具类,因此需要注入 RabbitTemplate 对象.
发送消息调用 RabbitTemplate 对象的 convertAndSend() 方法。发送消息需要指定交换机名、routingKey、消息本体。(PS:如果是Fanout交换机,routingKey参数可以为null或空字符串
“”)
.这里需要注意如果,我们传入的是一个非字符串或非字节类型的对象时,默认Spring对消息的处理是由MessageConverter处理的,而默认实现是SimpleMessageConverter,基于JDK的OBjectOutputStream完成序列化,所以此时我们在RabbitMQ的管理页面看到的消息时序列化的结果,可读性较差。如果想要获取到可读性更强的消息,可以采用JSON序列化替代JDK序列化:
引入JSON依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
修改转换规则
@Configuration public class Config { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
2、接收消息
@Component public class MqListener { // Direct交换机示例 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "simple.queue1"), exchange = @Exchange(name = "simple.direct1",type = ExchangeTypes.DIRECT), key = {"testKey1","testKey2"} )) public void listenDirectQueue1(String msg) { System.out.println("消费者:收到的消息:"+ msg); // 处理逻辑…… } }
- 这里使用 SpringAMQP 提供的声明式的消息监听,通过注解在方法上声明要监听的对列名称,将来SpringAMQP就会把接收到的消息传递给当前方法,同时也可以用于队列、交换机、绑定的声明。
- 这里的消息接收类型取决于你发送时指定的消息类型。