3.5 发布/订阅消息的生产和消费
3.5.1 发布/订阅消息的生产
和点对点消息一样,我们注入 topic 并调用 producer 的 sendMessage
方法即可发送订阅消息,如下,不再赘述:
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination topic;
@GetMapping("/send/topic")
public String sendTopicMessage() {
logger.info("===开始发送订阅消息===");
producer.sendMessage(topic, "Topic: hello activemq!");
return "success";
}
}
3.5.2 发布/订阅消息的消费
发布/订阅消息的消费和点对点不同,订阅消息支持多个消费者一起消费。其次,Spring Boot 中默认的时点对点消息,所以在使用 topic 时,会不起作用,我们需要在配置文件 application.yml 中添加一个配置:
spring:
jms:
pub-sub-domain: true
该配置是 false 的话,则为点对点消息,也是 Spring Boot 默认的。这样是可以解决问题,但是如果这样配置的话,上面提到的点对点消息又不能正常消费了。所以二者不可兼得,这并非一个好的解决办法。
比较好的解决办法是,我们定义一个工厂,@JmsListener
注解默认只接收 queue 消息,如果要接收 topic 消息,需要设置一下 containerFactory。我们还在上面的那个 ActiveMqConfig 配置类中添加:
/**
* activemq的配置
*
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
// 省略其他内容
/**
* JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory
*/
@Bean
public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 相当于在application.yml中配置:spring.jms.pub-sub-domain=true
factory.setPubSubDomain(true);
return factory;
}
}
经过这样的配置之后,我们在消费的时候,在 @JmsListener
注解中指定这个容器工厂即可消费 topic 消息。如下:
/**
* Topic消息消费者
* @author shengwu ni
*/
@Service
public class TopicConsumer1 {
/**
* 接收订阅消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
public void receiveTopicMsg(String msg) {
System.out.println("收到的消息为:" + msg);
}
}
指定 containerFactory 属性为上面我们自己配置的 topicListenerContainer 即可。由于 topic 消息可以多个消费,所以该消费的类可以拷贝几个一起测试一下,这里我就不贴代码了,可以参考我的源码测试。
3.5.3 测试一下
启动项目,在浏览器中输入:http://localhost:8081/activemq/send/topic
,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。
收到的消息为:Topic: hello activemq!
收到的消息为:Topic: hello activemq!
4. 总结
本章主要介绍了 jms 和 activemq 的相关概念、activemq 的安装与启动。详细分析了 Spring Boot 中点对点消息和发布/订阅消息两种方式的配置、消息生产和消费方式。ActiveMQ 是能力强劲的开源消息总线,在异步消息的处理上很有用,希望大家好好消化一下。
课程源代码下载地址:戳我下载