Java中间件JMS(四)之ActiveMQ整合spring之类转换

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

原文链接:http://blog.csdn.net/dwc_fly/article/details/11096071

前几章都是直接发送MapMessage类型的数据,拿前面的例子来讲,如果生产者发送的是TextMessage,消费者也是必须TextMessage;如果我们自己要发送的数据不是TextMessage类型,而消费者还是TextMessage的,那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西Spring的MessageConverter接口提供了对消息转换的支持。


1、转换类的相关代码POJO

新建一个类MsgPoJo,就是一个简单的Pojo类。具体代码如下:

[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.io.Serializable;  

  3. publicclass MsgPoJo implements Serializable{  

  4. private String id;  

  5. private String text;  

  6. public String getId() {  

  7. return id;  

  8.    }  

  9. publicvoid setId(String id) {  

  10. this.id = id;  

  11.    }  

  12. public String getText() {  

  13. return text;  

  14.    }  

  15. publicvoid setText(String text) {  

  16. this.text = text;  

  17.    }    

  18. }  


2.转换类的实现

新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下


[java] view plain copy
  1. package jms.mq.spring;  

  2. import javax.jms.JMSException;  

  3. import javax.jms.Message;  

  4. import javax.jms.Session;  

  5. import javax.jms.TextMessage;  

  6. import org.springframework.jms.support.converter.MessageConversionException;  

  7. import org.springframework.jms.support.converter.MessageConverter;  

  8. publicclass MsgConverter implements MessageConverter{  

  9. @Override

  10. public Object fromMessage(Message message) throws JMSException,  

  11.    MessageConversionException {  

  12. if (!(message instanceof TextMessage)) {  

  13. thrownew MessageConversionException("Message is not TextMessage");  

  14.        }  

  15.        System.out.println("--转换接收的消息--");  

  16.        TextMessage textMessage = (TextMessage) message;  

  17.        MsgPoJo msgPojo = new MsgPoJo();  

  18.        String[] texts=textMessage.getText().split(",");  

  19.        msgPojo.setId(texts[0]);  

  20.        msgPojo.setText(texts[1]);  

  21. return msgPojo;  

  22.    }  

  23. @Override

  24. public Message toMessage(Object object, Session session) throws JMSException,  

  25.    MessageConversionException {  

  26. if (!(object instanceof MsgPoJo)) {  

  27. thrownew MessageConversionException("obj is not MsgPojo");  

  28.        }  

  29.        System.out.println("--转换发送的消息--");  

  30.        MsgPoJo msgPojo = (MsgPoJo) object;  

  31.        TextMessage textMessage = session.createTextMessage();  

  32.        textMessage.setText(msgPojo.getId()+","+msgPojo.getText());  

  33. return  textMessage;  

  34.    }  

  35. }  



代码很简单就是做些转换,有fromMessage和toMessage两个方法,真好对应发送转换toMessage和接受转换fromMessage。此时,发送和接收消息要换成template.convertAndSend(message);template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xml中jms模版配置的代码,修改后的代码如下:

[html] view plain copy
  1. <!-- 类转换 -->

  2. <beanid="msgConverter"class="jms.mq.spring.MsgConverter"></bean>

  3. <!-- 配置Jms模板 -->

  4. <beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">

  5. <propertyname="connectionFactory"ref="connectionFactory"/>

  6. <propertyname="defaultDestination"ref="queueDest"/>

  7. <!--<property name="receiveTimeout" value="10000" /> -->

  8. <!-- 类转换 -->

  9. <propertyname="messageConverter"ref="msgConverter"></property>

  10. </bean>

注意:如果你有队列监听容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能与队列容器配置冲突。

3、业务相关代码和配置

在QueueProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.util.Date;  

  3. import javax.jms.Destination;  

  4. import javax.jms.JMSException;  

  5. import javax.jms.Message;  

  6. import javax.jms.Session;  

  7. import javax.jms.TextMessage;  

  8. import org.springframework.jms.core.JmsTemplate;  

  9. import org.springframework.jms.core.MessageCreator;  

  10. publicclass QueueProducerService{  

  11.    JmsTemplate jmsTemplate;  

  12.    Destination destination;  

  13. publicvoid send() {  

  14.        MessageCreator messageCreator = new MessageCreator() {  

  15. public Message createMessage(Session session) throws JMSException {  

  16.                TextMessage message = session.createTextMessage();  

  17.                message.setText("QueueProducerService发送消息"+new Date());  

  18. return message;  

  19.            }  

  20.        };  

  21.        jmsTemplate.send(this.destination,messageCreator);  

  22.    }  

  23. publicvoid convertAndSend(){  

  24.        MsgPoJo msgPojo = new MsgPoJo();  

  25.        msgPojo.setId("1");  

  26.        msgPojo.setText("first msg");  

  27.        System.out.println("--发送消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  

  28.        jmsTemplate.convertAndSend(this.destination, msgPojo);  

  29.    }  

  30. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  31. this.jmsTemplate = jmsTemplate;  

  32.    }  

  33. publicvoid setDestination(Destination destination) {  

  34. this.destination = destination;  

  35.    }  

  36. }  


同样在QueueConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import javax.jms.Destination;  

  3. import javax.jms.JMSException;  

  4. import javax.jms.TextMessage;  

  5. import org.springframework.jms.core.JmsTemplate;  

  6. publicclass QueueConsumerService{  

  7.    JmsTemplate jmsTemplate;  

  8.    Destination destination;  

  9. publicvoid receive() {  

  10.        TextMessage message = (TextMessage) jmsTemplate.receive();  

  11. try {  

  12.            System.out.println("QueueConsumerService收到消息:"+message.getText());  

  13.        } catch (JMSException e) {  

  14.            e.printStackTrace();  

  15.        }  

  16.    }  

  17. publicvoid receiveAndConvert() {  

  18.        MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();  

  19. if(msgPojo!=null){  

  20.            System.out.println("--收到消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  

  21.        }  

  22.    }  

  23. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  24. this.jmsTemplate = jmsTemplate;  

  25.    }  

  26. publicvoid setDestination(Destination destination) {  

  27. this.destination = destination;  

  28.    }  

  29. }  



修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:

QueueConsumerTest.java测试类

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass QueueConsumerTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid receive() {  

  7.        QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  

  8.        consumerService.receive();  

  9.    }  

  10. privatestaticvoid receiveAndConvert() {  

  11.        QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  

  12.        consumerService.receiveAndConvert();  

  13.    }  

  14. publicstaticvoid main(String[] args) {  

  15. //receive();

  16.        receiveAndConvert();  

  17.    }  

  18. }  

