RabbitMQ消息模型之DirectExchange消息模型实战

简介: RabbitMQ消息模型之DirectExchange消息模型实战

RabbitMQ多种消息模型实战

前面我们学习了RabbitMQ的核心基础组件,了解了基本消息模型由队列、交换机、路由构成。而在RabbitMQ的核心组件体系中,主要有4种消息模型:

基于HeadersExchange、DirectExchange、FanoutExchange、TopicExchange的消息模型;在实际生产环境中应用最广泛的莫过于后3中消息模型。

本篇文章主要介绍DirectExchange消息模型。

DirectExchange消息模型实战

DirectExchange,顾名思义也是一种交换机,具有直连传输消息的作用,即当消息进入交换机这个中转站时,交换机会检查哪个路由和自己绑定在一起,并根据生产者发送消息指定的路由进行匹配,如果能找到对应的绑定模型,则将消息直接路由传输到指定的队列,最终由队列对应的消费者进行监听消费。

此模型在RabbitMQ多种消费模型中可以说是比较正规的了,因为它需要严格意义上的绑定,即需要且必须指定特定的交换机和路由,并绑定到指定的队列中。这种严格意义的要求使得该消息模型在生产环境中具有很广泛的运用。如图为该消息模型的结构图:

20200401134307494.png

下面结合业务场景说明该消息模型的使用。


业务场景:将实体对象信息当做消息,并发送到基于DirectExchange构成的消息模型中,根据绑定的路由,将消息路由至对应绑定的队列中,最终由对应的消费者进行监听消费处理。


1.在自定义注入配置类RabbitmqConfig中创建交换机、多条队列及其绑定

package com.debug.middleware.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description:    TODO  RabbitMQ自定义注入配置Bean相关组件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);
    //自动装配RabbitMQ的链接工厂实例
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
      * @return
     */
    @Bean("singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用json格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    /**
     * 多个消费者:主要针对高并发业务场景的配置
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(15);
        factory.setPrefetchCount(10);
        return factory;
    }
    /**
     * RabbitMQ发送消息的操作组件实例
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置发现消息后进行确认
        connectionFactory.setPublisherConfirms(true);
        //设置发现消息后返回确认信息
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件的实例对象
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }
    //定义读取配置文件的环境变量实例
    @Autowired
    private Environment env;
    /**  创建DirectExchange消息模型**/
    //创建队列1
    @Bean("directQueueOne")
    public Queue directQueueOne(){
        return new Queue(env.getProperty("mq.direct.queue.one.name"),true);
    }
    //队列2
    @Bean("directQueueTwo")
    public Queue directQueueTwo(){
        return new Queue(env.getProperty("mq.direct.queue.two.name"),true);
    }
    //创建直连交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(env.getProperty("mq.direct.exchange.name"));
    }
    //创建绑定1
    @Bean
    public Binding directBindingOne(){
        return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(env.getProperty("mq.direct.routing.key.one.name"));
    }
    //创建绑定2
    @Bean
    public Binding directBindingTwo(){
        return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(env.getProperty("mq.direct.routing.key.two.name"));
    }
}

application.yml文件配置交换机、队列名称

mq:
  env: local
  #定义直连式消息模型-directExchange: 创建队列1,2, 路由1,2 交换机
  direct:
     queue:
        one:
          name: ${mq.env}.middleware.mq.direct.one.queue
        two:
          name: ${mq.env}.middleware.mq.direct.two.queue
     exchange:
        name:  ${mq.env}.middleware.mq.direct.exchange
     routing:
        key:
          one:
            name:    ${mq.env}.middleware.mq.direct.routing.key.one.name
          two:
            name:  ${mq.env}.middleware.mq.direct.routing.key.two.name

2.创建对象实体信息EventInfo

package com.debug.middleware.server.rabbitmq.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.entity
* @author: youjp
* @create: 2020-04-08 20:21
* @description:    TDOO 实体对象信息
* @Version: 1.0
*/
@Data
@ToString
public class EventInfo implements Serializable {
    private Integer id; //id标识
    private String module; //模块
    private String name; //名称
    private String desc;// 描述
    public EventInfo(){
    }
    public EventInfo(Integer id, String module, String name, String desc) {
        this.id = id;
        this.module = module;
        this.name = name;
        this.desc = desc;
    }
}

3.创建对象实体生产者,这里我们创建了两个路由、队列的绑定。

