Dubbo下的多版本并行开发测试解决方案(服务路由)

简介: Dubbo下的多版本并行开发测试解决方案(服务路由)

在很久之前的文章多版本并行开发测试解决方案 中挖了个坑

今天来给填上; 今天主要讲解实现方案;


主要思路

给不同版本的dubbo服务打上 标签version上

在dubbo 提供和消费的出入口上 带上 标签version

服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本

是不是很简单,就是打个标签,然后路由的时候找相同服务嘛


简单代码

打标签

写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;

/**
 * @author shirenchuang
 * Registry 的包装类
 * 修改URL 中的Application
 * @date 2019/12/5  8:35 下午
 */
public class DevVersionRegisterWrapper implements Registry {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private Registry registry;
    /**
     * 注入Register
     * @param registry
     */
    public DevVersionRegisterWrapper(Registry registry) {
        this.registry = registry;
    }
    @Override
    public URL getUrl() {
        return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
    }
    @Override
    public boolean isAvailable() {
        return registry.isAvailable();
    }
    @Override
    public void destroy() {
        registry.destroy();
    }
    @Override
    public void register(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void unregister(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }
    @Override
    public List<URL> lookup(URL url) {
        return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
}

写一个RegistryFactory的包装类

/**
 * @author shirenchuang
 * RegistryFactory 的包装类,在注册的时候 修改一下 Application
 * 如果是 迭代环境则把Appliacation=Application_迭代版本号
 * @date 2019/12/5  8:29 下午
 */
public class DevVersionRegisterFactoryWrapper implements RegistryFactory {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private RegistryFactory registryFactory;
    /**
     * 注入RegisterFactory
     */
    public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }
    @Override
    public Registry getRegistry(URL url) {
        //获取当前环境的迭代版本号
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            logger.info("=====启动的服务是迭代版本服务  devVersion:{}=====",MyThreadLocal.localVersion);
            System.out.println("====启动的服务是迭代版本服务  devVersion:"+MyThreadLocal.localVersion);
            return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
        }
        logger.info("=====启动的服务是稳定版本====");
        System.out.println("=====启动的服务是稳定版本====");
        return registryFactory.getRegistry(url);
    }
    public static URL changeApplication(URL url){
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            String applicationKey = url.getParameter(Constants.APPLICATION_KEY)+MyThreadLocal.spiltString+MyThreadLocal.localVersion;
            URL url2 = url.addParameter(Constants.APPLICATION_KEY,
                    applicationKey);
            logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
            return url2;
        }
        return url;
    }
}

服务路由

Invoker 包装类