QueueProducerTest.java测试类

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass QueueProducerTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid send() {  

  7.        QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  

  8.        producerService.send();  

  9.    }  

  10. privatestaticvoid convertAndSend() {  

  11.        QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  

  12.        producerService.convertAndSend();  

  13.    }  

  14. publicstaticvoid main(String[] args) {  

  15. //send();

  16.        convertAndSend();  

  17.    }  

  18. }  


代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类和消费者控制台信息如下:


收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改配置文件。


[html] view plain copy
  1. <beanid="queueProducerService"class="jms.mq.spring.QueueProducerService">

  2. <propertyname="jmsTemplate"ref="jmsQueueTemplate"/>

  3. <propertyname="destination"ref="queueDest"/>

  4. </bean>

  5. <beanid="queueConsumerService"class="jms.mq.spring.QueueConsumerService">

  6. <propertyname="jmsTemplate"ref="jmsQueueTemplate"/>

  7. <propertyname="destination"ref="queueDest"/>

  8. </bean>


4、监听器上的使用方式

我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者中增加convertAndSend方法并在其实现类中实现,订阅者监听器没有类转换,不用修改,发布者修改后的代码如下:


[java] view plain copy
  1. package jms.mq.spring;  

  2. import java.util.Date;  

  3. import javax.jms.Destination;  

  4. import javax.jms.JMSException;  

  5. import javax.jms.MapMessage;  

  6. import javax.jms.Message;  

  7. import javax.jms.Session;  

  8. import javax.jms.TextMessage;  

  9. import org.springframework.jms.core.JmsTemplate;  

  10. import org.springframework.jms.core.MessageCreator;  

  11. import jms.spring.QueueProducerService;  

  12. publicclass TopicPublisherService{  

  13.    JmsTemplate jmsTemplate;  

  14.    Destination destination;  

  15. publicvoid send() {  

  16.        MessageCreator messageCreator = new MessageCreator() {  

  17. public Message createMessage(Session session) throws JMSException {  

  18.                TextMessage message = session.createTextMessage();  

  19.                message.setText("QueueProducerService发送消息"+new Date());  

  20. return message;  

  21.            }  

  22.        };  

  23.        jmsTemplate.send(this.destination,messageCreator);  

  24.    }  

  25. publicvoid convertAndSend(Object obj) {  

  26.        System.out.println("--发送PoJo对象...");  

  27.        jmsTemplate.convertAndSend(destination, obj);  

  28.    }  

  29. publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {  

  30. this.jmsTemplate = jmsTemplate;  

  31.    }  

  32. publicvoid setDestination(Destination destination) {  

  33. this.destination = destination;  

  34.    }  

  35. }  


