Dubbo下的多版本并行开发测试解决方案(服务路由)

简介: Dubbo下的多版本并行开发测试解决方案(服务路由)

在很久之前的文章多版本并行开发测试解决方案 中挖了个坑

今天来给填上; 今天主要讲解实现方案;


主要思路

给不同版本的dubbo服务打上 标签version上

在dubbo 提供和消费的出入口上 带上 标签version

服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本

是不是很简单,就是打个标签,然后路由的时候找相同服务嘛


简单代码

打标签

写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;

/**
 * @author shirenchuang
 * Registry 的包装类
 * 修改URL 中的Application
 * @date 2019/12/5  8:35 下午
 */
public class DevVersionRegisterWrapper implements Registry {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private Registry registry;
    /**
     * 注入Register
     * @param registry
     */
    public DevVersionRegisterWrapper(Registry registry) {
        this.registry = registry;
    }
    @Override
    public URL getUrl() {
        return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
    }
    @Override
    public boolean isAvailable() {
        return registry.isAvailable();
    }
    @Override
    public void destroy() {
        registry.destroy();
    }
    @Override
    public void register(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void unregister(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public List<URL> lookup(URL url) {
        return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
}

写一个RegistryFactory的包装类

/**
 * @author shirenchuang
 * RegistryFactory 的包装类,在注册的时候 修改一下 Application
 * 如果是 迭代环境则把Appliacation=Application_迭代版本号
 * @date 2019/12/5  8:29 下午
 */
public class DevVersionRegisterFactoryWrapper implements RegistryFactory {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private RegistryFactory registryFactory;
    /**
     * 注入RegisterFactory
     */
    public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }
    @Override
    public Registry getRegistry(URL url) {
        //获取当前环境的迭代版本号
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            logger.info("=====启动的服务是迭代版本服务  devVersion:{}=====",MyThreadLocal.localVersion);
            System.out.println("====启动的服务是迭代版本服务  devVersion:"+MyThreadLocal.localVersion);
            return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
        }
        logger.info("=====启动的服务是稳定版本====");
        System.out.println("=====启动的服务是稳定版本====");
        return registryFactory.getRegistry(url);
    }
    public static URL changeApplication(URL url){
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            String applicationKey = url.getParameter(Constants.APPLICATION_KEY)+MyThreadLocal.spiltString+MyThreadLocal.localVersion;
            URL url2 = url.addParameter(Constants.APPLICATION_KEY,
                    applicationKey);
            logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
            return url2;
        }
        return url;
    }
}

服务路由

Invoker 包装类

/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterInvoker<T> implements Invoker<T> {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private final Directory<T> directory;
    private final Invoker<T> invoker;
    public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }
    @Override
    public URL getUrl() {
        return directory.getUrl();
    }
    @Override
    public boolean isAvailable() {
        return directory.isAvailable();
    }
    @Override
    public void destroy() {
        this.invoker.destroy();
    }
    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 找到迭代版本号
        return doDevVersionInvoke(invocation, null);
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
        Result result ;
        Invoker<T> minvoker;
        List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
        if (devVersionInvokers==null||devVersionInvokers.size()==0) {
            logger.error("没有找到服务啊~~~~ ");
            throw new RpcException("没有找到服务啊~~~~");
        } else {
            minvoker = devVersionInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
        }
        return result;
    }
    private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
        String msg = "devVersion error : " + mt.getMessage();
        if (t != null) {
            msg = msg + ", invoke error is :" + StringUtils.toString(t);
        }
        return msg;
    }
    /**
     * 获取对应迭代版本服务
     * @param invocation
     * @return
     */
    private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        if (invocation instanceof RpcInvocation) {
            try {
                /**其实我们也可以给directory生生一个代理类,来做帅选操作**/
                invokers = directory.list(invocation);
                //经过了dubbo的栓选之后,我们来找自己需要的Invokes
                String devVersion = MyThreadLocal.getDevVersion();
                List<Invoker<T>> newInvokers = new ArrayList<>();
                List<Invoker<T>> stableInvokers = new ArrayList<>();
                for (Invoker invoker : invokers){
                    URL providerUrl ;
                    //获取应用名称
                    Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
                    getProviderUrl.setAccessible(true);
                    providerUrl = (URL)getProviderUrl.invoke(invoker);
                    String application  = providerUrl.getParameter(Constants.APPLICATION_KEY);
                    if(StringUtils.isEmpty(devVersion)){
                        if(application.indexOf(MyThreadLocal.spiltString)==-1){
                            //不是迭代过来或者本身不是迭代的请求    只能访问非迭代版本
                            newInvokers.add(invoker);
                        }
                    }else {
                        //是迭代的请求  就需要找对应的迭代服务
                        if(application.indexOf(MyThreadLocal.spiltString)!=-1){
                            String version = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
                            if(version.equals(devVersion)){
                                newInvokers.add(invoker);
                            }
                        }
                    }
                    //找到稳定环境
                    if(application.indexOf(MyThreadLocal.spiltString)==-1){
                        stableInvokers.add(invoker);
                    }
                }
                if(newInvokers==null||newInvokers.size()==0){
                    String serviceName = directory.getInterface().getName();
                    if(StringUtils.isEmpty(devVersion)){
                        String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>"+serviceName+" 的稳定版本!!";
                        logger.error(error);
                        throw new RuntimeException(error);
                    }else {
                        // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境  )
                        if(stableInvokers!=null&&stableInvokers.size()>0){
                            StringBuffer sb = new StringBuffer();
                            sb.append("=======当前cap请求的版本为:").append(devVersion)
                                    .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
                                    .append("没有找到与之对应的迭代版本;将会调用稳定版本");
                            logger.info(sb.toString());
                            return stableInvokers;
                        }else {
                            //可能有其他的迭代服务,但是不调用
                            logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
                            throw new RuntimeException("当前请求迭代版本:"+devVersion+",但是不存在迭代服务,也没有找到稳定服务;"+serviceName);
                        }
                    }
                }else {
                    return newInvokers;
                }
            } catch (RpcException e) {
                logger.error("获取 迭代版本 的服务时 发生错误~~:"+ directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        , e);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        return invokers;
    }
    @Override
    public String toString() {
        return "invoker :" + this.invoker + ",directory: " + this.directory;
    }
    public static void main(String[] args) {
        String application = "application"+MyThreadLocal.spiltString+"1.0.1";
        boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
        application = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
        System.out.print(application);
    }
}
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterWrapper implements Cluster {
    private Cluster cluster;
    public DevVersionClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //如果自己是迭代环境,则使用包装
        return new DevVersionClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}

