原文链接:http://blog.csdn.net/dwc_fly/article/details/11096071
前几章都是直接发送MapMessage类型的数据,拿前面的例子来讲,如果生产者发送的是TextMessage,消费者也是必须TextMessage;如果我们自己要发送的数据不是TextMessage类型,而消费者还是TextMessage的,那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西Spring的MessageConverter接口提供了对消息转换的支持。
1、转换类的相关代码POJO
新建一个类MsgPoJo,就是一个简单的Pojo类。具体代码如下:
-
package jms.mq.spring;
-
-
import java.io.Serializable;
-
-
publicclass MsgPoJo implements Serializable{
-
private String id;
-
private String text;
-
public String getId() {
-
return id;
-
}
-
publicvoid setId(String id) {
-
this.id = id;
-
}
-
public String getText() {
-
return text;
-
}
-
publicvoid setText(String text) {
-
this.text = text;
-
}
-
}
2.转换类的实现
新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下
-
package jms.mq.spring;
-
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.Session;
-
import javax.jms.TextMessage;
-
-
import org.springframework.jms.support.converter.MessageConversionException;
-
import org.springframework.jms.support.converter.MessageConverter;
-
-
publicclass MsgConverter implements MessageConverter{
-
-
@Override
-
public Object fromMessage(Message message) throws JMSException,
-
MessageConversionException {
-
if (!(message instanceof TextMessage)) {
-
thrownew MessageConversionException("Message is not TextMessage");
-
}
-
System.out.println("--转换接收的消息--");
-
TextMessage textMessage = (TextMessage) message;
-
MsgPoJo msgPojo = new MsgPoJo();
-
String[] texts=textMessage.getText().split(",");
-
msgPojo.setId(texts[0]);
-
msgPojo.setText(texts[1]);
-
return msgPojo;
-
}
-
-
@Override
-
public Message toMessage(Object object, Session session) throws JMSException,
-
MessageConversionException {
-
if (!(object instanceof MsgPoJo)) {
-
thrownew MessageConversionException("obj is not MsgPojo");
-
}
-
System.out.println("--转换发送的消息--");
-
MsgPoJo msgPojo = (MsgPoJo) object;
-
TextMessage textMessage = session.createTextMessage();
-
textMessage.setText(msgPojo.getId()+","+msgPojo.getText());
-
return textMessage;
-
}
-
}
代码很简单就是做些转换,有fromMessage和toMessage两个方法,真好对应发送转换toMessage和接受转换fromMessage。此时,发送和接收消息要换成template.convertAndSend(message);template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xml中jms模版配置的代码,修改后的代码如下:
-
-
<beanid="msgConverter"class="jms.mq.spring.MsgConverter"></bean>
-
-
-
<beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">
-
<propertyname="connectionFactory"ref="connectionFactory"/>
-
<propertyname="defaultDestination"ref="queueDest"/>
-
-
-
<propertyname="messageConverter"ref="msgConverter"></property>
-
</bean>
注意:如果你有队列监听容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能与队列容器配置冲突。
3、业务相关代码和配置
在QueueProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:
-
package jms.mq.spring;
-
-
import java.util.Date;
-
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.Session;
-
import javax.jms.TextMessage;
-
import org.springframework.jms.core.JmsTemplate;
-
import org.springframework.jms.core.MessageCreator;
-
-
publicclass QueueProducerService{
-
JmsTemplate jmsTemplate;
-
-
Destination destination;
-
-
publicvoid send() {
-
MessageCreator messageCreator = new MessageCreator() {
-
public Message createMessage(Session session) throws JMSException {
-
TextMessage message = session.createTextMessage();
-
message.setText("QueueProducerService发送消息"+new Date());
-
return message;
-
}
-
-
};
-
jmsTemplate.send(this.destination,messageCreator);
-
}
-
-
publicvoid convertAndSend(){
-
MsgPoJo msgPojo = new MsgPoJo();
-
msgPojo.setId("1");
-
msgPojo.setText("first msg");
-
System.out.println("--发送消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());
-
jmsTemplate.convertAndSend(this.destination, msgPojo);
-
}
-
-
-
publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {
-
this.jmsTemplate = jmsTemplate;
-
}
-
-
publicvoid setDestination(Destination destination) {
-
this.destination = destination;
-
}
-
}
同样在QueueConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:
-
package jms.mq.spring;
-
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.TextMessage;
-
import org.springframework.jms.core.JmsTemplate;
-
-
-
publicclass QueueConsumerService{
-
-
JmsTemplate jmsTemplate;
-
-
Destination destination;
-
-
publicvoid receive() {
-
TextMessage message = (TextMessage) jmsTemplate.receive();
-
try {
-
System.out.println("QueueConsumerService收到消息:"+message.getText());
-
-
} catch (JMSException e) {
-
e.printStackTrace();
-
}
-
}
-
-
publicvoid receiveAndConvert() {
-
MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();
-
if(msgPojo!=null){
-
System.out.println("--收到消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());
-
}
-
}
-
-
publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {
-
this.jmsTemplate = jmsTemplate;
-
}
-
-
publicvoid setDestination(Destination destination) {
-
this.destination = destination;
-
}
-
}
修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:
QueueConsumerTest.java测试类
-
package jms.mq.spring;
-
-
import org.springframework.context.ApplicationContext;
-
import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-
publicclass QueueConsumerTest {
-
privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");
-
-
privatestaticvoid receive() {
-
QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");
-
consumerService.receive();
-
}
-
-
privatestaticvoid receiveAndConvert() {
-
QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");
-
consumerService.receiveAndConvert();
-
}
-
-
-
publicstaticvoid main(String[] args) {
-
-
receiveAndConvert();
-
}
-
}
QueueProducerTest.java测试类
-
package jms.mq.spring;
-
-
import org.springframework.context.ApplicationContext;
-
import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-
publicclass QueueProducerTest {
-
privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");
-
-
privatestaticvoid send() {
-
QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");
-
producerService.send();
-
}
-
-
privatestaticvoid convertAndSend() {
-
QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");
-
producerService.convertAndSend();
-
}
-
-
publicstaticvoid main(String[] args) {
-
-
convertAndSend();
-
}
-
-
}
代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类和消费者控制台信息如下:


收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改配置文件。
-
<beanid="queueProducerService"class="jms.mq.spring.QueueProducerService">
-
<propertyname="jmsTemplate"ref="jmsQueueTemplate"/>
-
<propertyname="destination"ref="queueDest"/>
-
</bean>
-
-
<beanid="queueConsumerService"class="jms.mq.spring.QueueConsumerService">
-
<propertyname="jmsTemplate"ref="jmsQueueTemplate"/>
-
<propertyname="destination"ref="queueDest"/>
-
</bean>
4、监听器上的使用方式
我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者中增加convertAndSend方法并在其实现类中实现,订阅者监听器没有类转换,不用修改,发布者修改后的代码如下:
-
package jms.mq.spring;
-
-
import java.util.Date;
-
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.MapMessage;
-
import javax.jms.Message;
-
import javax.jms.Session;
-
import javax.jms.TextMessage;
-
-
import org.springframework.jms.core.JmsTemplate;
-
import org.springframework.jms.core.MessageCreator;
-
-
import jms.spring.QueueProducerService;
-
-
publicclass TopicPublisherService{
-
JmsTemplate jmsTemplate;
-
-
Destination destination;
-
-
publicvoid send() {
-
MessageCreator messageCreator = new MessageCreator() {
-
-
public Message createMessage(Session session) throws JMSException {
-
TextMessage message = session.createTextMessage();
-
message.setText("QueueProducerService发送消息"+new Date());
-
return message;
-
}
-
};
-
jmsTemplate.send(this.destination,messageCreator);
-
}
-
-
publicvoid convertAndSend(Object obj) {
-
System.out.println("--发送PoJo对象...");
-
jmsTemplate.convertAndSend(destination, obj);
-
}
-
-
-
publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {
-
this.jmsTemplate = jmsTemplate;
-
}
-
-
publicvoid setDestination(Destination destination) {
-
this.destination = destination;
-
}
-
-
}
发布订阅者配置文件如下
-
-
<beanid="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">
-
<propertyname="connectionFactory"ref="connectionFactory"/>
-
<propertyname="defaultDestination"ref="topicDest"/>
-
-
<propertyname="pubSubDomain"value="true"/>
-
-
<propertyname="messageConverter"ref="msgConverter"></property>
-
</bean>
-
<beanid="topicPublisherService"class="jms.mq.spring.TopicPublisherService">
-
<propertyname="jmsTemplate"ref="jmsTopicTemplate"/>
-
-
<propertyname="destination"ref="topicSubscriberMessageListenerDest"/>
-
</bean>
-
-
<beanid="topicSubscriberService"class="jms.mq.spring.TopicSubscriberService">
-
<propertyname="jmsTemplate"ref="jmsTopicTemplate"/>
-
<propertyname="destination"ref="topicDest"/>
-
</bean>
修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:
-
package jms.mq.spring;
-
-
import org.springframework.context.ApplicationContext;
-
import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-
publicclass TopicPublisherTest {
-
privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");
-
-
privatestaticvoid send() {
-
TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");
-
topicPublisherService.send();
-
}
-
privatestaticvoid convertAndSend() {
-
TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");
-
MsgPoJo msgPoJo = new MsgPoJo();
-
msgPoJo.setId("1");
-
msgPoJo.setText("测试内容");
-
topicPublisherService.convertAndSend(msgPoJo);
-
}
-
-
-
publicstaticvoid main(String[] args) {
-
-
convertAndSend();
-
}
-
}
运行发布测试类,运行结果如下:

写在到这里,ActiveMQ与spring整合就讲完了,主要讲了ActiveMQ与spring的简单整合,监听器和类转换这些主要功能.
本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/1288918,如需转载请自行联系原作者