面试官:使用 RocketMQ 怎么进行灰度发布?

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
性能测试 PTS,5000VUM额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 今天来聊一聊 RocketMQ 的灰度方案。灰度发布是指在黑与白之间,平滑过渡的一种发布方式。在大流量的系统中,如果一次升级改造范围比较大,或者影响内容不太确定,一般会采用切量的方式进行升级,这样可以减少生产变更带来的影响。

今天来聊一聊 RocketMQ 的灰度方案。

灰度发布是指在黑与白之间,平滑过渡的一种发布方式。在大流量的系统中,如果一次升级改造范围比较大,或者影响内容不太确定,一般会采用切量的方式进行升级,这样可以减少生产变更带来的影响。

如上图,对 ServiceA 这个服务进行升级,采用灰度发布,先升级 Server5,一周后如果没有问题,升级 Server4 和 Server 3,再运行一周没有问题,把剩下两个节点都升级。

上面的案例是一个 RPC 的调用。但如果使用消息队列该怎么做呢?使用消息队列,并不能使用网关来进行流量转发。这里需要分不同场景进行分析。

1 只升级消费者

这是最简单的情况,比如只有消费者修改了消费逻辑,就是 RPC 调用的情况类似,我们只要把消费者进行灰度发布就可以。如下图:

2 生产者也升级

下面是一个订单的实体类,我们新加了一个属性,订单生成时间

public class Order {
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private BigDecimal payAmount;
    /**订单状态:0:创建中;1:已完结*/
    private Integer status;
    /**新加属性,订单生成时间*/
    private String createTime;
}

消费端的改造是需要对 createTime 这个属性进行处理。

2.1 消费端过滤

在生产者的 Order 类中增加 createTime 属性,如果我们直接使用 createTime 属性来过滤,消费者并不能实现灰度,因为所有的消费者都可能会拉取到带有 createTime 属性的消息。

RocketMQ 中 Message 的定义如下:

public class Message implements Serializable {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}

可以在 properties 属性中增加一个灰度标识,比如生产者发送消息的时候封装如下:

Message msg = buildMessage(topic);
msg.putUserProperty("gray", "true");

注意:也可以在 SendMessageHook 这个钩子函数中定义。通过这种方式可以在消费端新增加一个灰度 Consumer Group,用来对灰度消息则进行消费。如下图:

对于灰度 Consumer Group 判断到 gray 属性是 true 时进行消费,而对于普通 Consumer Group,判断到 gray 属性不等于 true 时再进行消费。这里可以借助 RocketMQ 客户端的 FilterMessageHook,代码如下:

defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
 @Override
 public String hookName() {
  return "filterHook";
 }
 @Override
 public void filterMessage(FilterMessageContext context) {
  List<MessageExt> messages = context.getMsgList();
                context.setMsgList(messages.stream().filter(m -> StringUtils.equals(m.getProperty("gray"),"true"))
                        .collect(Collectors.toList()));
 }
});

不过这样会有两个问题,灰度和正常的两个 Consumer Group 相当于是广播组

  1. 两个组都要对所有的消息进行拉取,比如本来使用灰度发布计划切 10% 的流量,但实际上全部流量都切过去了,只是根据属性做了判断。这让消费端整体承担了两倍的压力;
  2. 因为两个消费者组都要去 Broker 拉取消息,Broker 的压力也增加了一倍。

2.2 Broker 过滤

2.2.1 使用 tag 过滤

如果一个 Consumer 不订阅一个 Topic 中的全部消息,可以通过 Tag 来过滤。比如一个 Consumer 订阅了 TopicA 这个 Topic 中的 Tag1 和 Tag2 这两个 tag,那这个 Consumer 的订阅关系如下图:

SubscriptionData 这个对象封装了 Topic、tag 以及所订阅 tag 的 hashcode 集合。

Consumer 发送拉取消息请求时,会把订阅关系传给 Broker(Broker 解析成 SubscriptionData 对象),Broker 使用 consumequeue 获取消息时,首先判断判断最后 8 个字节的 tag hashcode 是否在 SubscriptionData 的 codeSet 中,如果不在就跳过,如果存在把消息返回给 Consumer。如下图:

这样可以在灰度 Producer 发送消息时加上 Tag,如下代码:

Message msg = new Message();
msg.setBody("Test");
msg.setTopic("Topic");
msg.setTags("Gray");

而在灰度消费者订阅 Gray 这个 tag。这样就避免了 2.1 节中消息全量拉取的问题。

2.2.2 使用 SQL92 过滤

使用 SQL92 过滤,可以应对更加复杂的场景,不仅可以过滤 Tag,还可以过滤 UserProperty。

比如下面是一个生产者的代码:

Message msg = new Message();
msg.setTopic("testTopic");
msg.setTags("tag1");
msg.putUserProperty("gray","true");

这样消费者初始化的时候,可以定义使用 SQL92 过滤,代码如下:

consumer.subscribe("testTopic",
            MessageSelector.bySql("(TAGS is not null and TAGS in TAGS='''''tag1''''')" +
                "and (gray is not null gray='true')"));

下面是 bySql 的源代码:

public static MessageSelector bySql(String sql) {
 return new MessageSelector(ExpressionType.SQL92, sql);
}

3 总结

本文介绍了 RocketMQ 灰度消息的使用方法,场景比较简单。对于全链路的复杂灰度场景,可以参考使用阿里的微服引擎 MSE。

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 负载均衡
RocketMQ 面试题及答案整理,最新面试题
RocketMQ 面试题及答案整理,最新面试题
504 4
|
8月前
|
消息中间件 存储 监控
|
8月前
|
消息中间件 缓存 NoSQL
RabbitMQ 总结面试
RabbitMQ 总结面试
62 0
|
8月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
258 2
|
5天前
|
消息中间件 存储 Java
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
|
13天前
|
消息中间件 运维 Java
招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
45岁资深架构师尼恩针对一线互联网企业面试题,特别是招商银行的高阶Java后端面试题,进行了系统化梳理。本文重点讲解如何根据应用场景选择合适的消息中间件(如RabbitMQ、RocketMQ和Kafka),并对比三者的性能、功能、可靠性和运维复杂度,帮助求职者在面试中充分展示技术实力,实现“offer直提”。此外,尼恩还提供了《尼恩Java面试宝典PDF》等资源,助力求职者提升架构、设计、开发水平,应对高并发、分布式系统的挑战。更多内容及技术圣经系列PDF,请关注【技术自由圈】获取。
|
2月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
3月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
5月前
|
消息中间件 缓存 负载均衡
复盘女朋友面试4个月的RocketMQ面试题
这篇文章复盘了面试中关于RocketMQ的高频题目,包括架构组成、使用姿势、功能原理及高级特性,并强调了理解这些实现机制对于面试成功的重要性。
复盘女朋友面试4个月的RocketMQ面试题
|
6月前
|
消息中间件 Kafka API
面试题Kafka问题之RabbitMQ的扩展和二次开发如何解决
面试题Kafka问题之RabbitMQ的扩展和二次开发如何解决
52 1

相关产品

  • 云消息队列 MQ