拦截器 带入 标签Version

消费者拦截器

/**
 * @Description 消费别人服务的时候会走到这里
 *              要把 迭代版本号 放到参数里面传到 服务提供者
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = TraceUtil.getTraceId();
        RpcContext.getContext().setAttachment("myTraceId",traceId);
        String toDevVersion = MyThreadLocal.getDevVersion();
        RpcContext.getContext().setAttachment("devVersion",toDevVersion);
        doLog(invoker,invocation,traceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
                .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
        .append(" ;调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

提供者拦截器

/**
 * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
 *              然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
        TraceUtil.traceLocal.set(fromTraceId);
        String myTraceId = TraceUtil.getTraceId();
        String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
        //放入到本地线程存放
        MyThreadLocal.devVersion.set(fromDevVersion);
        doLog(invoker,invocation,myTraceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append(" ;将被调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

标签的存取

在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;

public class MyThreadLocal {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
    //public static ThreadLocal<String> devVersion = new ThreadLocal();
    /**用户Application评价的固定字符;**/
    public static String spiltString = "_dev_";
    public static String localVersion  ;
    static {
        localVersion = System.getProperty("localVersion");
        logger.info("====devVersion:{}   ========",devVersion);
        System.out.println("s====devVersion:   ========"+devVersion);
    }
    public static String getFromVersion(){
        return devVersion.get();
    }
    /**
     * 如果本地变量没有  则可能是第一个发起方;
     * 则去当前服务的版本号,然后一直传递下去;
     * @return
     */
    public static String getDevVersion(){
        String fromVersion = getFromVersion();
        if(!StringUtils.isEmpty(fromVersion)){
            return fromVersion;
        }
        return localVersion;
    }
}

dubbo spi的配置

将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效


重点问题说明

上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖

参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用

ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;


目录
相关文章
|
机器学习/深度学习 人工智能 并行计算
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
|
9月前
|
存储 测试技术 API
数据驱动开发软件测试脚本
今天刚提交了我的新作《带着ChatGPT玩转软件开发》给出版社,在写作期间跟着ChatGPT学到许多新知识。下面分享数据驱动开发软件测试脚本。
377 0
|
传感器 人工智能 JavaScript
鸿蒙开发:DevEcoTesting中的稳定性测试
DevEcoTesting主要的目的也是用于软件的测试,可以让开发者无需复杂的配置,即可一键执行测试任务,同时提供了测试报告和分析,无论是对于开发者还是测试同学来说,都是一个非常方便的工具。
353 3
鸿蒙开发:DevEcoTesting中的稳定性测试
|
11月前
|
敏捷开发 运维 数据可视化
DevOps看板工具中的协作功能:如何打破开发、测试与运维之间的沟通壁垒
在DevOps实践中,看板工具通过可视化任务管理和自动化流程,提升开发与运维团队的协作效率。它支持敏捷开发、持续交付,助力团队高效应对需求变化,实现跨职能协作与流程优化。
|
11月前
|
运维 jenkins 测试技术
"还在苦等开发部署环境?3步教你用Jenkins拿回测试主动权"
测试工程师最头疼的问题是什么?依赖开发部署环境! 开发延期→测试时间被压缩→紧急上线后BUG频出→测试背锅。传统流程中,测试被动等待部署,效率低下。而Jenkins自动化部署让测试人员自主搭建环境,实现: ✅ 随时触发测试,不再苦等开发 ✅ 部署效率提升10倍,抢回测试时间 ✅ 改善团队协作,减少互相甩锅 学习Jenkins部署能力,成为高效测试工程师,告别被动等待!
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
431 2
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
316 0
|
Dubbo 应用服务中间件 Apache
Star 4w+,Apache Dubbo 3.3 全新发布,Triple X 领衔,开启微服务通信新时代
在 Apache Dubbo 突破 4w Star 之际,Apache Dubbo 团队正式宣布,Dubbo 3.3 正式发布!作为全球领先的开源微服务框架,Dubbo 一直致力于为开发者提供高性能、可扩展且灵活的分布式服务解决方案。此次发布的 Dubbo 3.3,通过 Triple X 的全新升级,突破了以往局限,实现了对南北向与东西向流量的全面支持,并提升了对云原生架构的友好性。
550 115
|
负载均衡 Dubbo 应用服务中间件
框架巨擘:Dubbo如何一统异构微服务江湖,成为开发者的超级武器!
【8月更文挑战第8天】在软件开发中,微服务架构因灵活性和可扩展性备受欢迎。面对异构微服务的挑战,Apache Dubbo作为高性能Java RPC框架脱颖而出。它具备服务注册与发现、负载均衡及容错机制等核心特性,支持多种通信协议和序列化方式,能有效连接不同技术栈的微服务。Dubbo的插件化设计保证了面向未来的扩展性,使其成为构建稳定高效分布式系统的理想选择。
412 80

热门文章

最新文章