发布订阅者配置文件如下


[html] view plain copy
  1. <!-- 配置TopicJms模板 -->

  2. <beanid="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">

  3. <propertyname="connectionFactory"ref="connectionFactory"/>

  4. <propertyname="defaultDestination"ref="topicDest"/>

  5. <!-- 配置是否为发布订阅者模式,默认为false -->

  6. <propertyname="pubSubDomain"value="true"/>

  7. <!--<property name="receiveTimeout" value="10000" /> -->

  8. <propertyname="messageConverter"ref="msgConverter"></property>

  9. </bean>

[html] view plain copy
  1. <beanid="topicPublisherService"class="jms.mq.spring.TopicPublisherService">

  2. <propertyname="jmsTemplate"ref="jmsTopicTemplate"/>

  3. <!-- <property name="destination" ref="topicDest" /> -->

  4. <propertyname="destination"ref="topicSubscriberMessageListenerDest"/>

  5. </bean>

  6. <beanid="topicSubscriberService"class="jms.mq.spring.TopicSubscriberService">

  7. <propertyname="jmsTemplate"ref="jmsTopicTemplate"/>

  8. <propertyname="destination"ref="topicDest"/>

  9. </bean>


修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:

[java] view plain copy
  1. package jms.mq.spring;  

  2. import org.springframework.context.ApplicationContext;  

  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  

  4. publicclass TopicPublisherTest {  

  5. privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  

  6. privatestaticvoid send() {  

  7.        TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  

  8.        topicPublisherService.send();  

  9.    }  

  10. privatestaticvoid convertAndSend() {  

  11.        TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  

  12.        MsgPoJo msgPoJo = new MsgPoJo();  

  13.        msgPoJo.setId("1");  

  14.        msgPoJo.setText("测试内容");  

  15.        topicPublisherService.convertAndSend(msgPoJo);  

  16.    }  

  17. publicstaticvoid main(String[] args) {  

  18. //send();

  19.        convertAndSend();  

  20.    }  

  21. }  

运行发布测试类,运行结果如下:

写在到这里,ActiveMQ与spring整合就讲完了,主要讲了ActiveMQ与spring的简单整合,监听器和类转换这些主要功能.















