【kafka】kafka的服务复用与隔离设计方案

简介: 【kafka】kafka的服务复用与隔离设计方案

之前在写 多版本并行开发测试解决方案 的时候 占了个坑,今天来补上;


这篇文章主要讲一下 kafka的服务复用与隔离;

主要解决的问题是,在多个迭代环境下; 让消息的提供者和消费者都能正确的发出和消费;

这个比dubbo的服务路由与隔离更复杂一点


1.问题描述

概念说明:

稳定版本: ABC 属于全局共用的一套稳定服务;

迭代版本: A1 C1 C2 属于他们对应系统的迭代版本, 比如针对A系统进行需求改动,部署一套新的迭代服务A1;


要求: mq提供者服务提供出去的消息尽量让 相同版本的消费者进行消费;


1.1. 入口是稳定服务


image.png

image.png

上图, 假设入口是 稳定服务A ,发出消息; 那么消息链路中互相消费的就是 ABC ;跟迭代版本没啥事

1.2.入口是迭代服务

image.png

上图,假设入口是 迭代服务A1 发出消息; 则整个链路中尽量让相同迭代版本的服务去消费;


A1发消息

A1发了消息; 找B系统发现只有稳定的B,没有迭代版本,那么就让B消费;

A1发了消息;C也是有订阅的,然后发现C系统有迭代C1,跟A1版本相同,则让C1消费; C和C2都不消费;


B发消息

B消费了A1过来的消息后也发出了消息; A系统有消费,那么这个时候B发出的消息应该让A1消费而不是A;

同理, 也应该是C1消费而不是C或者C2


C1发消息

C1发消息 让A1消费;

C1发消息 让B消费;


1.3.dubbo服务传入迭代版本

image.png

上图D1调用了B的dubbo接口并且传递了版本号; B此时发出消息也是属于迭代消息; 跟2一样;


2.解决方案

我们在之前的文章中有讲解如何 在dubbo中实现这样的功能; 通过spi给dubbo重新根据version来进行路由;


但是在kafka中,并没有这消费者路由这么一回事,那么也就无法控制哪个服务去消费这条消息;


那么下面,我给出自己的一些解决方案,如果觉得有问题,欢迎批评指正;


设计方案:

image.png

方案关键步骤:


消息发送的时候,在Header上加上Version信息

发送消息 将消息发2条出去,消息体相同,但是Topic不同; 迭代消息的Topic加上前缀 VERSION:对应的版本_

迭代服务启动的时候用javaagent修改所有监听的Topic; 加上前缀 VERSION:对应的版本_

迭代服务消费对应的迭代消息

稳定服务 是否需要消费消息 需要判断

当前消息Header不携带Version 则直接消费

当前消息Header携带Version,再判断是否有对应的迭代服务存在;有则不消费,无则直接消费

消费消息时,需要把Version保存到 ThreadLocal中; 以便进行链路流转

使用ThreadLocal的时候,在线程池的情况下,值传递会有问题. 解决方案 用javaagent 方式使用TransmittableThreadLocal

全程代码0侵入;kafka的两个拦截器的和配置 都通过Javaagent来就行增强

如何判断迭代服务是否存在


上面的设计方案中,在kafka consumner 拦截器 判断是否需要消费的时候 写了两种方式


1. 方式一:

获取当前消息的消费组currentGroupId = KafkaUtils.getConsumerGroupId()

获取所有消费组adminClient.listConsumerGroups()

然后再所有消费组中查找有没有 VERSION:1_currentGroupId 的消费组;

如果有,则说明该消息会被迭代服务进行消费. 稳定环境就不用消费了;

当前还有一部不可少,就是如何让迭代服务的 所有消费组名都加上前缀

当然还是通过javaagent 去增强咯, 找到合适修改点,修改掉消费组名;合适的修改点自然是配置消费消费组名的地方; 有统一的消费组名; 每个Listener也可以配置单独的消费组名;找到Listener注解就行增强;

缺点: 这种方式有一个缺点就是 如果迭代服务刚好宕机了那么 消息就会问稳定服务消费了;


2.方式二(推荐)

读取一个外部配置,这个配置维护了哪个服务是有迭代服务的;

这样就很方便了;

缺点: 就是需要维护这么一个配置

优点: 规避了方式一的缺点; 也不需要用javaagent去修改消费组名称;


3.需要注意的问题

我们在传递version的时候,入口一般都是http接口;

但是如果入口不是http,是系统内部呢,那这样外面的版本信息就传不进来了;


说一个在出行行业 的情景

A: 是叫单服务

B: 是派单服务

C: 是订单/司机服务


在一个需求中, A C都有改动; B没有改动; 就有迭代服务A1 C1;

假设他们使用MQ交流的;我们期望的是下面流转

A1 ---->B----->C1


但是A1告诉了B有订单进来了, B会把A1给的信息存到redis中; B有一个线程一直在不停从redis中捞取数据进行和司机的匹配;匹配成功了之后 再发消息出去 匹配成功了;

B的这条链路就断了; B存redis之后,就没有下一步操作了, ThreadLocal中的version也就没有了; B的匹配线程获取到的是 稳定版本;自然匹配成功发出去的消息就是 稳定消息;那么接收到的不是C1 而是 C了;


如何解决这类型的问题;


这种情况就应该将B也弄一个迭代版本B1;那么流转路径就是

A1-B1-C1 ;这样就是正确的了;


还要注意: DB隔离;


相关文章
|
2月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
62 1
|
5月前
|
消息中间件 Kafka Shell
Linux【脚本 02】shell脚本离线安装配置Zookeeper及Kafka并添加service服务和开机启动(脚本分析)
Linux【脚本 02】shell脚本离线安装配置Zookeeper及Kafka并添加service服务和开机启动(脚本分析)
49 0
|
7月前
|
消息中间件 分布式计算 网络协议
服务搭建篇(六) 搭建基于Kafka + Zookeeper的集群
用来解决分布式集群中应用系统的一致性问题。Zookeeper 的设计目标是将那些复杂且容 易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的 接口提供给用户使用。
109 0
|
消息中间件 Kafka
面试官问:如何判断一个服务是正常的(例如kafka)
面试官问:如何判断一个服务是正常的(例如kafka)
|
消息中间件 运维 Dubbo
【kafka】kafka的服务复用与隔离设计方案
【kafka】kafka的服务复用与隔离设计方案
【kafka】kafka的服务复用与隔离设计方案
|
消息中间件 运维 算法
【kafka思考】最小成本的扩缩容副本设计方案
在这篇文章开始前,你需要先了解 【kafka源码】kafka分区副本的分配规则 从【kafka源码】kafka分区副本的分配规则 中我们已经知道了,如何分区副本是如何进行分配的 那么当我们想要批量进行副本扩缩的时候, 如果按照之前 --generate的重新计算分配方式来做的话, 那么这个数据迁移量是非常大的; 很有可能大部分的副本都有变动(牵一发而动全身) 那么我们有没有什么方式能够尽量减少这种变动吗, 根据这个目标,我们本篇文章就好好思考一下设计方案
【kafka思考】最小成本的扩缩容副本设计方案
|
消息中间件 Dubbo NoSQL
【kafka】kafka的服务复用与隔离设计方案
这篇文章主要讲一下 kafka的服务复用与隔离; 主要解决的问题是,在多个迭代环境下; 让消息的提供者和消费者都能正确的发出和消费; 这个比dubbo的服务路由与隔离更复杂一点
【kafka】kafka的服务复用与隔离设计方案
|
3月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
482 2
2024年了,如何更好的搭建Kafka集群?
|
4月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
49 0
|
7月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
266 0

热门文章

最新文章