/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterInvoker<T> implements Invoker<T> {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    private final Directory<T> directory;
    private final Invoker<T> invoker;
    public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }
    @Override
    public URL getUrl() {
        return directory.getUrl();
    }
    @Override
    public boolean isAvailable() {
        return directory.isAvailable();
    }
    @Override
    public void destroy() {
        this.invoker.destroy();
    }
    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 找到迭代版本号
        return doDevVersionInvoke(invocation, null);
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
        Result result ;
        Invoker<T> minvoker;
        List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
        if (devVersionInvokers==null||devVersionInvokers.size()==0) {
            logger.error("没有找到服务啊~~~~ ");
            throw new RpcException("没有找到服务啊~~~~");
        } else {
            minvoker = devVersionInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
        }
        return result;
    }
    private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
        String msg = "devVersion error : " + mt.getMessage();
        if (t != null) {
            msg = msg + ", invoke error is :" + StringUtils.toString(t);
        }
        return msg;
    }
    /**
     * 获取对应迭代版本服务
     * @param invocation
     * @return
     */
    private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        if (invocation instanceof RpcInvocation) {
            try {
                /**其实我们也可以给directory生生一个代理类,来做帅选操作**/
                invokers = directory.list(invocation);
                //经过了dubbo的栓选之后,我们来找自己需要的Invokes
                String devVersion = MyThreadLocal.getDevVersion();
                List<Invoker<T>> newInvokers = new ArrayList<>();
                List<Invoker<T>> stableInvokers = new ArrayList<>();
                for (Invoker invoker : invokers){
                    URL providerUrl ;
                    //获取应用名称
                    Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
                    getProviderUrl.setAccessible(true);
                    providerUrl = (URL)getProviderUrl.invoke(invoker);
                    String application  = providerUrl.getParameter(Constants.APPLICATION_KEY);
                    if(StringUtils.isEmpty(devVersion)){
                        if(application.indexOf(MyThreadLocal.spiltString)==-1){
                            //不是迭代过来或者本身不是迭代的请求    只能访问非迭代版本
                            newInvokers.add(invoker);
                        }
                    }else {
                        //是迭代的请求  就需要找对应的迭代服务
                        if(application.indexOf(MyThreadLocal.spiltString)!=-1){
                            String version = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
                            if(version.equals(devVersion)){
                                newInvokers.add(invoker);
                            }
                        }
                    }
                    //找到稳定环境
                    if(application.indexOf(MyThreadLocal.spiltString)==-1){
                        stableInvokers.add(invoker);
                    }
                }
                if(newInvokers==null||newInvokers.size()==0){
                    String serviceName = directory.getInterface().getName();
                    if(StringUtils.isEmpty(devVersion)){
                        String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>"+serviceName+" 的稳定版本!!";
                        logger.error(error);
                        throw new RuntimeException(error);
                    }else {
                        // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境  )
                        if(stableInvokers!=null&&stableInvokers.size()>0){
                            StringBuffer sb = new StringBuffer();
                            sb.append("=======当前cap请求的版本为:").append(devVersion)
                                    .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
                                    .append("没有找到与之对应的迭代版本;将会调用稳定版本");
                            logger.info(sb.toString());
                            return stableInvokers;
                        }else {
                            //可能有其他的迭代服务,但是不调用
                            logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
                            throw new RuntimeException("当前请求迭代版本:"+devVersion+",但是不存在迭代服务,也没有找到稳定服务;"+serviceName);
                        }
                    }
                }else {
                    return newInvokers;
                }
            } catch (RpcException e) {
                logger.error("获取 迭代版本 的服务时 发生错误~~:"+ directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        , e);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        return invokers;
    }
    @Override
    public String toString() {
        return "invoker :" + this.invoker + ",directory: " + this.directory;
    }
    public static void main(String[] args) {
        String application = "application"+MyThreadLocal.spiltString+"1.0.1";
        boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
        application = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
        System.out.print(application);
    }
}
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterWrapper implements Cluster {
    private Cluster cluster;
    public DevVersionClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //如果自己是迭代环境,则使用包装
        return new DevVersionClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}

拦截器 带入 标签Version

消费者拦截器

/**
 * @Description 消费别人服务的时候会走到这里
 *              要把 迭代版本号 放到参数里面传到 服务提供者
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = TraceUtil.getTraceId();
        RpcContext.getContext().setAttachment("myTraceId",traceId);
        String toDevVersion = MyThreadLocal.getDevVersion();
        RpcContext.getContext().setAttachment("devVersion",toDevVersion);
        doLog(invoker,invocation,traceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
                .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
        .append(" ;调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

提供者拦截器

/**
 * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
 *              然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
        TraceUtil.traceLocal.set(fromTraceId);
        String myTraceId = TraceUtil.getTraceId();
        String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
        //放入到本地线程存放
        MyThreadLocal.devVersion.set(fromDevVersion);
        doLog(invoker,invocation,myTraceId);
        return invoker.invoke(invocation);
    }
    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append(" ;将被调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

标签的存取

在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;

public class MyThreadLocal {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");
    public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
    //public static ThreadLocal<String> devVersion = new ThreadLocal();
    /**用户Application评价的固定字符;**/
    public static String spiltString = "_dev_";
    public static String localVersion  ;
    static {
        localVersion = System.getProperty("localVersion");
        logger.info("====devVersion:{}   ========",devVersion);
        System.out.println("s====devVersion:   ========"+devVersion);
    }
    public static String getFromVersion(){
        return devVersion.get();
    }
    /**
     * 如果本地变量没有  则可能是第一个发起方;
     * 则去当前服务的版本号,然后一直传递下去;
     * @return
     */
    public static String getDevVersion(){
        String fromVersion = getFromVersion();
        if(!StringUtils.isEmpty(fromVersion)){
            return fromVersion;
        }
        return localVersion;
    }
}

