Dubbo下的多版本并行开发测试解决方案(服务路由)

简介: Dubbo下的多版本并行开发测试解决方案(服务路由)

在很久之前的文章多版本并行开发测试解决方案 中挖了个坑

今天来给填上; 今天主要讲解实现方案;


主要思路

给不同版本的dubbo服务打上 标签version上

在dubbo 提供和消费的出入口上 带上 标签version

服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本

是不是很简单,就是打个标签,然后路由的时候找相同服务嘛


简单代码

打标签

写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;

/**
 * @author shirenchuang
 * Registry 的包装类
 * 修改URL 中的Application
 * @date 2019/12/5  8:35 下午
 */
public class DevVersionRegisterWrapper implements Registry {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private Registry registry;
    /**
     * 注入Register
     * @param registry
     */
    public DevVersionRegisterWrapper(Registry registry) {
        this.registry = registry;
    }
    @Override
    public URL getUrl() {
        return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
    }
    @Override
    public boolean isAvailable() {
        return registry.isAvailable();
    }
    @Override
    public void destroy() {
        registry.destroy();
    }
    @Override
    public void register(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void unregister(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public List<URL> lookup(URL url) {
        return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
}

写一个RegistryFactory的包装类

/**
 * @author shirenchuang
 * RegistryFactory 的包装类,在注册的时候 修改一下 Application
 * 如果是 迭代环境则把Appliacation=Application_迭代版本号
 * @date 2019/12/5  8:29 下午
 */
public class DevVersionRegisterFactoryWrapper implements RegistryFactory {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private RegistryFactory registryFactory;
    /**
     * 注入RegisterFactory
     */
    public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }
    @Override
    public Registry getRegistry(URL url) {
        //获取当前环境的迭代版本号
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            logger.info("=====启动的服务是迭代版本服务  devVersion:{}=====",MyThreadLocal.localVersion);
            System.out.println("====启动的服务是迭代版本服务  devVersion:"+MyThreadLocal.localVersion);
            return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
        }
        logger.info("=====启动的服务是稳定版本====");
        System.out.println("=====启动的服务是稳定版本====");
        return registryFactory.getRegistry(url);
    }
    public static URL changeApplication(URL url){
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            String applicationKey = url.getParameter(Constants.APPLICATION_KEY)+MyThreadLocal.spiltString+MyThreadLocal.localVersion;
            URL url2 = url.addParameter(Constants.APPLICATION_KEY,
                    applicationKey);
            logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
            return url2;
        }
        return url;
    }
}

服务路由

Invoker 包装类

