3. ActiveMQ集成
3.1 依赖导入和配置
在 Spring Boot 中集成 ActiveMQ 需要导入如下 starter 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
然后在 application.yml 配置文件中,对 activemq 做一下配置:
spring:
activemq:
# activemq url
broker-url: tcp://localhost:61616
in-memory: true
pool:
# 如果此处设置为true,需要添加activemq-pool的依赖包,否则会自动配置失败,无法注入JmsMessagingTemplate
enabled: false
3.2 Queue 和 Topic 的创建
首先我们需要创建两种消息 Queue 和 Topic,这两种消息的创建,我们放到 ActiveMqConfig 中来创建,如下:
/**
* activemq的配置
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
/**
* 发布/订阅模式队列名称
*/
public static final String TOPIC_NAME = "activemq.topic";
/**
* 点对点模式队列名称
*/
public static final String QUEUE_NAME = "activemq.queue";
@Bean
public Destination topic() {
return new ActiveMQTopic(TOPIC_NAME);
}
@Bean
public Destination queue() {
return new ActiveMQQueue(QUEUE_NAME);
}
}
可以看出创建 Queue 和 Topic 两种消息,分别使用 new ActiveMQQueue
和 new ActiveMQTopic
来创建,分别跟上对应消息的名称即可。这样在其他地方就可以直接将这两种消息作为组件注入进来了。
3.3 消息的发送接口
在 Spring Boot 中,我们只要注入 JmsMessagingTemplate 模板即可快速发送消息,如下:
/**
* 消息发送者
* @author shengwu ni
*/
@Service
public class MsgProducer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMessage(Destination destination, String msg) {
jmsMessagingTemplate.convertAndSend(destination, msg);
}
}
convertAndSend
方法中第一个参数是消息发送的目的地,第二个参数是具体的消息内容。
3.4 点对点消息生产与消费
3.4.1 点对点消息的生产
消息的生产,我们放到 Controller 中来做,由于上面已经生成了 Queue 消息的组件,所以在 Controller 中我们直接注入进来即可。然后调用上文的消息发送方法 sendMessage
即可成功生产一条消息。
/**
* ActiveMQ controller
* @author shengwu ni
*/
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Resource
private MsgProducer producer;
@Resource
private Destination queue;
@GetMapping("/send/queue")
public String sendQueueMessage() {
logger.info("===开始发送点对点消息===");
producer.sendMessage(queue, "Queue: hello activemq!");
return "success";
}
}
3.4.2 点对点消息的消费
点对点消息的消费很简单,只要我们指定目的地即可,jms 监听器一直在监听是否有消息过来,如果有,则消费。
/**
* 消息消费者
* @author shengwu ni
*/
@Service
public class QueueConsumer {
/**
* 接收点对点消息
* @param msg
*/
@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
public void receiveQueueMsg(String msg) {
System.out.println("收到的消息为:" + msg);
}
}
可以看出,使用 @JmsListener
注解来指定要监听的目的地,在消息接收方法内部,我们可以根据具体的业务需求做相应的逻辑处理即可。
3.4.3 测试一下
启动项目,在浏览器中输入:http://localhost:8081/activemq/send/queue
,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。
收到的消息为:Queue: hello activemq!