RabbitMQ消息模型之FanoutExchange消息模型实战

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ消息模型之FanoutExchange消息模型实战

前面我们学习了RabbitMQ的核心基础组件,了解了基本消息模型由队列、交换机、路由构成。而在RabbitMQ的核心组件体系中,主要有4种消息模型:基于HeadersExchange、DirectExchange、FanoutExchange、TopicExchange的消息模型;在实际生产环境中应用最广泛的莫过于后3中消息模型。

本篇主要介绍FanoutExchange消息模型。

FanoutExchange消息模型实战

FanoutExchange,顾名思义是交换机的一种,具有广播消息的作用,即当前消息进入交换机这个中转站以后,交换机会检查哪个队列和自己有绑定在一起,找到相关的队列后,将消息发送到绑定的队列中,并最终由队列对应的消费者进行监听消费。


心细的人,可能会发现这里我们没有用到路由。为什么没有说道它呢? 因为,基于FanoutExchange的消息模式具有广播式的作用,纵然为它绑定了路由,也是起不了作用的。所以严格来说,FanoutExchange的模型不能称为真正的消息模型,但是该消费模型中仍然是有交换机、队列和隐形的“路由”,所以在这里我们也将它当做消息模型中的一种。


如图,为基于FanoutExchange的消息模型结构图:

20200401134307494.png

从图可以看到,生产者生产的消息首先进入交换机,并由交换机中转至绑定的N条队列中,其中N>=1, 并最终由队列所绑定的消费者进行监听接收消费处理。


下面以案例说明:将一个实体对象充当消息,然后发送到基于FanoutExchange构成的消息模型中,最终绑定的多条队列对应的消费者进行监听消费接收处理。


1.在自定义注入配置类RabbitmqConfig中创建交换机、多条队列及其绑定

package com.debug.middleware.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description:    TODO  RabbitMQ自定义注入配置Bean相关组件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);
    //自动装配RabbitMQ的链接工厂实例
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
      * @return
     */
    @Bean("singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用json格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    /**
     * 多个消费者:主要针对高并发业务场景的配置
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        factory.setMaxConcurrentConsumers(15);
        factory.setPrefetchCount(10);
        return factory;
    }
    /**
     * RabbitMQ发送消息的操作组件实例
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置发现消息后进行确认
        connectionFactory.setPublisherConfirms(true);
        //设置发现消息后返回确认信息
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件的实例对象
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        //发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }
    //定义读取配置文件的环境变量实例
    @Autowired
    private Environment env;
    /**  创建消息模型FanoutExchange消息模型**/
    //创建队列1
    @Bean("fanoutQueueOne")
    public Queue fanoutQueueOne(){
    return new Queue(env.getProperty("mq.fanout.queue.one.name"),true);
    }
    //创建队列2
    @Bean("fanoutQueueTwo")
    public  Queue fanoutQueueTwo(){
        return new Queue(env.getProperty("mq.fanout.queue.two.name"),true);
    }
    //创建交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(env.getProperty("mq.fanout.queue.exchange.name"),true,false);
    }
    //创建绑定1
    @Bean
    public Binding fanoutBindingOne(){
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }
    //创建绑定2
    @Bean
    public Binding fanoutBindingTwo(){
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}

application.yml文件配置交换机、队列名称

mq:
  env: local
#定义广播式消息模型-fanoutExchange:创建队列1,2
  fanout:
    queue:
      one:
        name:  ${mq.env}.middleware.mq.fanout.one.queue
      two:
        name:  ${mq.env}.middleware.mq.fanout.two.queue
      exchange:
        name:  ${mq.env}.middleware.mq.fanout.exchange

2.创建对象实体信息EventInfo

package com.debug.middleware.server.rabbitmq.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.entity
* @author: youjp
* @create: 2020-04-08 20:21
* @description:    TDOO 实体对象信息
* @Version: 1.0
*/
@Data
@ToString
public class EventInfo implements Serializable {
    private Integer id; //id标识
    private String module; //模块
    private String name; //名称
    private String desc;// 描述
    public EventInfo(){
    }
    public EventInfo(Integer id, String module, String name, String desc) {
        this.id = id;
        this.module = module;
        this.name = name;
        this.desc = desc;
    }
}

3.创建对象实体生产者