本文转自yunlielai51CTO博客,原文链接: http://blog.51cto.com/4925054/1288918 ,如需转载请自行联系原作者
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
17天前
|
存储 NoSQL Java
使用Java和Spring Data构建数据访问层
本文介绍了如何使用 Java 和 Spring Data 构建数据访问层的完整过程。通过创建实体类、存储库接口、服务类和控制器类,实现了对数据库的基本操作。这种方法不仅简化了数据访问层的开发,还提高了代码的可维护性和可读性。通过合理使用 Spring Data 提供的功能,可以大幅提升开发效率。
60 21
|
7天前
|
网络协议 Java Shell
java spring 项目若依框架启动失败,启动不了服务提示端口8080占用escription: Web server failed to start. Port 8080 was already in use. Action: Identify and stop the process that’s listening on port 8080 or configure this application to listen on another port-优雅草卓伊凡解决方案
java spring 项目若依框架启动失败,启动不了服务提示端口8080占用escription: Web server failed to start. Port 8080 was already in use. Action: Identify and stop the process that’s listening on port 8080 or configure this application to listen on another port-优雅草卓伊凡解决方案
32 7
|
6天前
|
安全 Java 编译器
JAVA泛型类的使用(二)
接上一篇继续介绍Java泛型的高级特性。3. **编译时类型检查**:尽管运行时发生类型擦除,编译器会在编译阶段进行严格类型检查,并允许通过`extends`关键字对类型参数进行约束,确保类型安全。4. **桥方法**:为保证多态性,编译器会生成桥方法以处理类型擦除带来的问题。5. **运行时获取泛型信息**:虽然泛型信息在运行时被擦除,但可通过反射机制部分恢复这些信息,例如使用`ParameterizedType`来获取泛型参数的实际类型。
|
6天前
|
安全 Java 编译器
JAVA泛型类的使用(一)
Java 泛型类是 JDK 5.0 引入的重要特性,提供编译时类型安全检测,增强代码可读性和可维护性。通过定义泛型类如 `Box&lt;T&gt;`,允许使用类型参数。其核心原理是类型擦除,即编译时将泛型类型替换为边界类型(通常是 Object),确保与旧版本兼容并优化性能。例如,`Box&lt;T&gt;` 编译后变为 `Box&lt;Object&gt;`,从而实现无缝交互和减少内存开销。
|
1月前
|
Java Spring
Java Spring Boot监听事件和处理事件
通过上述步骤,我们可以在Java Spring Boot应用中实现事件的发布和监听。事件驱动模型可以帮助我们实现组件间的松耦合,提升系统的可维护性和可扩展性。无论是处理业务逻辑还是系统事件,Spring Boot的事件机制都提供了强大的支持和灵活性。希望本文能为您的开发工作提供实用的指导和帮助。
101 15
|
3月前
|
Java 开发者
在 Java 中,一个类可以实现多个接口吗?
这是 Java 面向对象编程的一个重要特性,它提供了极大的灵活性和扩展性。
200 58
|
1月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
2月前
|
JSON Java Apache
Java基础-常用API-Object类
继承是面向对象编程的重要特性,允许从已有类派生新类。Java采用单继承机制,默认所有类继承自Object类。Object类提供了多个常用方法,如`clone()`用于复制对象,`equals()`判断对象是否相等,`hashCode()`计算哈希码,`toString()`返回对象的字符串表示,`wait()`、`notify()`和`notifyAll()`用于线程同步,`finalize()`在对象被垃圾回收时调用。掌握这些方法有助于更好地理解和使用Java中的对象行为。
|
2月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
97 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
3月前
|
存储 缓存 安全
java 中操作字符串都有哪些类,它们之间有什么区别
Java中操作字符串的类主要有String、StringBuilder和StringBuffer。String是不可变的,每次操作都会生成新对象;StringBuilder和StringBuffer都是可变的,但StringBuilder是非线程安全的,而StringBuffer是线程安全的,因此性能略低。
104 8