RabbitMQ中间件的使用(2)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
性能测试 PTS,5000VUM额度
简介: RabbitMQ中间件的使用(2)

其他发送接收消息方式实战

RabbitMQ在实际的应用系统中,除了可以采用上诉所讲的发生字节型(通过getBytes()方法或者序列化方法)的消息和采用@RabbitListener接收字节数组类型的消息外,还可以发送、接收“对象类型”的方式实现消息的发送和接收。

下面以生产者发送学生对象信息到基本的消息模型中,并由消费者进行监听消费处理为例。

1.创建Student类

package com.debug.middleware.server.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* @className:
* @PackageName: com.debug.middleware.server.entity
* @author: youjp
* @create: 2020-04-01 23:16
* @description:
* @Version: 1.0
*/
@Data
@ToString
public class Student implements Serializable {
    private String no;
    private String username;
    private String name;
    public Student() {
    }
    public Student(String no, String username, String name) {
        this.no = no;
        this.username = username;
        this.name = name;
    }
}

2.在之前的RabbitmqConfig文件中创建用于发送对象类型的消息队列、交换机和路由并且进行绑定

/**创建简单消息模型-对象类型:队列、交换机和路由 **/
@Bean(name = "objectQueue")
public Queue objectQueue(){
    return new Queue(env.getProperty("mq.object.info.queue.name"),true);
}
//创建交换机:在这里以DirectExchange为例,在后面章节中我们将继续详细介绍这种消息模型
@Bean
public DirectExchange objectExchange(){
    return new DirectExchange(env.getProperty("mq.object.info.exchange.name"),true,false);
}
//创建绑定
@Bean
public Binding objectBinding(){
    return BindingBuilder.bind(objectQueue()).to(objectExchange()).with(env.getProperty("mq.object.info.routing.key.name"));
}

3.application.yml文件中配置对于的队列、交换机、路由名称,

mq:
  env: local
#定义基本消息模型中队列、交换机和路由的名称
  basic:
      info:
        queue:
          name: ${mq.env}.middleware.mq.basic.info.queue
        exchange:
          name:  ${mq.env}.middleware.mq.basic.info.exchange
        routing:
          key:
            name: ${mq.env}.middleware.mq.basic.info.routing.key
# 基本消息模型: 对象消息中队列、交换机和路由的名称
  object:
    info:
      queue:
        name :  ${mq.env}.middleware.mq.object.info.queue
      exchange:
        name:  ${mq.env}.middleware.mq.object.info.exchange
      routing:
        key:
           name: ${mq.env}.middleware.mq.object.info.routing.key

4.开发用于发送对象类型消息的功能(生产者),这里将功能放到BasicPublisher类中

package com.debug.middleware.server.rabbitmq;
import com.debug.middleware.server.entity.Student;
import org.assertj.core.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @className:
* @PackageName: com.debug.middleware.server.rabbitmq
* @author: youjp
* @create: 2020-04-06 20:31
* @description: TODO  基本消息模型- 生产者
* @Version: 1.0
*/
@Component
public class BasicPulisher {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicPulisher.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义环境读取实例
    @Autowired
    private Environment env;
    /**
     * 发送对象类型的消息
     * @param s
     */
    public void sendObjectMsg(Student s){
        if (s!=null){
            try {
                //定义消息传输格式为JSON字符串格式
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定消息模型中的交换机
                rabbitTemplate.setExchange(env.getProperty("mq.object.info.exchange.name"));
                //指定消息模型中的交换机
                rabbitTemplate.setRoutingKey(env.getProperty("mq.object.info.routing.key.name"));
                //采用convertAndSend方法即可发送消息
                rabbitTemplate.convertAndSend(s,new MessagePostProcessor(){
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //获取消息的属性
                        MessageProperties messageProperties=message.getMessageProperties();
                        //设置消息的持久化模式
                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息的类型(在这里指定消息的类型为Student类型)message
                        messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Student.class);
                        return message;
                    }
                });
                logger.info("基本消息模型-生产者-发送对象类型的消息:{}",s);
            }catch (Exception e) {
                logger.error("基本消息模型-生产者-发送对象类型的消息发生异常:{}",s,e.fillInStackTrace());
            }
        }
    }
}

从上诉代码中可以得知,如果发送对象类型的消息,则需要借助RabbitTemplate的convertAndSend方法,该方法通过MessagePostProcessor的实现类直接指定待发送消息的类型,


5.开发用于监听消费处理对象的消费者功能。在BasicConsumer类中添加

package com.debug.middleware.server.rabbitmq;
import com.debug.middleware.server.entity.Student;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @className:
* @PackageName: com.debug.middleware.server.entity
* @author: youjp
* @create: 2020-04-06 20:49
* @description: TODO 基本消费模型:消费者
* @Version: 1.0
*/
@Component
public class BasicConsumer {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicConsumer.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 监听并消费队列中的消息-监听消费处理对象信息。
     * @param s
     */
    @RabbitListener(queues = "${mq.object.info.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeObjectMsg(@Payload Student s){
        try {
            logger.info("基本消息模型-监听消费者处理对象-消费者-监听消费到消息:{}",s);
        } catch (Exception e) {
            logger.error("基本消息模型-消费者-监听发生异常:",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
}

从上面可知,消费者监听消费消息功能对应的方法也可以直接接受对象类型的参数,前提是生产者在发送消息时指定消息的类型几这段代码:

//设置消息的类型(在这里指定消息的类型为Student类型)message
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Student.class);

6.写个单元测试

@Test
public void test2()throws Exception{
    Student student=new Student("1001","youjp","收破烂");
    basicPulisher.sendObjectMsg(student);
}

运行测试方法

20200401134307494.png

查看rabbitmq客户端后台,可看到新增了一条队列记录

20200401134307494.png

点击,可查看到

20200401134307494.png

至此关于RabbitMQ在应用系统中采用其他方式发送、接收消息的部分已经讲解完毕。值得一提的是,本篇博客再我们采用了基于DirectExchange交换机进行构建,这属于RabbitMQ消息模型中的一种,在后续我也会讲解其他消息模型。

源码下载:

https://gitee.com/yjp245/middleware_study.git


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
15天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
25天前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
67 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
9天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
3月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
4月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
4月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
251 11

相关产品

  • 云消息队列 MQ