dubbo spi的配置

将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效


重点问题说明

上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖

参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用

ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;


相关文章
|
2月前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
31 0
|
2天前
|
缓存 监控 前端开发
【Flutter前端技术开发专栏】Flutter应用的性能调优与测试
【4月更文挑战第30天】本文探讨了Flutter应用的性能调优策略和测试方法。性能调优对提升用户体验、降低能耗和增强稳定性至关重要。优化布局(避免复杂嵌套,使用`const`构造函数)、管理内存、优化动画、实现懒加载和按需加载,以及利用Flutter的性能工具(如DevTools)都是有效的调优手段。性能测试包括基准测试、性能分析、压力测试和电池效率测试。文中还以ListView为例,展示了如何实践这些优化技巧。持续的性能调优是提升Flutter应用质量的关键。
【Flutter前端技术开发专栏】Flutter应用的性能调优与测试
|
2天前
|
前端开发 测试技术 持续交付
【Flutter 前端技术开发专栏】Flutter 中的 UI 测试与自动化测试
【4月更文挑战第30天】本文探讨了 Flutter 应用中UI测试和自动化测试的重要性,包括保障质量、提高效率和增强开发信心。Flutter提供`flutter_test`库进行Widget测试,以及`flutter_driver`进行集成测试。UI测试涵盖界面布局、交互和状态变化的验证,最佳实践建议尽早引入测试、保持用例简洁,并结合手动测试。未来,随着Flutter技术发展,UI测试和自动化测试将更加完善,助力开发高质量应用。
【Flutter 前端技术开发专栏】Flutter 中的 UI 测试与自动化测试
|
2天前
|
测试技术 持续交付 Swift
【Swift开发专栏】Swift中的测试驱动开发(TDD)
【4月更文挑战第30天】Test-Driven Development (TDD) 是一种软件开发方法,强调先编写测试用例再写代码。通过测试驱动代码、简明设计、重构和持续集成的循环过程,TDD助力构建高质量Swift软件。在Swift中,使用XCTest框架进行TDD实践,包括编写测试用例、实现功能、运行测试和重构。TDD的优势在于提升代码质量、减少调试时间,且与持续集成相结合。然而,学习曲线和确保测试覆盖率是挑战。TDD不仅是技术实践,也是思维方式,随着Swift的发展,其应用将更广泛。
|
3天前
|
SQL DataWorks Java
DataWorks操作报错合集之在阿里云 DataWorks 中,代码在开发测试阶段能够成功运行,但在提交后失败并报错“不支持https”如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
16 1
DataWorks操作报错合集之在阿里云 DataWorks 中,代码在开发测试阶段能够成功运行,但在提交后失败并报错“不支持https”如何解决
|
4天前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo: 微服务通信的高效解决方案
【4月更文挑战第28天】在微服务架构的发展中,服务间的高效通信至关重要。Spring Cloud Dubbo 提供了一种基于 RPC 的通信方式,使得服务间的调用就像本地方法调用一样简单。本篇博客将探讨 Spring Cloud Dubbo 的核心概念,并通过具体实例展示其在项目中的实战应用。
13 2
|
5天前
|
测试技术 开发者
【专栏】测试驱动开发(TDD)与行为驱动开发(BDD)的比较与选择
【4月更文挑战第27天】本文探讨了测试驱动开发(TDD)和行为驱动开发(BDD)的核心概念与实践。TDD强调先写测试用例,通过测试推动设计,确保代码质量与可维护性。BDD侧重软件行为和业务价值,提倡使用通用语言描述行为,减少沟通障碍。选择TDD或BDD取决于项目复杂性、团队技能和业务需求。理解两者差异有助于团队做出合适的选择,发挥测试的最大价值。
|
7天前
|
JavaScript API
【vue】分环境构建(开发/测试/生产)配置
【vue】分环境构建(开发/测试/生产)配置
12 1
|
2月前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
59 0
|
2月前
|
Cloud Native Dubbo 应用服务中间件
【Dubbo3高级特性】「微服务云原生架构」带你从零基础认识搭建公司内部服务用户中心体系(实战指南-序章)
【Dubbo3高级特性】「微服务云原生架构」带你从零基础认识搭建公司内部服务用户中心体系(实战指南-序章)
60 0