在很久之前的文章多版本并行开发测试解决方案 中挖了个坑
今天来给填上; 今天主要讲解实现方案;
主要思路
给不同版本的dubbo服务打上 标签version上
在dubbo 提供和消费的出入口上 带上 标签version
服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本
是不是很简单,就是打个标签,然后路由的时候找相同服务嘛
简单代码
打标签
写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;
/** * @author shirenchuang * Registry 的包装类 * 修改URL 中的Application * @date 2019/12/5 8:35 下午 */ public class DevVersionRegisterWrapper implements Registry { private static final Logger logger = LoggerFactory.getLogger("devVersion"); private Registry registry; /** * 注入Register * @param registry */ public DevVersionRegisterWrapper(Registry registry) { this.registry = registry; } @Override public URL getUrl() { return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl()); } @Override public boolean isAvailable() { return registry.isAvailable(); } @Override public void destroy() { registry.destroy(); } @Override public void register(URL url) { registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url))); } @Override public void unregister(URL url) { registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url))); } @Override public void subscribe(URL url, NotifyListener listener) { registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener); } @Override public void unsubscribe(URL url, NotifyListener listener) { registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener); } @Override public List<URL> lookup(URL url) { return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url))); } }
写一个RegistryFactory
的包装类
/** * @author shirenchuang * RegistryFactory 的包装类,在注册的时候 修改一下 Application * 如果是 迭代环境则把Appliacation=Application_迭代版本号 * @date 2019/12/5 8:29 下午 */ public class DevVersionRegisterFactoryWrapper implements RegistryFactory { private static final Logger logger = LoggerFactory.getLogger("devVersion"); private RegistryFactory registryFactory; /** * 注入RegisterFactory */ public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) { this.registryFactory = registryFactory; } @Override public Registry getRegistry(URL url) { //获取当前环境的迭代版本号 if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){ logger.info("=====启动的服务是迭代版本服务 devVersion:{}=====",MyThreadLocal.localVersion); System.out.println("====启动的服务是迭代版本服务 devVersion:"+MyThreadLocal.localVersion); return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url))); } logger.info("=====启动的服务是稳定版本===="); System.out.println("=====启动的服务是稳定版本===="); return registryFactory.getRegistry(url); } public static URL changeApplication(URL url){ if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){ String applicationKey = url.getParameter(Constants.APPLICATION_KEY)+MyThreadLocal.spiltString+MyThreadLocal.localVersion; URL url2 = url.addParameter(Constants.APPLICATION_KEY, applicationKey); logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey); return url2; } return url; } }
服务路由
Invoker 包装类
/** * @author shirenchuang * 2019/12/10 * 集群扩展包装器 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker} */ public class DevVersionClusterInvoker<T> implements Invoker<T> { private static final Logger logger = LoggerFactory.getLogger("devVersion"); private final Directory<T> directory; private final Invoker<T> invoker; public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) { this.directory = directory; this.invoker = invoker; } @Override public URL getUrl() { return directory.getUrl(); } @Override public boolean isAvailable() { return directory.isAvailable(); } @Override public void destroy() { this.invoker.destroy(); } @Override public Class<T> getInterface() { return directory.getInterface(); } @Override public Result invoke(Invocation invocation) throws RpcException { // 找到迭代版本号 return doDevVersionInvoke(invocation, null); } @SuppressWarnings({"unchecked", "rawtypes"}) private Result doDevVersionInvoke(Invocation invocation, RpcException e) { Result result ; Invoker<T> minvoker; List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation); if (devVersionInvokers==null||devVersionInvokers.size()==0) { logger.error("没有找到服务啊~~~~ "); throw new RpcException("没有找到服务啊~~~~"); } else { minvoker = devVersionInvokers.get(0); } try { result = minvoker.invoke(invocation); } catch (RpcException me) { if (me.isBiz()) { result = new RpcResult(me.getCause()); } else { throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause()); } return result; } private String getDevVersionExceptionMessage(Throwable t, Throwable mt) { String msg = "devVersion error : " + mt.getMessage(); if (t != null) { msg = msg + ", invoke error is :" + StringUtils.toString(t); } return msg; } /** * 获取对应迭代版本服务 * @param invocation * @return */ private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) { List<Invoker<T>> invokers = null; if (invocation instanceof RpcInvocation) { try { /**其实我们也可以给directory生生一个代理类,来做帅选操作**/ invokers = directory.list(invocation); //经过了dubbo的栓选之后,我们来找自己需要的Invokes String devVersion = MyThreadLocal.getDevVersion(); List<Invoker<T>> newInvokers = new ArrayList<>(); List<Invoker<T>> stableInvokers = new ArrayList<>(); for (Invoker invoker : invokers){ URL providerUrl ; //获取应用名称 Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl"); getProviderUrl.setAccessible(true); providerUrl = (URL)getProviderUrl.invoke(invoker); String application = providerUrl.getParameter(Constants.APPLICATION_KEY); if(StringUtils.isEmpty(devVersion)){ if(application.indexOf(MyThreadLocal.spiltString)==-1){ //不是迭代过来或者本身不是迭代的请求 只能访问非迭代版本 newInvokers.add(invoker); } }else { //是迭代的请求 就需要找对应的迭代服务 if(application.indexOf(MyThreadLocal.spiltString)!=-1){ String version = application.substring(application.indexOf(MyThreadLocal.spiltString)+5); if(version.equals(devVersion)){ newInvokers.add(invoker); } } } //找到稳定环境 if(application.indexOf(MyThreadLocal.spiltString)==-1){ stableInvokers.add(invoker); } } if(newInvokers==null||newInvokers.size()==0){ String serviceName = directory.getInterface().getName(); if(StringUtils.isEmpty(devVersion)){ String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>"+serviceName+" 的稳定版本!!"; logger.error(error); throw new RuntimeException(error); }else { // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境 ) if(stableInvokers!=null&&stableInvokers.size()>0){ StringBuffer sb = new StringBuffer(); sb.append("=======当前cap请求的版本为:").append(devVersion) .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName) .append("没有找到与之对应的迭代版本;将会调用稳定版本"); logger.info(sb.toString()); return stableInvokers; }else { //可能有其他的迭代服务,但是不调用 logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName); throw new RuntimeException("当前请求迭代版本:"+devVersion+",但是不存在迭代服务,也没有找到稳定服务;"+serviceName); } } }else { return newInvokers; } } catch (RpcException e) { logger.error("获取 迭代版本 的服务时 发生错误~~:"+ directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName() , e); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } return invokers; } @Override public String toString() { return "invoker :" + this.invoker + ",directory: " + this.directory; } public static void main(String[] args) { String application = "application"+MyThreadLocal.spiltString+"1.0.1"; boolean b = application.indexOf(MyThreadLocal.spiltString)==-1; application = application.substring(application.indexOf(MyThreadLocal.spiltString)+5); System.out.print(application); } }
/** * @author shirenchuang * 2019/12/10 * 集群扩展包装器 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker} */ public class DevVersionClusterWrapper implements Cluster { private Cluster cluster; public DevVersionClusterWrapper(Cluster cluster) { this.cluster = cluster; } @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //如果自己是迭代环境,则使用包装 return new DevVersionClusterInvoker<T>(directory, this.cluster.join(directory)); } }
拦截器 带入 标签Version
消费者拦截器
/** * @Description 消费别人服务的时候会走到这里 * 要把 迭代版本号 放到参数里面传到 服务提供者 * @Author shirenchuang * @Date 2019/12/1 10:20 PM **/ @Activate(group = {Constants.CONSUMER}) public class DevVersionConsumerFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger("devVersion"); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String traceId = TraceUtil.getTraceId(); RpcContext.getContext().setAttachment("myTraceId",traceId); String toDevVersion = MyThreadLocal.getDevVersion(); RpcContext.getContext().setAttachment("devVersion",toDevVersion); doLog(invoker,invocation,traceId); return invoker.invoke(invocation); } private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){ String interfaceName = invoker.getInterface().getCanonicalName(); String method = invocation.getMethodName(); String methodFullName = interfaceName + "." + method; StringBuffer sb = new StringBuffer(); sb.append("==TraceId:").append(traceId) .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion) .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion()) .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion()) .append(" ;调用服务=> ").append(methodFullName); logger.info(sb.toString()); } }
提供者拦截器
/** * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号 * 然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号 * @Author shirenchuang * @Date 2019/12/1 10:20 PM **/ @Activate(group = {Constants.PROVIDER}) public class DevVersionProviderFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger("devVersion"); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String fromTraceId = RpcContext.getContext().getAttachment("myTraceId"); TraceUtil.traceLocal.set(fromTraceId); String myTraceId = TraceUtil.getTraceId(); String fromDevVersion = RpcContext.getContext().getAttachment("devVersion"); //放入到本地线程存放 MyThreadLocal.devVersion.set(fromDevVersion); doLog(invoker,invocation,myTraceId); return invoker.invoke(invocation); } private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){ String interfaceName = invoker.getInterface().getCanonicalName(); String method = invocation.getMethodName(); String methodFullName = interfaceName + "." + method; StringBuffer sb = new StringBuffer(); sb.append("==TraceId:").append(traceId) .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion) .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion")) .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion")) .append(" ;将被调用服务=> ").append(methodFullName); logger.info(sb.toString()); } }
标签的存取
在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;
public class MyThreadLocal { private static final Logger logger = LoggerFactory.getLogger("devVersion"); public static ThreadLocal<String> devVersion = new TransmittableThreadLocal(); //public static ThreadLocal<String> devVersion = new ThreadLocal(); /**用户Application评价的固定字符;**/ public static String spiltString = "_dev_"; public static String localVersion ; static { localVersion = System.getProperty("localVersion"); logger.info("====devVersion:{} ========",devVersion); System.out.println("s====devVersion: ========"+devVersion); } public static String getFromVersion(){ return devVersion.get(); } /** * 如果本地变量没有 则可能是第一个发起方; * 则去当前服务的版本号,然后一直传递下去; * @return */ public static String getDevVersion(){ String fromVersion = getFromVersion(); if(!StringUtils.isEmpty(fromVersion)){ return fromVersion; } return localVersion; } }
dubbo spi的配置
将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效
重点问题说明
上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖
参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用
ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;