一、引入spring和ActiveMQ的依赖。
<!-- Maven build properties and dependencies for the Spring + ActiveMQ examples. -->
<!-- Versions are declared once here and referenced through ${...} below. -->
<properties>
    <spring.version>5.0.2.RELEASE</spring.version>
    <activemq.version>5.15.5</activemq.version>
</properties>
<dependencies>
    <!-- Jars required by ActiveMQ -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <!-- Connection pooling (org.apache.activemq.pool.PooledConnectionFactory is used by the Spring configuration). -->
    <!-- NOTE(review): activemq-all is an aggregate jar and may already contain the pool classes; confirm this extra dependency is needed. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <!-- Spring JMS support (JmsTemplate, listener containers) -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- Spring application context (ClassPathXmlApplicationContext) -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
</dependencies>
二、整合queue消息模式
1.application-queue.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring configuration for the queue (point-to-point) example: connection factory, queue destination and JmsTemplate. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- Connection factory: a pooled wrapper around the ActiveMQ connection factory. -->
    <!-- destroy-method="stop" is invoked when the application context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Broker address: local ActiveMQ on port 61616 -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Destination: the queue named "spring-active-queue" -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!-- Spring JmsTemplate: the helper used to send to and receive from ActiveMQ. -->
    <!-- Operations that do not name a destination use destinationQueue. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
2.QuenProducer.java消息生产者
public class QuenProducer { public static void main(String[] args) { // 创建IOC容器 ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); //指定队列名称地址 //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen")); // 默认队列 jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { //创建消息 TextMessage textMessage = session.createTextMessage("spring整合ActiveMQ-queue"); return textMessage; } }); System.out.println("消息发送成功!!!"); } }
3.QuenConsumer.java消息消费者
public class QuenConsumer { public static void main(String[] args) throws JMSException { //创建IOC容器 ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); //消费队列名称地址 //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen")); //如果不设置监听地址,默认目的地 TextMessage textMessage = (TextMessage) jmsTemplate.receive(); System.out.println(textMessage.getText()); //返回的是对象 消息转换器的作用 bean里面有配置 //Object object = jmsTemplate.receiveAndConvert(); //System.out.println("消息是:"+ object); } }
三、整合Topic消息模式
1.application-topic.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring configuration for the topic (publish/subscribe) example: connection factory, topic destination and JmsTemplate. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- Connection factory: a pooled wrapper around the ActiveMQ connection factory. -->
    <!-- destroy-method="stop" is invoked when the application context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Broker address: local ActiveMQ on port 61616 -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Destination: the topic named "spring-active-topic" (a topic, not a queue) -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>

    <!-- Spring JmsTemplate: the helper used to send to and receive from ActiveMQ. -->
    <!-- Operations that do not name a destination use destinationTopic. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationTopic"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
2.生产者和消费者的代码与queue模式完全一样,只需要把加载的配置文件改为application-topic.xml即可。注意:Topic默认是非持久订阅,需要先启动消费者再启动生产者,否则消费者收不到订阅之前发布的消息。
四、监听者模式
1.application-queue-listener.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring configuration for the listener example: same queue setup plus a listener container that consumes from the queue. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- Connection factory: a pooled wrapper around the ActiveMQ connection factory. -->
    <!-- destroy-method="stop" is invoked when the application context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Broker address: local ActiveMQ on port 61616 -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Destination: the queue named "spring-active-queue" -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!-- Spring JmsTemplate: the helper used to send to and receive from ActiveMQ. -->
    <!-- Operations that do not name a destination use destinationQueue. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- Message converter -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!-- Listener bean that handles the incoming messages -->
    <bean id="myListener" class="com.wang.listener.MyListener"></bean>

    <!-- Listener container: connects through jmsFactory, listens on destinationQueue and passes each message to myListener -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="destinationQueue"/>
        <property name="messageListener" ref="myListener"/>
    </bean>
</beans>
2.MyListener.java,监听相当于消费者。
public class MyListener implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("监听器收到消息,不需要配置消费者了!!!,内容是:"+ textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
3.QuenProducerListener.java:只需要编写生产者即可,上面配置的监听器相当于消费者,会自动接收消息。
public class QuenProducerListener { public static void main(String[] args) { // 创建IOC容器 ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue-listener.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); //指定队列名称地址 //jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen")); // 默认队列 jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { //创建消息 TextMessage textMessage = session.createTextMessage("spring整合ActiveMQ-queue"); return textMessage; } }); System.out.println("消息发送成功!!!"); } }