前面文章介绍了ActiveMQ的相关内容,本文介绍ActiveMQ和Spring的整合开发
整合Spring框架
1.相关jar包
activemq-all-5.9.0.jar aopalliance-1.0.jar commons-logging-1.2.jar spring-aop-4.1.6.RELEASE.jar spring-beans-4.1.6.RELEASE.jar spring-context-4.1.6.RELEASE.jar spring-core-4.1.6.RELEASE.jar spring-expression-4.1.6.RELEASE.jar spring-jms-4.1.6.RELEASE.jar spring-messaging-4.1.6.RELEASE.jar spring-tx-4.1.6.RELEASE.jar spring-web-4.1.6.RELEASE.jar xbean-spring-4.5.jar
相关maven坐标
<dependencies> <!-- ActiveMQ客户端完整jar包依赖 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.5</version> </dependency> <!-- Spring-JMS插件相关jar包依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- Spring框架上下文jar包依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> </dependencies>
2.定义消费者
package com.dpb.consumer; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import com.dpb.bean.Order; /** * ActiveMQ Consumer消费者 * @author dengp * */ public class OrderConsumer implements MessageListener{ @Override public void onMessage(Message message) { try{ ObjectMessage objectMassage = (ObjectMessage) message; Order order = (Order) objectMassage.getObject(); System.out.println("the order is : " + order); }catch(Exception e){ e.printStackTrace(); } } }
3.定义生产者
package com.dpb.producer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import com.dpb.bean.Order; /** * ActiveMQ producer 生产者 * @author dengp * */ public class OrderProducer { private JmsTemplate template; public JmsTemplate getTemplate() { return template; } public void setTemplate(JmsTemplate template) { this.template = template; } /** * 生产者发送消息 * @param destinationName 目的地名称 * @param order 需要发送的订单数据 */ public void sendOrder(String destinationName, Order order){ try{ template.send(destinationName, new MessageCreator() { /** * 通过模板模式暴露的方法设置发送的信息 */ @Override public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(order); return objectMessage; } }); }catch(Exception e){ e.printStackTrace(); throw new RuntimeException("send order to MQ server error!!"); } } }
4.Spring配置文件整合ActiveMQ
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> <!-- 添加扫描 --> <context:component-scan base-package="com.dpb.*"></context:component-scan> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <!-- 需提供访问路径tcp://ip:61616;以及用户名,密码 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.88.121:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 消息生产者 start --> <!-- 定义JmsTemplate对象. 此类型由Spring框架JMS组件提供. 用于访问ActiveMQ使用. --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式, 默认数据可省略配置 --> <!-- <property name="pubSubDomain" value="false" /> --> </bean> <!-- 定义生成者对象 --> <bean id="orderProducer" class="com.dpb.producer.OrderProducer"> <!-- 为属性赋值 --> <property name="template" ref="jmsQueueTemplate"></property> </bean> <!--消息生产者 end --> <!-- 消息消费者 start --> <!-- 定义消息监听器, 此组件为spring-jms组件定义. 可以一次注册若干消息监听器. 属性解释: destination-type - 目的地类型, queue代表消息队列 可选值: queue | topic | durableTopic queue - 默认值. 代表消息队列 topic - 代表消息队列集合 durableTopic - 持久化的消息队列集合. ActiveMQ会保证消息的消费者一定接收到此消息. container-type - 容器类型 可选值: default | simple default - 默认值. 默认容器类型, 对应DefaultMessageListenerContainer simple - 简单容器类型, 对应SimpleMessageListenerContainer connection-factory - 链接工厂, 注入的是Spring-JMS组件提供的链接工厂对象. acknowledge - 确认方式 可选值: auto | client | dups-ok | transacted auto - 默认值, 即自动确认消息 client - 客户端确认消息 dups-ok - 可使用副本的客户端确认消息 transacted - 有事务的持久化消息确认机制. 需开启对ActiveMQ的事务控制才可应用. --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 注册消息监听器. 如果需要注册多个, 重复定义下述标签. --> <jms:listener destination="test-spring" ref="orderReciver" /> </jms:listener-container> <!-- 容器管理消息监听器实现类对象 --> <bean id="orderReciver" class="com.dpb.consumer.OrderConsumer"/> <!-- 消息消费者 end --> </beans>
5.测试
public class Test { public static void main(String[] args) { ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml"); OrderProducer pro = ac.getBean("orderProducer",OrderProducer.class); Order order = new Order(); order.setId("101"); order.setNick("波波烤鸭"); order.setPrice(10000L); order.setCreateTime(new Date()); pro.sendOrder("test-spring", order); } }