一、activeMQ与spring整合
1、需要的依赖:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.9.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.9</version> </dependency>
2、applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 1、开启自动扫描 --> <context:component-scan base-package="com.zhu.study"/> <!-- 2、配置连接 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.x.xx:61616"/> </bean> </property> <property name="maxConnections" value="100"/> </bean> <!-- 3、配置目的地,队列 --> <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="spring-active-queue"/> </bean> <!-- 配置目的地,主题 --> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-active-topic"/> </bean> <!-- 4、配置spring提供的jms模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> <property name="defaultDestination" ref="destinationQueue"/> <!--<property name="defaultDestination" ref="destinationTopic"/>--> <property name="messageConverter"> <!-- 做消息类型转换的 --> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/> </property> </bean> </beans>
配置很简单,就是四步:
- 开启注解扫描;
- 配置连接;
- 配置目的地;
- 配置spring提供的jmsTemplate.
如果想将目的地由queue换成topic,只需要在配置jmsTemplate的时候,将defaultDestination指向你上面配置的topic即可,然后启动时先启动消费者,其他任何地方不用改。
3、生产者:
@Service public class Produce { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args){ // 1、加载配置 ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); // 2、获取bean Produce produce = context.getBean(Produce.class); // 3、调用jmsTemplate发送消息 /*produce.jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage("spring整合activeMQ"); return message; } });*/ // lambda方式:编程口诀(拷贝小括号,写死右箭头,落地大括号) produce.jmsTemplate.send((Session session) -> { TextMessage message = session.createTextMessage("spring整合activeMQ"); return message; }); System.out.println("activemq send success!"); } }
4、消费者:
@Service public class Consumer { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args){ // 1、加载配置 ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); // 2、获取bean Consumer consumer = context.getBean(Consumer.class); // 3、消费消息 String result = (String)consumer.jmsTemplate.receiveAndConvert(); System.out.println("消费者收到消息:" + result); } }
这样就搞定了。不过上面说了,如果把目的地换成topic要先启动消费者。其实还可以配置监听程序,这样就不需要手动启动消费者了,消费者会一直处于待命状态。先写一个监听的类。
5、监听程序:
@Component public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { if (null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("从activemq收到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
然后在applicationContext.xml中配置:
<!-- 5、配置监听程序 --> <bean id="jmsListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"/> <property name="destination" ref="destinationTopic"/> <property name="messageListener" ref="myMessageListener"/> </bean>
这样就只需启动生产者即可了,监听程序会自动监听,收到消息就会进行消费。
二、activeMQ与springboot整合
首先新建一个springboot项目,用来编写生产者代码。
1、需要的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
2、application.yml:
server: port: 6666 spring: activemq: broker-url: tcp://192.168.x.xx:61616 user: admin password: admin # false = Queue; true = Topic jms: pub-sub-domain: false #队列名称 myqueue: boot-activemq
上面配置的是队列,要用主题的话,把上面的false改成true。
3、配置类:
@Component @EnableJms // 这个注解必不可少 public class ConfigBean { @Value("${myqueue}") private String myQueue; // 1、创建队列 @Bean public Queue queue(){ return new ActiveMQQueue(myQueue); } }
这个配置类主要就是创建了一个队列,队列名从配置文件中读取。
4、生产者:
这里主要有两种生产模式,一种是触发投递,一种是定时投递。触发投递就是程序启动后,满足某个条件才会去调用发送消息的方法;定时投递就是相当于一个定时任务。
@Component public class Produce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; // 触发投递 public void produceMsg(){ String message = "springboot整合activemq成功"; jmsMessagingTemplate.convertAndSend(queue,message); System.out.println("消息触发投递成功"); } // 定时生产,每隔5秒中向MQ发送一次消息(还需在启动类上加上@EnableScheduling注解) @Scheduled(fixedDelay = 5000) public void produceMsgScheduled(){ String message = "定时投递消息"; jmsMessagingTemplate.convertAndSend(queue,message); System.out.println("消息定时投递成功"); } }
注意定时投递需要在启动类上加@EnableScheduling注解!要测试定时投递,直接运行spring boot的启动类就好了,就可以看到每隔5秒“消息定时投递成功”就会被打印一次。要测试触发投递,就需要我们手动地去调用produceMsg方法,可以写个如下的测试类:
@RunWith(SpringRunner.class) @SpringBootTest public class ActivemqspringbootApplicationTests { @Autowired private Produce produce; @Test public void contextLoads() { produce.produceMsg(); } }
运行这个测试类就可以看到会打印出“消息触发投递成功”,然后程序就会停止。
5、消费者:
我们知道消费消息有两种方式,一种是用receive方法,还有就是监听。用receive方法和spring中的一样,这里讲如何配置监听。
@Component public class Queue_Consumer { @JmsListener(destination = "${myqueue}") public void recevice(TextMessage message) throws Exception{ System.out.println("消费者收到消息:" + message.getText()); } }
没错,就是这么简单!在spring中还需要我们自己新建监听类,然后配置到配置文件中,在springboot中,一个注解就搞定了!
6、发布订阅:
上面用的是队列,主题的使用方法如下:
- 生产者和消费者项目配置文件中的
pub-sub-domain
的值改为true。
- 生产者的配置类中,new的不是队列,而是主题了,如下:
@Bean public Topic topic(){ return new ActiveMQTopic(myTopic); }
- 生产消息的时候注入主题,而非队列,其他无异。
可以看出,springboot整合activemq比spring整个它简单很多!