开发者社区> javaedge> 正文

用了这么久的RabbitMQ异步编程竟然都是错的!(中)

简介: 用了这么久的RabbitMQ异步编程竟然都是错的!
+关注继续查看

2.2 RabbitMQ广播、工作队列模式坑

消息模式是广播 Or 工作队列

  • 消息广播,即希望同一消息,不同消费者都能分别消费
  • 队列模式,即不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。

比如同一用户的注册消息

  • 会员服务需监听以发送欢迎短信
  • 营销服务需监听以发送新用户小礼物

但会员、营销服务可能都有多实例,业务需求同一用户的消息,可同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务1和会员服务2),不管哪个实例来处理,处理一次即可(工作队列模式):

image.png

实现代码时务必确认MQ系统的机制,确保消息的路由按期望。

RocketMQ实现类似功能比较简单直白:若消费者属于一个组,那么消息只会由同组的一个消费者消费;若消费者属不同组,每个组都能消费一遍消息。


而RabbitMQ的消息路由模式采用队列+交换器,队列是消息载体,交换器决定消息路由到队列的方式。

step1:会员服务-监听用户服务发出的新用户注册消息

若启动俩会员服务,那么同一用户的注册消息应只能被其中一个实例消费。

分别实现RabbitMQ队列、交换器、绑定三件套。

  • 队列使用匿名队列
  • 交换器使用DirectExchange,交换器绑定到匿名队列的路由Key是空字符串

收到消息之后,打印所在实例使用的端口。

  • 消息发布者、消费者、以及MQ的配置
  • 13.png
  • 使用12345和45678两个端口启动两个程序实例后,调用sendMessage接口发送一条消息,输出的日志,显示同一会员服务两个实例都收到了消息:
  • image.png
  • 问题在于不明

RabbitMQ直接交换器和队列的绑定关系

RabbitMQ的直接交换器根据routingKey路由消息。而程序每次启动都会创建匿名(随机命名)队列,所以每个会员服务实例都对应独立的队列,以空routingKey绑定到直接交换器。

用户服务发消息时也设置了空routingKey,所以直接交换器收到消息后,发现匹配俩队列,于是都转发消息

image.png

修复

对会员服务不要使用匿名队列,而使用同一队列。

将上面代码中的匿名队列换做普通队列:

private static final String QUEUE = "newuserQueue";
@Bean
public Queue queue() {
    return new Queue(QUEUE);
}

这样对同一消息,俩实例中只有一个实例可收到,不同消息被轮询发给不同实例。

  • 现在的交换器和队列关系
  • image.png

step2:用户服务-广播消息给会员、营销服务

期望会员、营销服务都能收到广播消息,但会员/营销服务中的每个实例只需收到一次消息。

声明一个队列和一个FanoutExchange,然后模拟俩用户服务和俩营销服务:

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutwrong")
public class FanoutQueueWrong {
    private static final String QUEUE = "newuser";
    private static final String EXCHANGE = "newuser";
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping
    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
    }
    //声明FanoutExchange,然后绑定到队列,FanoutExchange绑定队列的时候不需要routingKey
    @Bean
    public Declarables declarables() {
        Queue queue = new Queue(QUEUE);
        FanoutExchange exchange = new FanoutExchange(EXCHANGE);
        return new Declarables(queue, exchange,
                BindingBuilder.bind(queue).to(exchange));
    }
    //会员服务实例1
    @RabbitListener(queues = QUEUE)
    public void memberService1(String userName) {
        log.info("memberService1: welcome message sent to new user {}", userName);

    }
    //会员服务实例2
    @RabbitListener(queues = QUEUE)
    public void memberService2(String userName) {
        log.info("memberService2: welcome message sent to new user {}", userName);

    }
    //营销服务实例1
    @RabbitListener(queues = QUEUE)
    public void promotionService1(String userName) {
        log.info("promotionService1: gift sent to new user {}", userName);
    }
    //营销服务实例2
    @RabbitListener(queues = QUEUE)
    public void promotionService2(String userName) {
        log.info("promotionService2: gift sent to new user {}", userName);
    }
}

请求四次sendMessage注册四个用户。日志发现一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,这不是广播。可使用的明明是FanoutExchange,为什么没起效呢?

image.png