package com.debug.middleware.server.rabbitmq.publisher;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.publisher
* @author: youjp
* @create: 2020-04-08 20:26
* @description: TODO 消息生产者
* @Version: 1.0
*/
@Component
public class ModelPublisher {
    private Logger logger = LoggerFactory.getLogger(ModelPublisher.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //json序列化和反序列化组件
    @Autowired
    private ObjectMapper objectMapper;
    //定义读取配置文件的环境变量实例
    @Autowired
    private Environment env;
    /**
     * 使用DirectExchange消息模型:发送消息-one
     * @param eventInfo
     */
    public void sendMsgOneByDirectExchange(EventInfo eventInfo) {
        if (eventInfo != null) {
            //定义消息传输格式为json
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //定义交换机为直连式交换机
            rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
            //定义路由1
            rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.one.name"));
            try {
                //创建消息
                Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();
                logger.info("消息模型DirectExchange-one-生产者-发送消息:{}", eventInfo);
                //发送消息
                rabbitTemplate.convertAndSend(message);
            } catch (JsonProcessingException e) {
                logger.error("消息模型DirectExchange-one-生产者-发送消息发送异常:{}", eventInfo, e.fillInStackTrace());
                e.printStackTrace();
            }
        }
    }
    /**
     * 使用DirectExchange消息模型:发送消息-two
     * @param eventInfo
     */
    public void sendMsgTwoByDirectExchange(EventInfo eventInfo) {
        if (eventInfo != null) {
            //定义消息传输格式为json
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //定义交换机为直连式交换机
            rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
            //定义路由1
            rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.two.name"));
            try {
                //创建消息
                Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();
                logger.info("消息模型DirectExchange-two-生产者-发送消息:{}", eventInfo);
                //发送消息
                rabbitTemplate.convertAndSend(message);
            } catch (JsonProcessingException e) {
                logger.error("消息模型DirectExchange-two-生产者-发送消息发送异常:{}", eventInfo, e.fillInStackTrace());
                e.printStackTrace();
            }
        }
    }
}

4.开发用于监听消费消息的消费者方法。由于我们创建了两个路由、队列及其绑定,因为需要开发两个消费者方法,用于监听不同队列中的消息。

package com.debug.middleware.server.rabbitmq.consumer;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.consumer
* @author: youjp
* @create: 2020-04-08 20:41
* @description:    TODO 消息消费者
* @Version: 1.0
*/
@Component
public class ModelConsumer {
    private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);
    //json序列化和反序列化组件
    @Autowired
    private ObjectMapper objectMapper;
    /** 使用DirectExchange消息模型的消费案例**/
    /**
     * @Param [bytes]
     * @return void 使用DirectExchange消息模型的消费方法
     * @Author youjp
     * @Description //TODO 监听并消费队列中消息:directExchange-one
     * @throw
     **/
    @RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.direct.queue.one.name}")
    public void consumerDirectMsgOne(@Payload byte[] bytes){
        try {
          EventInfo eventInfo=  objectMapper.readValue(bytes,EventInfo.class);
            logger.info("消息模型-directExchange-one-消费者-监听到消息:{}",eventInfo);
        } catch (IOException e) {
            logger.error("消息模型-directExchange-one-消费者-监听到消息异常:{}",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
    /**
     * @Param [bytes]
     * @return void 使用DirectExchange消息模型的消费方法
     * @Author youjp
     * @Description //TODO 监听并消费队列中消息:directExchange-two
     * @throw
     **/
    @RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.direct.queue.two.name}")
    public void consumerDirectMsgTwo(@Payload byte[] bytes){
        try {
            EventInfo eventInfo=  objectMapper.readValue(bytes,EventInfo.class);
            logger.info("消息模型-directExchange-two-消费者-监听到消息:{}",eventInfo);
        } catch (IOException e) {
            logger.error("消息模型-directExchange-two-消费者-监听到消息异常:{}",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
}

5.编写单元测试,触发生产者生产消息

package com.debug.middleware.server;
import com.debug.middleware.server.entity.Student;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description: TODO rabbitMQ 的java单元测试
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {
    //定义日志
    private static final Logger logger = LoggerFactory.getLogger(RabbitmqBasicTest.class);
    //定义Json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    //定义fanoutExchange消息模型中的发生消息的生产者
    @Autowired
    private ModelPublisher modelPublisher;
    //测试直连式消息模型
    @Test
    public void testDirect(){
        //构造第一个实体对象
        EventInfo eventInfoOne =new EventInfo(1,"使用DirectExchange模型","测试Direct模块1","基于DirectExchange消息模型-1");
        //第一个生产者发送消息
        modelPublisher.sendMsgOneByDirectExchange(eventInfoOne);
        //构造第二个实体对象
        EventInfo eventInfoTwo =new EventInfo(2,"使用DirectExchange模型","测试Direct模块2","基于DirectExchange消息模型-2");
        //第二个生产者发送消息
        modelPublisher.sendMsgTwoByDirectExchange(eventInfoTwo);
    }
}

点击运行测试,控制台查看测试结果:

20200401134307494.png

打开浏览器,查看RabbitMQ客户端控制台。可查看相应的队列和交换机列表

20200401134307494.png

选择其中一条队列点击,即可查看其详细信息,包括队列名称、持久化策略及绑定信息。

20200401134307494.png

至此,基于DirectExchange消息模型的大概使用已经讲解完毕。此种模型适用于业务数据需要直接传输并消费的场景,比如业务模块之间的消息交互,一般业务服务直接的通信是直接的、实时的,因而可以借助基于DirectExchange的消息模型进行通信。 事实上,在实际应用系统中,有90%的业务场景都可以采用直连式消息模型实现。

源码下载:

https://gitee.com/yjp245/middleware_study.git
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件
RabbitMQ消息模型之Work Queues
RabbitMQ消息模型之Work Queues
52 1
RabbitMQ消息模型之Work Queues
|
7月前
|
消息中间件
RabbitMQ消息模型之Routing-Topic
RabbitMQ消息模型之Routing-Topic
46 0
|
7月前
|
消息中间件 缓存
RabbitMQ消息模型之Sample
RabbitMQ消息模型之Sample
40 0
|
7月前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
98 1
|
7月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
87 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
4月前
|
消息中间件 测试技术 Kafka
Apache RocketMQ 批处理模型演进之路
RocketMQ 早期批处理模型存在一定的约束条件,为进一步提升性能,RocketMQ 进行了索引构建流水线改造,同时 BatchCQ 模型和 AutoBatch 模型也优化了批处理流程,提供了更简便的使用体验,快点击本文查看详情及配置展示~
19773 78
|
3月前
|
消息中间件 存储 缓存
RabbitMQ:WorkQueues模型
RabbitMQ:WorkQueues模型
47 8
RabbitMQ:WorkQueues模型
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
2月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
87 0
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。