RabbitMQ中间件的使用(1)

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: RabbitMQ中间件的使用(1)

RabbitMQ的介绍

rabbitMQ是一款用于接收、存储和转发消息的开源中间件,在实际的应用系统可以实现消息分发、异步通信和业务模块解耦、延迟处理等功能。

RabbitMQ的核心要点在于消息、消息模型、生产者和消费者,而RabbitMQ的“消息模型”有许多种,包括基于FanoutExchange的消息模型、基于DirectExchange的消息模型和基于TopicExchange的消息模型等。这些消息模型都有一个共性,那就是他们几乎都包含交换机、路由和队列等基础组件。

举个例子,RabbitMQ和邮局类型,寄过邮件的人想必知道邮件的核心要素主要包括了邮件、邮递箱子、寄信者、收信人、邮递员。如图所示:

20200401134307494.png

其中,寄信者相当于RabbitMQ的消息生产者,邮件相当于消息,收信人相当于RabbitMQ的消息消费者,而邮件信箱和快递员可以看做RabbitMQ消息模型中的交换机和队列。

接下来,简要介绍一下RabbitMQ在实际的应用开发中涉及的这些核心基础组件。

生产者:用于产生、发生消息的程序。


消费者:用于监听、接收、消费和处理消息的程序。


消息:可以看做是实际的数据,可能是一串文字,一张图片等。在RabbitMQ底层系统架构中,消息是可以通过二进制的数据流进行传输的。


队列:消息的暂存区或者存储区,可以看做是一个“中转站”,消息经过这个“中转站”后,便将消息传输到消费者手中。


交换机:同样可以看成消息的中转站点。用于首次接收和分发消息,其中包括Headers、Fanout、Direct和Topic这4种。


路由:相当于密钥、地址或者第三者,一般不单独使用,而是与交换机绑定在一起,将消息路由到指定的队列。

以上的介绍便是RabbitMQ的几大核心基础组件。值得一提的是,RabbitMQ的消息模型主要是由队列、交换机和路由三大组件组成,如图所示:

20200401134307494.png

Springboot整合RabbitMQ

1.在POM文件中添加RabbitMQ所需要的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>

2.在application.yml配置文件中加入RabbitMQ的配置,包括RabbitMQ服务器所在的Host、端口号、用户名和密码等配置,代码如下:

#RabitMQ的配置    
spring:
  rabbitmq:
    virtual-host: /
    host: 192.168.216.129
    port: 15672
    username: admin
    password: admin

3.自定义配置Bean相关组件


在spring boot整合RabbitMQ的项目中,为了方便的使用RabbitMQ的相关操作组件并跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置Bean相关组件。下面我们将需要加入自定义配置的Bean组件放到RabbitmqConfig配置类中,该配置类的源代码如下:

