ActiveMQ
是一个高性能的消息服务, 它已经实现JMS
接口(Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口), 所以我们可以直接在 Java
中使用
使用场景: 多项目解耦合、分布式事务、流量控制等等
JMS
里面有一些概念, 我们提前说明一下
- JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
- JMS生产者(Message Producer)
- JMS消费者(Message Consumer)
- JMS消息
- JMS队列
- JMS主题
JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
编程模型
- ConnectionFactory :连接工厂,JMS 用它创建连接
- Connection :JMS 客户端到JMS Provider 的连接
- Session: 一个发送或接收消息的线程
- Destination :消息的目的地;消息发送给谁.
- MessageConsumer / MessageProducer: 消息接收者,消费者
快速安装
cd bin/linux-x86-64/ ./activemq start # 启动
启动之后可以看看图形界面版 localhots:8161
点击manager
, 默认账号密码是 admin
SpringBoot配置activeMq
- 增加依赖 pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 连接池--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
- 配置连接 application.properties
# 连接地址 默认连接端口为 61616 spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin # 开启连接池 最大连接数100 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100
- 启动文件配置 xxxApplication.java
package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.MultipartConfigFactory; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.util.unit.DataSize; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.servlet.MultipartConfigElement; @SpringBootApplication @MapperScan("com.example.demo.mapper") // 直至mapper @EnableJms // 开启jms public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } // 配置默认队列 @Bean public Queue queue(){ return new ActiveMQQueue("common.queue"); } @Bean public ConnectionFactory connectionFactory(){ System.out.println("创建连接"); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL("tcp://localhost:61616"); connectionFactory.setUserName("admin"); connectionFactory.setPassword("admin"); return connectionFactory; } @Bean public JmsTemplate genJmsTemplate(){ return new JmsTemplate(connectionFactory()); } @Bean public JmsMessagingTemplate jmsMessagingTemplate() { return new JmsMessagingTemplate(connectionFactory()); }
- 创建消息生产者接口 ProducerService.java
package com.example.demo.service; import javax.jms.Destination; /** * 消息生产者 */ public interface ProducerService { /** *指定消息队列 、消息 * @param destination * @param message */ public void sendMessage(Destination destination, final String message); /** * 使用默认消息队列发送消息 * @param message */ public void sendMessage(final String message); }
- 创建消息生产者实现 ProducerServiceImpl.java
package com.example.demo.service.impl; import com.example.demo.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Queue; @Service public class ProducerServiceImpl implements ProducerService { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; // 发送消息 destination是发送到队列 message是待发送的消息 @Override public void sendMessage(javax.jms.Destination destination, String message) { jmsMessagingTemplate.convertAndSend(destination, message); } @Override public void sendMessage(String message) { jmsMessagingTemplate.convertAndSend(this.queue, message); } }
- 消息消费者 OrderConsumer.java
package com.example.demo.jms; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class OrderConsumer { @JmsListener(destination = "order.queue") public void receiveQueue(String text) { System.out.println("接受到的消息 ====" + text); } }
- 控制器 OrderController.java
package com.example.demo.controller; import com.example.demo.bean.JsonUtils; import com.example.demo.service.ProducerService; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Destination; @RestController @RequestMapping("/api/v1/mq") public class OrderController { @Autowired private ProducerService producerService; @GetMapping("order") public Object order(String msg) { Destination destination = new ActiveMQQueue("order.queue"); producerService.sendMessage(destination, msg); return JsonUtils.buildSuccess("ok"); } @GetMapping("common") public Object common(String msg) { producerService.sendMessage(msg); return JsonUtils.buildSuccess("ok"); } }
- 消息产生
由于common队列我们并没有去创建消费者, 所以消息会堆积,但是order对列是有消费者的, 来让我们看看情况
正常项目肯定生产者和消费者不在同一个项目里面, 这里只是简单演示一下。
主要步骤就是: 导入依赖 -> 配置mq -> controller -> 生产者 -> 消费者