RabbitMQ的介绍
rabbitMQ是一款用于接收、存储和转发消息的开源中间件,在实际的应用系统可以实现消息分发、异步通信和业务模块解耦、延迟处理等功能。
RabbitMQ的核心要点在于消息、消息模型、生产者和消费者,而RabbitMQ的“消息模型”有许多种,包括基于FanoutExchange的消息模型、基于DirectExchange的消息模型和基于TopicExchange的消息模型等。这些消息模型都有一个共性,那就是他们几乎都包含交换机、路由和队列等基础组件。
举个例子,RabbitMQ和邮局类型,寄过邮件的人想必知道邮件的核心要素主要包括了邮件、邮递箱子、寄信者、收信人、邮递员。如图所示:
其中,寄信者相当于RabbitMQ的消息生产者,邮件相当于消息,收信人相当于RabbitMQ的消息消费者,而邮件信箱和快递员可以看做RabbitMQ消息模型中的交换机和队列。
接下来,简要介绍一下RabbitMQ在实际的应用开发中涉及的这些核心基础组件。
生产者:用于产生、发生消息的程序。
消费者:用于监听、接收、消费和处理消息的程序。
消息:可以看做是实际的数据,可能是一串文字,一张图片等。在RabbitMQ底层系统架构中,消息是可以通过二进制的数据流进行传输的。
队列:消息的暂存区或者存储区,可以看做是一个“中转站”,消息经过这个“中转站”后,便将消息传输到消费者手中。
交换机:同样可以看成消息的中转站点。用于首次接收和分发消息,其中包括Headers、Fanout、Direct和Topic这4种。
路由:相当于密钥、地址或者第三者,一般不单独使用,而是与交换机绑定在一起,将消息路由到指定的队列。
以上的介绍便是RabbitMQ的几大核心基础组件。值得一提的是,RabbitMQ的消息模型主要是由队列、交换机和路由三大组件组成,如图所示:
Springboot整合RabbitMQ
1.在POM文件中添加RabbitMQ所需要的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.4.RELEASE</version> </dependency>
2.在application.yml配置文件中加入RabbitMQ的配置,包括RabbitMQ服务器所在的Host、端口号、用户名和密码等配置,代码如下:
#RabitMQ的配置 spring: rabbitmq: virtual-host: / host: 192.168.216.129 port: 15672 username: admin password: admin
3.自定义配置Bean相关组件
在spring boot整合RabbitMQ的项目中,为了方便的使用RabbitMQ的相关操作组件并跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置Bean相关组件。下面我们将需要加入自定义配置的Bean组件放到RabbitmqConfig配置类中,该配置类的源代码如下:
package com.debug.middleware.server.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @className: * @PackageName: com.debug.middleware.server.config * @author: youjp * @create: 2020-04-06 16:39 * @description: TODO RabbitMQ自定义注入配置Bean相关组件 * @Version: 1.0 */ @Configuration public class RabbitmqConfig { //定义日志 private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class); //自动装配RabbitMQ的链接工厂实例 @Autowired private CachingConnectionFactory connectionFactory; //自动装配消息监听器所在的容器工厂配置类实例 @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 下面为单一消费者实例的配置 * @return */ @Bean("singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ //定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); //设置容器工厂所用的实例 factory.setConnectionFactory(connectionFactory); //设置消息在传输中的格式,在这里采用json格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); //设置并发消费者实例的初始数量。在这里为1个 factory.setConcurrentConsumers(1); //设置并发消费者实例的最大数量。在这里为1个 factory.setMaxConcurrentConsumers(1); //设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个 factory.setPrefetchCount(1); return factory; } /** * 多个消费者:主要针对高并发业务场景的配置 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(15); factory.setPrefetchCount(10); return factory; } /** * RabbitMQ发送消息的操作组件实例 * @return */ @Bean public RabbitTemplate rabbitTemplate(){ //设置发现消息后进行确认 connectionFactory.setPublisherConfirms(true); //设置发现消息后返回确认信息 connectionFactory.setPublisherReturns(true); //构造发送消息组件的实例对象 RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); //发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } }
RabbitMQ发送、接收消息实战
至此,一切准备工作已经就绪。接下来进行代码实战环节。即基于Spring Boot整合RabbitMQ的项目中创建队列、交换机、路由及其绑定,并采用多种方式实现消息发送和接收。下面以“生产者发送一串简单的字符串信息到基本的消息模型中,并由消费者进行监听消费处理”为例进行代码演练。
1.在刚才的RabbitmqConfig类中创建队列、交换机、路由及其绑定,代码如下:
//定义读取配置文件的环境变量实例 @Autowired private Environment env; //创建队列 @Bean("basicQueue") public Queue basicQueue(){ return new Queue(env.getProperty("mq.basic.info.queue.name"),true); } //创建交换机:这里使用的是DirectExchange 消息模型 @Bean("basicExcange") public DirectExchange basicExchange(){ return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false); } //创建绑定 @Bean public Binding basicBinding(){ return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name")); }
其中,环节变量实例env读取的相关变量是配置在配置文件application.yml中的,相关配置如下:
mq: env: local #定义基本消息模型中队列、交换机和路由的名称 basic: info: queue: name: ${mq.env}.middleware.mq.basic.info.queue exchange: name: ${mq.env}.middleware.mq.basic.info.exchange routing: key: name: ${mq.env}.middleware.mq.basic.info.routing.key
2.开发发送消息的生产者BasicPublisher,在这里指定待发送的消息为一串字符串,代码如下:
package com.debug.middleware.server.rabbitmq; import org.assertj.core.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; /** * @className: * @PackageName: com.debug.middleware.server.rabbitmq * @author: youjp * @create: 2020-04-06 20:31 * @description: TODO 基本消息模型- 生产者 * @Version: 1.0 */ @Component public class BasicPulisher { //定义日志 private static final Logger logger=LoggerFactory.getLogger(BasicPulisher.class); //定义RabbitMQ消息操作组件 @Autowired private RabbitTemplate rabbitTemplate; //定义环境读取实例 @Autowired private Environment env; /** * 发送消息 * @param msg */ public void sendMsg(String msg){ //判断字符串值是否为空 if (!Strings.isNullOrEmpty(msg)){ //定义消息传输格式为JSON字符串格式 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //指定消息模型中的交换机 rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name")); //指定消息模型中的交换机 rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name")); try { //将字符串转化为待发送的消息,即一串二进制的数据流 Message message= MessageBuilder.withBody(msg.getBytes("utf-8")).build(); //转化并发送消息 rabbitTemplate.convertAndSend(message); logger.info("基本消息模型-生产者-发送消息:{}",msg); } catch (Exception e) { logger.info("基本消息模型-生产者-发送消息发生异常{}",msg,e.fillInStackTrace()); e.printStackTrace(); } } } }
3.开发监听并接收消费处理消息的消费者实例BasicConsumer,其源代码如下:
package com.debug.middleware.server.rabbitmq; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; /** * @className: * @PackageName: com.debug.middleware.server.entity * @author: youjp * @create: 2020-04-06 20:49 * @description: TODO 基本消费模型:消费者 * @Version: 1.0 */ @Component public class BasicConsumer { //定义日志 private static final Logger logger=LoggerFactory.getLogger(BasicConsumer.class); //定义RabbitMQ消息操作组件 @Autowired private RabbitTemplate rabbitTemplate; //定义json序列化和反序列化实例 @Autowired private ObjectMapper objectMapper; @RabbitListener(queues = "${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer") public void consumeMsg(@Payload byte[] msg){ try { String message= new String(msg,"utf-8"); logger.info("基本消息模型-消费者-监听消费到消息:{}",message); } catch (Exception e) { logger.info("基本消息模型-消费者-监听发生异常:",e.fillInStackTrace()); e.printStackTrace(); } } }
4.写个java单元测试类RabbitmqBasicTest,并在该类上开发用于触发上述基本消费模型中生产者发送消息的发法:
package com.debug.middleware.server; import com.debug.middleware.server.rabbitmq.BasicPulisher; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @className: * @PackageName: com.debug.middleware.server * @author: youjp * @create: 2020-04-06 20:58 * @description: TODO rabbitMQ 的java单元测试 * @Version: 1.0 */ @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RabbitmqBasicTest { //定义日志 private static final Logger logger=LoggerFactory.getLogger(RabbitmqBasicTest.class); //定义Json序列化和反序列化实例 @Autowired private ObjectMapper objectMapper; //定义基本消息模型中的发生消息的生产者 @Autowired private BasicPulisher basicPulisher; @Test public void test1() throws Exception{ String msg="发送一段测试消息"; basicPulisher.sendMsg(msg); } }
运行测试案例,查看控制台输出
查看RabbitMQ后端控制台创建的基本消息模型,可查看到自动添加了一条队列记录
点击后可查看到该队列绑定的交换机