package com.debug.middleware.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description:    TODO  RabbitMQ自定义注入配置Bean相关组件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);
    //自动装配RabbitMQ的链接工厂实例
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
      * @return
     */
    @Bean("singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用json格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    /**
     * 多个消费者:主要针对高并发业务场景的配置
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(15);
        factory.setPrefetchCount(10);
        return factory;
    }
    /**
     * RabbitMQ发送消息的操作组件实例
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置发现消息后进行确认
        connectionFactory.setPublisherConfirms(true);
        //设置发现消息后返回确认信息
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件的实例对象
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }
}

RabbitMQ发送、接收消息实战

至此,一切准备工作已经就绪。接下来进行代码实战环节。即基于Spring Boot整合RabbitMQ的项目中创建队列、交换机、路由及其绑定,并采用多种方式实现消息发送和接收。下面以“生产者发送一串简单的字符串信息到基本的消息模型中,并由消费者进行监听消费处理”为例进行代码演练。


1.在刚才的RabbitmqConfig类中创建队列、交换机、路由及其绑定,代码如下:

//定义读取配置文件的环境变量实例
@Autowired
private Environment env;
//创建队列
@Bean("basicQueue")
public Queue basicQueue(){
    return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
}
//创建交换机:这里使用的是DirectExchange 消息模型
@Bean("basicExcange")
public DirectExchange basicExchange(){
    return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
}
//创建绑定
@Bean
public Binding basicBinding(){
    return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
}

其中,环节变量实例env读取的相关变量是配置在配置文件application.yml中的,相关配置如下:

mq:
  env: local
#定义基本消息模型中队列、交换机和路由的名称
  basic:
      info:
        queue:
          name: ${mq.env}.middleware.mq.basic.info.queue
        exchange:
          name:  ${mq.env}.middleware.mq.basic.info.exchange
        routing:
          key:
            name: ${mq.env}.middleware.mq.basic.info.routing.key

2.开发发送消息的生产者BasicPublisher,在这里指定待发送的消息为一串字符串,代码如下:

package com.debug.middleware.server.rabbitmq;
import org.assertj.core.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @className:
* @PackageName: com.debug.middleware.server.rabbitmq
* @author: youjp
* @create: 2020-04-06 20:31
* @description: TODO  基本消息模型- 生产者
* @Version: 1.0
*/
@Component
public class BasicPulisher {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicPulisher.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义环境读取实例
    @Autowired
    private Environment env;
    /**
     * 发送消息
     * @param msg
     */
    public void sendMsg(String msg){
        //判断字符串值是否为空
        if (!Strings.isNullOrEmpty(msg)){
            //定义消息传输格式为JSON字符串格式
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //指定消息模型中的交换机
            rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
            //指定消息模型中的交换机
            rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
            try {
                //将字符串转化为待发送的消息,即一串二进制的数据流
             Message message=  MessageBuilder.withBody(msg.getBytes("utf-8")).build();
                //转化并发送消息
             rabbitTemplate.convertAndSend(message);
             logger.info("基本消息模型-生产者-发送消息:{}",msg);
            } catch (Exception e) {
                logger.info("基本消息模型-生产者-发送消息发生异常{}",msg,e.fillInStackTrace());
                e.printStackTrace();
            }
        }
    }
}

3.开发监听并接收消费处理消息的消费者实例BasicConsumer,其源代码如下:

package com.debug.middleware.server.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
* @className:
* @PackageName: com.debug.middleware.server.entity
* @author: youjp
* @create: 2020-04-06 20:49
* @description: TODO 基本消费模型:消费者
* @Version: 1.0
*/
@Component
public class BasicConsumer {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicConsumer.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    @RabbitListener(queues = "${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload byte[] msg){
        try {
            String message= new String(msg,"utf-8");
            logger.info("基本消息模型-消费者-监听消费到消息:{}",message);
        } catch (Exception e) {
            logger.info("基本消息模型-消费者-监听发生异常:",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
}

4.写个java单元测试类RabbitmqBasicTest,并在该类上开发用于触发上述基本消费模型中生产者发送消息的发法:

package com.debug.middleware.server;
import com.debug.middleware.server.rabbitmq.BasicPulisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description:    TODO rabbitMQ 的java单元测试
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqBasicTest.class);
    //定义Json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    //定义基本消息模型中的发生消息的生产者
    @Autowired
    private BasicPulisher basicPulisher;
    @Test
    public void test1() throws Exception{
        String msg="发送一段测试消息";
        basicPulisher.sendMsg(msg);
    }
}

运行测试案例,查看控制台输出

20200401134307494.png

查看RabbitMQ后端控制台创建的基本消息模型,可查看到自动添加了一条队列记录

20200401134307494.png

点击后可查看到该队列绑定的交换机

20200401134307494.png

20200401134307494.png


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
3月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
3月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
19天前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
43 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
2月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
113 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
1月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
4月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
4月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
5月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决

热门文章

最新文章

相关产品

  • 云消息队列 MQ