其他发送接收消息方式实战
RabbitMQ在实际的应用系统中,除了可以采用上诉所讲的发生字节型(通过getBytes()方法或者序列化方法)的消息和采用@RabbitListener接收字节数组类型的消息外,还可以发送、接收“对象类型”的方式实现消息的发送和接收。
下面以生产者发送学生对象信息到基本的消息模型中,并由消费者进行监听消费处理为例。
1.创建Student类
package com.debug.middleware.server.entity; import lombok.Data; import lombok.ToString; import java.io.Serializable; /** * @className: * @PackageName: com.debug.middleware.server.entity * @author: youjp * @create: 2020-04-01 23:16 * @description: * @Version: 1.0 */ @Data @ToString public class Student implements Serializable { private String no; private String username; private String name; public Student() { } public Student(String no, String username, String name) { this.no = no; this.username = username; this.name = name; } }
2.在之前的RabbitmqConfig文件中创建用于发送对象类型的消息队列、交换机和路由并且进行绑定
/**创建简单消息模型-对象类型:队列、交换机和路由 **/ @Bean(name = "objectQueue") public Queue objectQueue(){ return new Queue(env.getProperty("mq.object.info.queue.name"),true); } //创建交换机:在这里以DirectExchange为例,在后面章节中我们将继续详细介绍这种消息模型 @Bean public DirectExchange objectExchange(){ return new DirectExchange(env.getProperty("mq.object.info.exchange.name"),true,false); } //创建绑定 @Bean public Binding objectBinding(){ return BindingBuilder.bind(objectQueue()).to(objectExchange()).with(env.getProperty("mq.object.info.routing.key.name")); }
3.application.yml文件中配置对于的队列、交换机、路由名称,
mq: env: local #定义基本消息模型中队列、交换机和路由的名称 basic: info: queue: name: ${mq.env}.middleware.mq.basic.info.queue exchange: name: ${mq.env}.middleware.mq.basic.info.exchange routing: key: name: ${mq.env}.middleware.mq.basic.info.routing.key # 基本消息模型: 对象消息中队列、交换机和路由的名称 object: info: queue: name : ${mq.env}.middleware.mq.object.info.queue exchange: name: ${mq.env}.middleware.mq.object.info.exchange routing: key: name: ${mq.env}.middleware.mq.object.info.routing.key
4.开发用于发送对象类型消息的功能(生产者),这里将功能放到BasicPublisher类中
package com.debug.middleware.server.rabbitmq; import com.debug.middleware.server.entity.Student; import org.assertj.core.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; /** * @className: * @PackageName: com.debug.middleware.server.rabbitmq * @author: youjp * @create: 2020-04-06 20:31 * @description: TODO 基本消息模型- 生产者 * @Version: 1.0 */ @Component public class BasicPulisher { //定义日志 private static final Logger logger=LoggerFactory.getLogger(BasicPulisher.class); //定义RabbitMQ消息操作组件 @Autowired private RabbitTemplate rabbitTemplate; //定义环境读取实例 @Autowired private Environment env; /** * 发送对象类型的消息 * @param s */ public void sendObjectMsg(Student s){ if (s!=null){ try { //定义消息传输格式为JSON字符串格式 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //指定消息模型中的交换机 rabbitTemplate.setExchange(env.getProperty("mq.object.info.exchange.name")); //指定消息模型中的交换机 rabbitTemplate.setRoutingKey(env.getProperty("mq.object.info.routing.key.name")); //采用convertAndSend方法即可发送消息 rabbitTemplate.convertAndSend(s,new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties=message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息的类型为Student类型)message messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Student.class); return message; } }); logger.info("基本消息模型-生产者-发送对象类型的消息:{}",s); }catch (Exception e) { logger.error("基本消息模型-生产者-发送对象类型的消息发生异常:{}",s,e.fillInStackTrace()); } } } }
从上诉代码中可以得知,如果发送对象类型的消息,则需要借助RabbitTemplate的convertAndSend方法,该方法通过MessagePostProcessor的实现类直接指定待发送消息的类型,
5.开发用于监听消费处理对象的消费者功能。在BasicConsumer类中添加
package com.debug.middleware.server.rabbitmq; import com.debug.middleware.server.entity.Student; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * @className: * @PackageName: com.debug.middleware.server.entity * @author: youjp * @create: 2020-04-06 20:49 * @description: TODO 基本消费模型:消费者 * @Version: 1.0 */ @Component public class BasicConsumer { //定义日志 private static final Logger logger=LoggerFactory.getLogger(BasicConsumer.class); //定义RabbitMQ消息操作组件 @Autowired private RabbitTemplate rabbitTemplate; //定义json序列化和反序列化实例 @Autowired private ObjectMapper objectMapper; /** * 监听并消费队列中的消息-监听消费处理对象信息。 * @param s */ @RabbitListener(queues = "${mq.object.info.queue.name}",containerFactory = "singleListenerContainer") public void consumeObjectMsg(@Payload Student s){ try { logger.info("基本消息模型-监听消费者处理对象-消费者-监听消费到消息:{}",s); } catch (Exception e) { logger.error("基本消息模型-消费者-监听发生异常:",e.fillInStackTrace()); e.printStackTrace(); } } }
从上面可知,消费者监听消费消息功能对应的方法也可以直接接受对象类型的参数,前提是生产者在发送消息时指定消息的类型几这段代码:
//设置消息的类型(在这里指定消息的类型为Student类型)message messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Student.class);
6.写个单元测试
@Test public void test2()throws Exception{ Student student=new Student("1001","youjp","收破烂"); basicPulisher.sendObjectMsg(student); }
运行测试方法
查看rabbitmq客户端后台,可看到新增了一条队列记录
点击,可查看到
至此关于RabbitMQ在应用系统中采用其他方式发送、接收消息的部分已经讲解完毕。值得一提的是,本篇博客再我们采用了基于DirectExchange交换机进行构建,这属于RabbitMQ消息模型中的一种,在后续我也会讲解其他消息模型。
源码下载:
https://gitee.com/yjp245/middleware_study.git