过滤器的设计
好了基本的调用链路大概是如同上边的描述给梳理出来了。接下来就是一些扩展功能模块了。
发送过程中需要做一些装饰包装,以及过滤的相关功能。此时就可以采用责任链的方式进行设计。
过滤器部分我大概分了两种类型,一种是消费者使用的过滤器,一种是服务提供者专属的过滤器。
过滤器部分的设计主要是用了责任链的模式实现,这块比较简单,不打算做过多介绍了。
延时任务的设计
在微服务调用的中间件中,延时任务是一种经常会使用到的设计,例如在超时重试,定时心跳发送,注册中心发布失败重试等场景下。其核心的共同点都是在当前时间戳过后的指定时间点执行某个任务。这类设计我看了下JDK内部的Timer和DelayedQueue设计的原理。
常规的JDK 的 java.util.Timer 和 DelayedQueue 等工具类,可实现简单的定时任务,底层用的是堆数据结构,存取复杂度都是 O(nlog(n)),无法支撑海量定时任务。
而在定时任务量大、性能要求高的场景,为将任务存取及取消操作时间复杂度降为 O(1),会使用时间轮方案。
在自己实现RPC框架中,尝试使用了时间轮的机制来实现心跳包发送部分。
什么是时间轮
一种高效批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务。指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。
Dubbo 的时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中,如果感兴趣的朋友可以深入阅读下内部的源代码设计与实现。
注册中心的引入
为了能够保证服务发布之后及时通知到各个服务的调用方,注册中心的设计必不可少。除此之外,注册中心的角色还能够较好地协调各个微服务调用之间的一些配置参数,例如权重,分组,版本隔离等等属性。
在自己进行实现落地的过程中,我选择了zookeeper作为默认的注册中心。为了方便后期的扩展,也是参考了Dubbo内部关于注册中心的实现思路,通过一个Registry的接口抽象,随机扩展了一些模版类等等。大概的设计如下图所示:
整体的服务注册接口代码如下:
public interface RegistryService { /** * 注册url * * 将dubbo服务写入注册中心节点 * 当出现网络抖动的时候需要进行适当的重试做法 * 注册服务url的时候需要写入持久化文件中 * * @param url */ void register(URL url); /** * 服务下线 * * 持久化节点是无法进行服务下线操作的 * 下线的服务必须保证url是完整匹配的 * 移除持久化文件中的一些内容信息 * * @param url */ void unRegister(URL url); /** * 消费方订阅服务 * * @param urlStr * @param providerServiceName */ void subscribe(String urlStr,String providerServiceName); /** * 更新节点属性之后通知这里 * * @param url */ void doSubscribeAfterUpdate(URL url); /** * 新增节点之后通知这里 * * @param url */ void doSubscribeAfterAdd(URL url); /** * 执行取消订阅内部的逻辑 * * @param url */ void doUnSubscribe(URL url); } 复制代码
为了预防注册中心挂了之后,服务无法进行通信,每个通信节点都会将zk的服务注册节点信息提前预先持久化到本地进行暂存一份数据,从而保证一个服务的可用性。
负载均衡策略的实现
在集群进行调用的时候,不可避免会有负载均衡的问题,这块的设计逻辑我参考了Dubbo的设计思路将其通过spi加载组件的方式进行框架的注入。
统一抽取了一个叫做LoadBalance的接口,然后底层实现了具体的负载均衡策略:
public class WeightLoadBalance implements LoadBalance { public static Map<String, URL[]> randomWeightMap = new ConcurrentHashMap<>(); public static Map<String, Integer> lastIndexVisitMap = new ConcurrentHashMap<>(); @Override public void doSelect(Invocation invocation) { URL[] weightArr = randomWeightMap.get(invocation.getServiceName()); if (weightArr == null) { List<URL> urls = invocation.getUrls(); Integer totalWeight = 0; for (URL url : urls) { //weight如果设置地过大,容易造成内存占用过高情况发生,所以weight统一限制最大大小应该为100 Integer weight = Integer.valueOf(url.getParameters().get("weight")); totalWeight += weight; } weightArr = new URL[totalWeight]; RandomList<URL> randomList = new RandomList(totalWeight); for (URL url : urls) { int weight = Integer.parseInt(url.getParameters().get("weight")); for (int i = 0; i < weight; i++) { randomList.randomAdd(url); } } int len = randomList.getRandomList().size(); for (int i = 0; i < len; i++) { URL url = randomList.getRandomList().get(i); weightArr[i] = url; } randomWeightMap.put(invocation.getServiceName(), weightArr); } Integer lastIndex = lastIndexVisitMap.get(invocation.getServiceName()); if (lastIndex == null) { lastIndex = 0; } if (lastIndex >= weightArr.length) { lastIndex = 0; } URL referUrl = weightArr[lastIndex]; lastIndex++; lastIndexVisitMap.put(invocation.getServiceName(), lastIndex); invocation.setReferUrl(referUrl); } } 复制代码
这里面的负载均衡实现手段并不是实时计算的思路,而是提前随机算好一组调用顺序,然后每次请求的时候按照这个已经具备随机性的数组进行挨个轮训发送服务调用。
这样可以避免每次请求过来都需要进行机器实时筛选计算的性能开销。
SPI扩展机制的设计
其实Spi的加载实现部分的关键就是将一份配置文件按照规定格式写好,然后通过某个loader对象将配置文件内部的每个类都提前加载到一份Map中进行管理。
下边我给出一份自己手写的简单案例,但是不包含自适应spi加载和spi内部自动依赖注入的功能。
public class ExtensionLoader { /** * 存储扩展spi的map,key是spi文件里面写入的key */ private static Map<String, Class<?>> extensionClassMap = new ConcurrentHashMap<>(); private static final String EXTENSION_LOADER_DIR_PREFIX = "META-INF/ietty/"; public static Map<String, Class<?>> getExtensionClassMap(){ return extensionClassMap; } public void loadDirectory(Class clazz) throws IOException { synchronized (ExtensionLoader.class){ String fileName = EXTENSION_LOADER_DIR_PREFIX + clazz.getName(); ClassLoader classLoader = this.getClass().getClassLoader(); Enumeration<URL> enumeration = classLoader.getResources(fileName); while (enumeration.hasMoreElements()) { URL url = enumeration.nextElement(); InputStreamReader inputStreamReader = new InputStreamReader(url.openStream(), "utf-8"); BufferedReader bufferedReader = new BufferedReader(inputStreamReader); String line; while ((line = bufferedReader.readLine()) != null) { if(line.startsWith("#")){ continue; } String[] keyClassInstance = line.split("="); try { extensionClassMap.put(keyClassInstance[0],Class.forName(keyClassInstance[1],true,classLoader)); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } } } public static <T>Object initClassInstance(String className) { if(extensionClassMap!=null && extensionClassMap.size()>0){ try { return (T)extensionClassMap.get(className).newInstance(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } return null; } } 复制代码
底层通信组件
整套RPC框架的底层部分是采用了Netty组件进行实现的,主要的写法其实和通用的netty编程没有太大的差别,这里我简单贴出下代码截图吧:
客户端:
服务端:
小结
可能整篇文章写下来,很多的技术细节点和实现方式因为篇幅问题不能很好的展示出来。但是整体设计的几个大难点以及难点的解决思路都基本贴出来了,希望能够对你有一定的启发。
整个基础中间件写下来之后感觉头发掉了不少,因为底层的细节点实在是太多了,不管是结构设计,数据并发问题,异步处理设计等诸多都需要考虑,所以感觉这是一件非常具有综合挑战性的事情。