package com.debug.middleware.server.rabbitmq.publisher;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.publisher
* @author: youjp
* @create: 2020-04-08 20:26
* @description:    TODO 消息生产者
* @Version: 1.0
*/
@Component
public class ModelPublisher {
    private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //json序列化和反序列化组件
    @Autowired
    private ObjectMapper objectMapper;
    //定义读取配置文件的环境变量实例
    @Autowired
    private Environment env;
    /**
     * 发送消息
     * @param eventInfo
     */
    public void sendMsg(EventInfo eventInfo){
        if(eventInfo!=null){
            //定义消息的传输格式为json
             rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
             //设置交换机
             rabbitTemplate.setExchange(env.getProperty("mq.fanout.queue.exchange.name"));
             //设置消息
            try {
                 Message msg= MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();
                 //发送消息
                 rabbitTemplate.convertAndSend(msg);
                 logger.info("消息模型fanoutExchange-生产者-发送消息:{}",msg);
            } catch (JsonProcessingException e) {
                logger.error("消息模型fanoutExchange-生产者-发送消息发送异常:{}",eventInfo,e.fillInStackTrace());
                e.printStackTrace();
            }
        }
    }
}

4.创建对象实体消费者,在该消费者类中,将开放两条队列分别对应的监听消费方法

package com.debug.middleware.server.rabbitmq.consumer;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.consumer
* @author: youjp
* @create: 2020-04-08 20:41
* @description:    TODO 消息消费者
* @Version: 1.0
*/
@Component
public class ModelConsumer {
    private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);
    //json序列化和反序列化组件
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * @Param [msg]
     * @return void
     * @Author youjp
     * @Description //TODO 第一个队列消费者
     * @throw
     **/
    @RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.fanout.queue.one.name}")
    public void consumerFanoutMsgOne(@Payload byte[] msg){
        try {
            //监听消息队列中的消息,并进行解析处理
            EventInfo eventInfo=objectMapper.readValue(msg,EventInfo.class);
            logger.info("消息模型-fanoutExchange-one-消费者-监听到消息:{}",eventInfo);
        }catch (IOException e) {
            logger.error("消息模型-fanoutExchange-one-消费者-监听到消息异常:{}",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
    /**
     * @Param [msg]
     * @return void
     * @Author youjp
     * @Description //TODO 第2个队列消费者
     * @throw
     **/
    @RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.fanout.queue.two.name}")
    public void consumerFanoutMsgTwo(@Payload byte[] msg){
        try {
            //监听消息队列中的消息,并进行解析处理
            EventInfo eventInfo=objectMapper.readValue(msg,EventInfo.class);
            logger.info("消息模型-fanoutExchange-two-消费者-监听到消息:{}",eventInfo);
        }catch (IOException e) {
            logger.error("消息模型-fanoutExchange-two-消费者-监听到消息异常:{}",e.fillInStackTrace());
            e.printStackTrace();
        }
    }
}

5.编写单元测试,触发生产者生产消息

package com.debug.middleware.server;
import com.debug.middleware.server.entity.Student;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description:    TODO rabbitMQ 的java单元测试
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {
    //定义日志
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqBasicTest.class);
    //定义Json序列化和反序列化实例
    @Autowired
    private ObjectMapper objectMapper;
    //定义fanoutExchange消息模型中的发生消息的生产者
    @Autowired
    private ModelPublisher modelPublisher;
    @Test
    public void testFanout(){
        EventInfo eventInfo=new EventInfo(1,"基于FanoutExchange模型","测试fanout模块","这是基于FanoutExchange模型");
        //触发生产者发送消息
        modelPublisher.sendMsg(eventInfo);
    }
}

运行改测试方法,查看控制台的输出结果。可以看到生产者生产消息并由RabbitTemplate操作组件发送成功,同时由于该FanoutExchange绑定了两条队列,所以两条队列分别对应的消费者将监听接收到相应的消息。

20200401134307494.png

打开浏览器,在地址栏输入http://127.0.0.1:15672,输入账号密码,进入RabbitMQ后端控制台,可查看到相应队列和交换机列表:

20200401134307494.png

点击可查看到该交换机绑定的队列

20200401134307494.png

至此,基于FanoutExchange消息模型的大概使用已经讲解完毕,此消费模型适用于“业务数据需要广播式传输”的场景,比如“用户操作写日志”。


当用户在系统中做了某种操作以后,需要在本身业务系统中将用户的操作内容记入数据库。同时也需要单独将用户的操作内容传输到专门的日子系统中进行存储(以便后续系统,进行日志分析等)。这个时候,可以将用户操作的日志封装为实体对象,并将其序列化后的json数据充当消息,最终采用广播式的交换机,即FanoutExchange消息模型进行发送、接收和处理。

源码下载:

https://gitee.com/yjp245/middleware_study.git
相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
328 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
3月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
3月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
3月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
1月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
468 7
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
3月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
3月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
3月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
3月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
3月前
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案