RabbitMQ中间件的使用(1)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: RabbitMQ中间件的使用(1)

RabbitMQ的介绍

rabbitMQ是一款用于接收、存储和转发消息的开源中间件,在实际的应用系统可以实现消息分发、异步通信和业务模块解耦、延迟处理等功能。

RabbitMQ的核心要点在于消息、消息模型、生产者和消费者,而RabbitMQ的“消息模型”有许多种,包括基于FanoutExchange的消息模型、基于DirectExchange的消息模型和基于TopicExchange的消息模型等。这些消息模型都有一个共性,那就是他们几乎都包含交换机、路由和队列等基础组件。

举个例子,RabbitMQ和邮局类型,寄过邮件的人想必知道邮件的核心要素主要包括了邮件、邮递箱子、寄信者、收信人、邮递员。如图所示:

20200401134307494.png

其中,寄信者相当于RabbitMQ的消息生产者,邮件相当于消息,收信人相当于RabbitMQ的消息消费者,而邮件信箱和快递员可以看做RabbitMQ消息模型中的交换机和队列。

接下来,简要介绍一下RabbitMQ在实际的应用开发中涉及的这些核心基础组件。

生产者:用于产生、发生消息的程序。


消费者:用于监听、接收、消费和处理消息的程序。


消息:可以看做是实际的数据,可能是一串文字,一张图片等。在RabbitMQ底层系统架构中,消息是可以通过二进制的数据流进行传输的。


队列:消息的暂存区或者存储区,可以看做是一个“中转站”,消息经过这个“中转站”后,便将消息传输到消费者手中。


交换机:同样可以看成消息的中转站点。用于首次接收和分发消息,其中包括Headers、Fanout、Direct和Topic这4种。


路由:相当于密钥、地址或者第三者,一般不单独使用,而是与交换机绑定在一起,将消息路由到指定的队列。

以上的介绍便是RabbitMQ的几大核心基础组件。值得一提的是,RabbitMQ的消息模型主要是由队列、交换机和路由三大组件组成,如图所示:

20200401134307494.png

Springboot整合RabbitMQ

1.在POM文件中添加RabbitMQ所需要的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>

2.在application.yml配置文件中加入RabbitMQ的配置,包括RabbitMQ服务器所在的Host、端口号、用户名和密码等配置,代码如下:

#RabitMQ的配置    
spring:
  rabbitmq:
    virtual-host: /
    host: 192.168.216.129
    port: 15672
    username: admin
    password: admin

3.自定义配置Bean相关组件


在spring boot整合RabbitMQ的项目中,为了方便的使用RabbitMQ的相关操作组件并跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置Bean相关组件。下面我们将需要加入自定义配置的Bean组件放到RabbitmqConfig配置类中,该配置类的源代码如下:

package com.debug.middleware.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description:    TODO  RabbitMQ自定义注入配置Bean相关组件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);
    //自动装配RabbitMQ的链接工厂实例
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
      * @return
     */
    @Bean("singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用json格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    /**
     * 多个消费者:主要针对高并发业务场景的配置
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(15);
        factory.setPrefetchCount(10);
        return factory;
    }
    /**
     * RabbitMQ发送消息的操作组件实例
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置发现消息后进行确认
        connectionFactory.setPublisherConfirms(true);
        //设置发现消息后返回确认信息
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件的实例对象
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }
}

RabbitMQ发送、接收消息实战

至此,一切准备工作已经就绪。接下来进行代码实战环节。即基于Spring Boot整合RabbitMQ的项目中创建队列、交换机、路由及其绑定,并采用多种方式实现消息发送和接收。下面以“生产者发送一串简单的字符串信息到基本的消息模型中,并由消费者进行监听消费处理”为例进行代码演练。


1.在刚才的RabbitmqConfig类中创建队列、交换机、路由及其绑定,代码如下:

//定义读取配置文件的环境变量实例
@Autowired
private Environment env;
//创建队列
@Bean("basicQueue")
public Queue basicQueue(){
    return new Queue(env.getProperty("mq.basic.info.queue.name"),true);
}
//创建交换机:这里使用的是DirectExchange 消息模型
@Bean("basicExcange")
public DirectExchange basicExchange(){
    return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false);
}
//创建绑定
@Bean
public Binding basicBinding(){
    return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
}

其中,环节变量实例env读取的相关变量是配置在配置文件application.yml中的,相关配置如下:

mq:
  env: local
#定义基本消息模型中队列、交换机和路由的名称
  basic:
      info:
        queue:
          name: ${mq.env}.middleware.mq.basic.info.queue
        exchange:
          name:  ${mq.env}.middleware.mq.basic.info.exchange
        routing:
          key:
            name: ${mq.env}.middleware.mq.basic.info.routing.key

2.开发发送消息的生产者BasicPublisher,在这里指定待发送的消息为一串字符串,代码如下:

package com.debug.middleware.server.rabbitmq;
import org.assertj.core.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @className:
* @PackageName: com.debug.middleware.server.rabbitmq
* @author: youjp
* @create: 2020-04-06 20:31
* @description: TODO  基本消息模型- 生产者
* @Version: 1.0
*/
@Component
public class BasicPulisher {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicPulisher.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义环境读取实例
    @Autowired
    private Environment env;
    /**
     * 发送消息
     * @param msg
     */
    public void sendMsg(String msg){
        //判断字符串值是否为空
        if (!Strings.isNullOrEmpty(msg)){
            //定义消息传输格式为JSON字符串格式
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //指定消息模型中的交换机
            rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
            //指定消息模型中的交换机
            rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
            try {
                //将字符串转化为待发送的消息,即一串二进制的数据流
             Message message=  MessageBuilder.withBody(msg.getBytes("utf-8")).build();
                //转化并发送消息
             rabbitTemplate.convertAndSend(message);
             logger.info("基本消息模型-生产者-发送消息:{}",msg);
            } catch (Exception e) {
                logger.info("基本消息模型-生产者-发送消息发生异常{}",msg,e.fillInStackTrace());
                e.printStackTrace();
            }
        }
    }
}

3.开发监听并接收消费处理消息的消费者实例BasicConsumer,其源代码如下:

package com.debug.middleware.server.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
* @className:
* @PackageName: com.debug.middleware.server.entity
* @author: youjp
* @create: 2020-04-06 20:49
* @description: TODO 基本消费模型:消费者
* @Version: 1.0
*/
@Component
public class BasicConsumer {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(BasicConsumer.class);
    //定义RabbitMQ消息操作组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    @RabbitListener(queues = "${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload byte[] msg){
        try {
            String message= new String(msg,"utf-8");
            logger.info("基本消息模型-消费者-监听消费到消息:{}",message);
        } catch (Exception e) {
            logger.info("基本消息模型-消费者-监听发生异常:",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
}

4.写个java单元测试类RabbitmqBasicTest,并在该类上开发用于触发上述基本消费模型中生产者发送消息的发法:

package com.debug.middleware.server;
import com.debug.middleware.server.rabbitmq.BasicPulisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description:    TODO rabbitMQ 的java单元测试
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqBasicTest.class);
    //定义Json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    //定义基本消息模型中的发生消息的生产者
    @Autowired
    private BasicPulisher basicPulisher;
    @Test
    public void test1() throws Exception{
        String msg="发送一段测试消息";
        basicPulisher.sendMsg(msg);
    }
}

运行测试案例,查看控制台输出

20200401134307494.png

查看RabbitMQ后端控制台创建的基本消息模型,可查看到自动添加了一条队列记录

20200401134307494.png

点击后可查看到该队列绑定的交换机

20200401134307494.png

20200401134307494.png


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 存储 网络协议
MQ(消息中间件)概述及 RabbitMQ 的基本介绍
MQ(消息中间件)概述及 RabbitMQ 的基本介绍
212 0
|
1月前
|
消息中间件 中间件 Kafka
原来RocketMQ中间件可以这么玩
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
44 0
|
1月前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
1月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
|
7月前
|
消息中间件 Java Maven
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
44 1
|
7月前
|
消息中间件 Java Maven
消息中间件系列教程(11) -RabbitMQ -案例代码(通配符模式)
消息中间件系列教程(11) -RabbitMQ -案例代码(通配符模式)
49 0
|
7月前
|
消息中间件 Java Maven
消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)
消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)
49 0
|
7月前
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
40 0
|
7月前
|
消息中间件 Java Maven
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
41 0
|
8月前
|
消息中间件 NoSQL 关系型数据库
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
162 0

相关产品

  • 云消息队列 MQ