因为广播交换器会忽略routingKey,广播消息到所有绑定的队列。该案例的俩会员服务和两个营销服务都绑定了同一队列,所以四服务只能收到一次消息:

image.png

修复

拆分队列,会员和营销两组服务分别使用一条独立队列绑定到广播交换器

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutright")
public class FanoutQueueRight {
    private static final String MEMBER_QUEUE = "newusermember";
    private static final String PROMOTION_QUEUE = "newuserpromotion";
    private static final String EXCHANGE = "newuser";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping
    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
    }
    @Bean
    public Declarables declarables() {
        //会员服务队列
        Queue memberQueue = new Queue(MEMBER_QUEUE);
        //营销服务队列
        Queue promotionQueue = new Queue(PROMOTION_QUEUE);
        //广播交换器
        FanoutExchange exchange = new FanoutExchange(EXCHANGE);
        //两个队列绑定到同一个交换器
        return new Declarables(memberQueue, promotionQueue, exchange,
                BindingBuilder.bind(memberQueue).to(exchange),
                BindingBuilder.bind(promotionQueue).to(exchange));
    }
    @RabbitListener(queues = MEMBER_QUEUE)
    public void memberService1(String userName) {
        log.info("memberService1: welcome message sent to new user {}", userName);
    }
    @RabbitListener(queues = MEMBER_QUEUE)
    public void memberService2(String userName) {
        log.info("memberService2: welcome message sent to new user {}", userName);
    }
    @RabbitListener(queues = PROMOTION_QUEUE)
    public void promotionService1(String userName) {
        log.info("promotionService1: gift sent to new user {}", userName);
    }
    @RabbitListener(queues = PROMOTION_QUEUE)
    public void promotionService2(String userName) {
        log.info("promotionService2: gift sent to new user {}", userName);
    }
}

现在的交换器和队列结构

image.png

从日志输出可以验证,对每条MQ消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务同时,在每一个服务的两个实例中通过轮询接收:

image.png

异步的消息路由模式一旦配置出错,轻则可能导致消息重复处理,重则可能导致重要的服务无法接收到消息,最终造成业务逻辑错误。

小结

微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认MQ的消息路由配置是否满足需求,以避免消息重复或漏发问题。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Zuul技术分享
ZUUL是Netflix开源的微服务网关,它可以和Eureka、Ribbon、Hystrix等组件配合使用,Zuul组件的核心是一系列的过滤器,这些过滤器可以完成以下功能: 动态路由:动态将请求路由到不同后端集群 压力测试:逐渐增加指向集群的流量,以了解性能 负载分配:为每一种负载类型分配对应容量,并弃用超出限定值的请求 静态响应处理:边缘位置进行响应,避免转发到内部集群 身份认证和安全: 识别每一个资源的验证要求,并拒绝那些不符的请求。Spring Cloud对Zuul进行了整合和增强。 Spring Cloud对Zuul进行了整合和增强
4 0
Hystrix-开源容错系统(下)
Hystrix-开源容错系统(下)
5 0
白话微服务架构中的服务发现
如果你想跟朋友失去联系的最简单方法就是在不通知他们的情况下更改您的电话号码。同样适用于微服务架构系统中的服务。两个服务可能会愉快地相互通信,直到其中一个服务移动到另一个IP地址,而不通知对方,导致服务不可用。
4 0
进程间通信和线程间通信的几种方式
进程间通信和线程间通信的几种方式
4 0
SpringBoot 实现发送邮件
发送邮件是很多公司必须的业务场景之一。常见的比如预警邮件、入职邮件、报告邮件等等。我们公司就有很多场景需要使用到邮件功能,今天就一起来学习如何使用springboot实现邮件业务场景,掌握邮件业务类的核心逻辑。
5 0
SpringCloud:服务发现之如何搭建Nacos服务
Nacos 是阿里巴巴推出来的一个新开源项目,是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
6 0
Java 最常见的 208 道面试题(十六)
Java 最常见的 208 道面试题
5 0
+关注
javaedge
关注公众号:JavaEdge,后台回复面试,领取更多大厂求职资源。曾在百度、携程、华为等大厂搬砖,专注Java生态各种中间件原理、框架源码、微服务、中台等架构设计及落地实战,只生产硬核干货!
2303
文章
1
问答
文章排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载