/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterInvoker<T> implements Invoker<T> {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private final Directory<T> directory;
    private final Invoker<T> invoker;
    public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }
    @Override
    public URL getUrl() {
        return directory.getUrl();
    }
    @Override
    public boolean isAvailable() {
        return directory.isAvailable();
    }
    @Override
    public void destroy() {
        this.invoker.destroy();
    }
    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 找到迭代版本号
        return doDevVersionInvoke(invocation, null);
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
        Result result ;
        Invoker<T> minvoker;
        List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
        if (devVersionInvokers==null||devVersionInvokers.size()==0) {
            logger.error("没有找到服务啊~~~~ ");
            throw new RpcException("没有找到服务啊~~~~");
        } else {
            minvoker = devVersionInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
        }
        return result;
    }
    private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
        String msg = "devVersion error : " + mt.getMessage();
        if (t != null) {
            msg = msg + ", invoke error is :" + StringUtils.toString(t);
        }
        return msg;
    }
    /**
     * 获取对应迭代版本服务
     * @param invocation
     * @return
     */
    private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        if (invocation instanceof RpcInvocation) {
            try {
                /**其实我们也可以给directory生生一个代理类,来做帅选操作**/
                invokers = directory.list(invocation);
                //经过了dubbo的栓选之后,我们来找自己需要的Invokes
                String devVersion = MyThreadLocal.getDevVersion();
                List<Invoker<T>> newInvokers = new ArrayList<>();
                List<Invoker<T>> stableInvokers = new ArrayList<>();
                for (Invoker invoker : invokers){
                    URL providerUrl ;
                    //获取应用名称
                    Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
                    getProviderUrl.setAccessible(true);
                    providerUrl = (URL)getProviderUrl.invoke(invoker);
                    String application  = providerUrl.getParameter(Constants.APPLICATION_KEY);
                    if(StringUtils.isEmpty(devVersion)){
                        if(application.indexOf(MyThreadLocal.spiltString)==-1){
                            //不是迭代过来或者本身不是迭代的请求    只能访问非迭代版本
                            newInvokers.add(invoker);
                        }
                    }else {
                        //是迭代的请求  就需要找对应的迭代服务
                        if(application.indexOf(MyThreadLocal.spiltString)!=-1){
                            String version = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
                            if(version.equals(devVersion)){
                                newInvokers.add(invoker);
                            }
                        }
                    }
                    //找到稳定环境
                    if(application.indexOf(MyThreadLocal.spiltString)==-1){
                        stableInvokers.add(invoker);
                    }
                }
                if(newInvokers==null||newInvokers.size()==0){
                    String serviceName = directory.getInterface().getName();
                    if(StringUtils.isEmpty(devVersion)){
                        String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>"+serviceName+" 的稳定版本!!";
                        logger.error(error);
                        throw new RuntimeException(error);
                    }else {
                        // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境  )
                        if(stableInvokers!=null&&stableInvokers.size()>0){
                            StringBuffer sb = new StringBuffer();
                            sb.append("=======当前cap请求的版本为:").append(devVersion)
                                    .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
                                    .append("没有找到与之对应的迭代版本;将会调用稳定版本");
                            logger.info(sb.toString());
                            return stableInvokers;
                        }else {
                            //可能有其他的迭代服务,但是不调用
                            logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
                            throw new RuntimeException("当前请求迭代版本:"+devVersion+",但是不存在迭代服务,也没有找到稳定服务;"+serviceName);
                        }
                    }
                }else {
                    return newInvokers;
                }
            } catch (RpcException e) {
                logger.error("获取 迭代版本 的服务时 发生错误~~:"+ directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        , e);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        return invokers;
    }
    @Override
    public String toString() {
        return "invoker :" + this.invoker + ",directory: " + this.directory;
    }
    public static void main(String[] args) {
        String application = "application"+MyThreadLocal.spiltString+"1.0.1";
        boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
        application = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
        System.out.print(application);
    }
}
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterWrapper implements Cluster {
    private Cluster cluster;
    public DevVersionClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //如果自己是迭代环境,则使用包装
        return new DevVersionClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}

拦截器 带入 标签Version

消费者拦截器

/**
 * @Description 消费别人服务的时候会走到这里
 *              要把 迭代版本号 放到参数里面传到 服务提供者
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = TraceUtil.getTraceId();
        RpcContext.getContext().setAttachment("myTraceId",traceId);
        String toDevVersion = MyThreadLocal.getDevVersion();
        RpcContext.getContext().setAttachment("devVersion",toDevVersion);
        doLog(invoker,invocation,traceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
                .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
        .append(" ;调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

提供者拦截器

/**
 * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
 *              然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
        TraceUtil.traceLocal.set(fromTraceId);
        String myTraceId = TraceUtil.getTraceId();
        String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
        //放入到本地线程存放
        MyThreadLocal.devVersion.set(fromDevVersion);
        doLog(invoker,invocation,myTraceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append(" ;将被调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

标签的存取

在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;

public class MyThreadLocal {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
    //public static ThreadLocal<String> devVersion = new ThreadLocal();
    /**用户Application评价的固定字符;**/
    public static String spiltString = "_dev_";
    public static String localVersion  ;
    static {
        localVersion = System.getProperty("localVersion");
        logger.info("====devVersion:{}   ========",devVersion);
        System.out.println("s====devVersion:   ========"+devVersion);
    }
    public static String getFromVersion(){
        return devVersion.get();
    }
    /**
     * 如果本地变量没有  则可能是第一个发起方;
     * 则去当前服务的版本号,然后一直传递下去;
     * @return
     */
    public static String getDevVersion(){
        String fromVersion = getFromVersion();
        if(!StringUtils.isEmpty(fromVersion)){
            return fromVersion;
        }
        return localVersion;
    }
}

dubbo spi的配置

将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效


重点问题说明

上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖

参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用

ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;


目录
打赏
0
0
0
0
1004
分享
相关文章
OS-Copilot-ubuntu镜像版本的具体测试使用(安装方式有单独注明)
作为一名个人开发者,我主要负责云资源的运维和管理。在使用OS Copilot的过程中,我遇到了一些配置问题,特别是在ECS实例中设置AccessKey时,但最终成功解决了。通过使用OS Copilot的-t/-f/管道功能,我大大提升了效率,减少了命令编写的工作量,特别是在搭建Java运行环境时效果显著。此外,| 功能帮助我快速理解文档,整体体验非常流畅,推荐给其他开发者使用。
52 6
Apache Dubbo 正式发布 HTTP/3 版本 RPC 协议,弱网效率提升 6 倍
在 Apache Dubbo 3.3.0 版本之后,官方推出了全新升级的 Triple X 协议,全面支持 HTTP/1、HTTP/2 和 HTTP/3 协议。本文将围绕 Triple 协议对 HTTP/3 的支持进行详细阐述,包括其设计目标、实际应用案例、性能测试结果以及源码架构分析等内容。
303 10
如何在测试环境中保持操作系统、浏览器版本和服务器配置的稳定性和一致性?
如何在测试环境中保持操作系统、浏览器版本和服务器配置的稳定性和一致性?
深入探讨了“dubbo+nacos+springboot3的native打包成功后运行出现异常”的原因及解决方案
本文深入探讨了“dubbo+nacos+springboot3的native打包成功后运行出现异常”的原因及解决方案。通过检查GraalVM版本兼容性、配置反射列表、使用代理类、检查配置文件、禁用不支持的功能、查看日志文件、使用GraalVM诊断工具和调整GraalVM配置等步骤,帮助开发者快速定位并解决问题,确保服务的正常运行。
102 1
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
113 2
前端大模型应用笔记(二):最新llama3.2小参数版本1B的古董机测试 - 支持128K上下文,表现优异,和移动端更配
llama3.1支持128K上下文,6万字+输入,适用于多种场景。模型能力超出预期,但处理中文时需加中英翻译。测试显示,其英文支持较好,中文则需改进。llama3.2 1B参数量小,适合移动端和资源受限环境,可在阿里云2vCPU和4G ECS上运行。
257 1
目标检测实战(五): 使用YOLOv5-7.0版本对图像进行目标检测完整版(从自定义数据集到测试验证的完整流程)
本文详细介绍了使用YOLOv5-7.0版本进行目标检测的完整流程,包括算法介绍、环境搭建、数据集准备、模型训练、验证、测试以及评价指标。YOLOv5以其高精度、快速度和模型小尺寸在计算机视觉领域受到广泛应用。
1949 0
目标检测实战(五): 使用YOLOv5-7.0版本对图像进行目标检测完整版(从自定义数据集到测试验证的完整流程)
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
392 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
5月前
|
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
这篇文章是关于如何下载和部署Dubbo管理控制台(dubbo-admin)的教程,并分析了2.6.1版本及以后版本的变化。
230 0
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化

热门文章

最新文章

  • 1
    小鱼深度评测 | 通义灵码2.0,不仅可跨语言编码,自动生成单元测试,更炸裂的是集成DeepSeek模型且免费使用,太炸裂了。
    141
  • 2
    Star 4w+,Apache Dubbo 3.3 全新发布,Triple X 领衔,开启微服务通信新时代
    10
  • 3
    3天功能开发→3小时:通义灵码2.0+DEEPSEEK实测报告,单元测试生成准确率92%的秘密
    34
  • 4
    Potpie.ai:比Copilot更狠!这个AI直接接管项目代码,自动Debug+测试+开发全搞定
    14
  • 5
    【01】噩梦终结flutter配安卓android鸿蒙harmonyOS 以及next调试环境配鸿蒙和ios真机调试环境-flutter项目安卓环境配置-gradle-agp-ndkVersion模拟器运行真机测试环境-本地环境搭建-如何快速搭建android本地运行环境-优雅草卓伊凡-很多人在这步就被难倒了
    17
  • 6
    基于FPGA的图像双线性插值算法verilog实现,包括tb测试文件和MATLAB辅助验证
    6
  • 7
    大前端之前端开发接口测试工具postman的使用方法-简单get接口请求测试的使用方法-简单教学一看就会-以实际例子来说明-优雅草卓伊凡
    5
  • 8
    「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
    13
  • 9
    用户说 | 通义灵码2.0,跨语言编码+自动生成单元测试+集成DeepSeek模型且免费使用
    16
  • 10
    阿里云零门槛、轻松部署您的专属 DeepSeek模型体验测试
    23
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等