首页> 标签> Dubbo
"Dubbo"
共 3018 条结果
全部 问答 文章 公开课 课程 电子书 技术圈 体验
dubbo-admin与多注册中心
dubbo-admin是否支持同时关联多个注册中心统一管理不同注册中心上注册的服务?答案是:不支持!这里指的多注册中心,并不是zookeeper集群,两者的差别看这里和这里。dubbo-admin提供了运维界面,辅助我们更好的管理和维护服务之间的依赖等信息。刚认识dubbo时确实被这么逆天的功能给震惊了(现在也是~)。那么该怎么办呢?目前想到的就只是为每一个注册中心部署一套admin管理后台。确实听起来有些土鳖,但应该是最省事儿的了吧~当然,也可以改造一下现有的dubbo-admin逻辑,只不过以现在我的道行还做不到啊~坐等高手!最后还要叮嘱的是,如果你的注册中心和服务之间的网络质量比较差,建议你配置一个较长的timeout时间,否则会出现:... nested exception is org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000 ... 这么写: <dubbo:registry protocol="zookeeper" address="192.168.76.138:2182,192.168.76.138:2181,192.168.76.138:2183" timeout="100000"/>
文章
运维  ·  Dubbo  ·  应用服务中间件
2023-02-07
dubbo中SPI的基础--Cooma微容器
之前把玩的时候自己甚至连一本完整的javaEE资料都没有看完过,虽然现在也还是入门级新手,不过在其他项目中的工作也让我有了比之前更多的积累,应该可以看得更透一些。从哪里开始呢?我个人认为应该从dubbo的SPI理念开始,由于dubbo的项目Leader一开始就把其定义成一个方便扩展的服务框架,所以在dubbo的架构设计中始终保持了良好的依赖扩展机制:微内核+插件。简而言之就是让扩展者可以和项目开发者拥有一样的灵活度,这也是dubbo得以迅速流行的一个必要条件。要想实现这种自由度,除了在架构分层组件上要保持高内聚低耦合外,底层也需要一套强大的类管理工具。在javaEE世界里,把这份工作做到极致的也已经有成熟的标准规范:OSGi。不过OSGi并不完全适配dubbo的需求,而且这玩意儿也有些过于重了,所以在此基础上,dubbo结合JDK标准的SPI机制设计出来一个轻量级的实现:Cooma。这篇文章,我就打算从Cooma说起,官方介绍的已经非常详细了,不过它在从dubbo独立出来发布之前是做过修改优化的,在dubbo项目中使用时可能会存在些许的不同,我们就从dubbo内部来研读这部分实现的代码,并结合dubbo中的上下文来了解一下dubbo是如何使用SPI的。我们把目标定位在dubbo的这个包上:com.alibaba.dubbo.common.extension看一下这个包的目录结构:com.alibaba.dubbo.common.extension | |--factory | |--AdaptiveExtensionFactory #稍后解释 | |--SpiExtensionFactory #稍后解释 | |--support | |--ActivateComparator | |--Activate #自动激活加载扩展的注解 |--Adaptive #自适应扩展点的注解 |--ExtensionFactory #扩展点对象生成工厂接口 |--ExtensionLoader #扩展点加载器,扩展点的查找,校验,加载等核心逻辑的实现类 |--SPI #扩展点注解 我们通过对照dubbo如何使用扩展点机制来完成扩展点工厂实例的选择与加载来了解一下扩展点实现的细节,这句话很拗口,有点递归的味道,我们不妨直接从代码中来理解:public class ExtensionLoader<T> { ... private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>(); private final Class<?> type; ... @SuppressWarnings("unchecked") public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) throw new IllegalArgumentException("Extension type == null"); if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; } private ExtensionLoader(Class<?> type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); } ... } 我们主要看ExtensionLoader构造方法,其中它初始化了type和objectFactory,前者为要作为扩展点的接口类型,后者表示要如何获取指定名称的扩展点实例(工厂类),目前dubbo提供了2个实现类,上面在包结构图上已经标注过了。@SuppressWarnings("unchecked") public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if(createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; } 给出的路径,缺少了查找扩展点实现的细节,也就是并没有展开getExtensionClasses方法,该方法会根据指定位置的配置文件扫描并解析拿到所有可用的扩展点实现,代码如下:private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; } 可见它也只是封装了一层缓存而已,我们继续深挖loadExtensionClasses方法:// 此方法已经getExtensionClasses方法同步过。 private Map<String, Class<?>> loadExtensionClasses() { //检查并获取该接口类型声明的默认扩展点实现 final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } //去三个指定的位置查找配置文件并解析拿到扩展点键值映射关系 Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>(); loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; } 我们先来看一下配置文件的格式:adaptive=com.alibaba.dubbo.common.extension.factory.AdaptiveExtensionFactory spi=com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory loadFile方法会从指定位置(META-INF/dubbo/internal/)根据指定接口类型(type)为文件名称查找目标配置文件,然后解析并校验,最终拿到匹配的扩展点类的所有Class实例。对应上面给出的配置文件,也就是AdaptiveExtensionFactory和SpiExtensionFactory,它们已经在包结构图上提到过了。现在我们来着重看一下这两个类,它们到底是做什么用的呢?首先,AdaptiveExtensionFactory定义上有@Adaptive注解标识,很明显,它就是自适应扩展点的实现,从loadFile方法中可以留意到:同一个接口类型只能存在一个自适应扩展点实现:@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } //从这个方法定义来看,这个自适应扩展点实现类并没有做任何事儿,唯一的工作就是把真正获取扩展点实例的逻辑依次交给 //框架中声明的所有ExtensionFactory扩展点实例,默认也就是SpiExtensionFactory public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; } } 可以看到,AdaptiveExtensionFactory把逻辑委托给SpiExtensionFactory来做,而后者又是怎么做的呢:public class SpiExtensionFactory implements ExtensionFactory { public <T> T getExtension(Class<T> type, String name) { if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type); if (loader.getSupportedExtensions().size() > 0) { //获取自适应扩展点实例,这是dubbo默认的行为, //也可以自己写一个ExtensionFactory来按照要求加载扩展点 return loader.getAdaptiveExtension(); } } return null; } } 而objectFactory(真实工作的也就是SpiExtensionFactory.getExtension)只是用在ExtendLoader的注入方法(injectExtension)中,该方法用于为选定的扩展点实现注入相关的其他扩展点实例。目前为止,我们已经大概了解在dubbo内部,是以什么样的规则来使用扩展点机制,也为以后学习dubbo的其它方面提供了基础。
文章
缓存  ·  Dubbo  ·  Java  ·  应用服务中间件  ·  开发者  ·  容器
2023-02-07
dubbo协议下的单一长连接与多线程并发如何协同工作
上班的路上突然就冒出了这么个问题:既然在dubbo中描述消费者和提供者之间采用的是单一长连接,那么如果消费者端是高并发多线程模型的web应用,单一长连接如何解决多线程并发请求问题呢?其实如果不太了解socket或者多线程编程的相关知识,不太容易理解这个问题。传统的最简单的RPC方式,应该是为每次远程调用请求创建一个对应的线程,我们先不说这种方式的缺点。至少优点很明显,就是简单。简单体现在哪儿?通信双方一对一(相比NIO来说)。通俗点来说,socket通信的双方发送和接受数据不会被其它(线程)干扰,这种干扰不同于数数据包的“粘包问题”。其实说白了就相当于电话线路的场景:试想一下如果多个人同时对着同一个话筒大喊,对方接受到的声音就会是重叠且杂乱的。对于单一的socket通道来说,如果发送方多线程的话,不加控制就会导致通道中的数据乱七八糟,接收端无法区分数据的单位,也就无法正确的处理请求。乍一看,似乎dubbo协议所说的单一长连接与客户端多线程并发请求之间,是水火不容的。但其实稍加设计,就可以让它们和谐相处。socket中的粘包问题是怎么解决的?用的最多的其实是定义一个定长的数据包头,其中包含了完整数据包的长度,以此来完成服务器端拆包工作。那么解决多线程使用单一长连接并发请求时包干扰的方法也有点雷同,就是给包头中添加一个标识id,服务器端响应请求时也要携带这个id,供客户端多线程领取对应的响应数据提供线索。其实如果不考虑性能的话,dubbo完全也可以为每个客户端线程创建一个对应的服务器端线程,但这是海量高并发场景所不能接受的~~那么脑补一张图:下面咱们试图从代码中找到痕迹。一路追踪,我们来到这个类:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java,先来看看其中的request方法,大概在第101行左右: public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //这个future就是前面我们提到的:客户端并发请求线程阻塞的对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); //非阻塞调用 }catch (RemotingException e) { future.cancel(); throw e; } return future; } 注意这个方法返回的ResponseFuture对象,当前处理客户端请求的线程在经过一系列调用后,会拿到ResponseFuture对象,最终该线程会阻塞在这个对象的下面这个方法调用上,如下:public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { //无限连 done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } 上面我已经看到请求线程已经阻塞,那么又是如何被唤醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java,其实所有实现了ChannelHandler接口的类都被设计为装饰器模式,所以你可以看到类似这样的代码: protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler( new HeartbeatHandler( ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url) )); } 现在来仔细看一下HeaderExchangeHandler类的定义,先看一下它定义的received方法,下面是代码片段:public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { ..... } else if (message instanceof Response) { //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点 handleResponse(channel, (Response) message); } else if (message instanceof String) { ..... } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } 我们主要看中间的那个条件分支,它是用来处理响应消息的,也就是说当dubbo客户端接收到来自服务端的响应后会执行到这个分支,它简单的调用了handleResponse方法,我们追过去看看:static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { //排除心跳类型的响应 DefaultFuture.received(channel, response); } } 熟悉的身影:DefaultFuture,它是实现了我们上面说的ResponseFuture接口类型,实际上细心的童鞋应该可以看到,上面request方法中其实实例化的就是这个DefaultFutrue对象:DefaultFuture future = new DefaultFuture(channel, req, timeout); 那么我们可以继续来看一下DefaultFuture.received方法的实现细节:public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } 留一下我们之前提到的id的作用,这里可以看到它已经开始发挥作用了。通过id,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,它就是上面我们提到的,阻塞请求线程的那个对象。好,找到目标后,调用它的doReceived方法,这就是标准的java多线程编程知识了:private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } 这样我们就可以证实上图中左边的绿色箭头所标注的两点。
文章
Dubbo  ·  Java  ·  应用服务中间件
2023-02-07
dubbo的拦截器和监听器
今天要聊一个可能被其他dubbo源码研究的童鞋容易忽略的话题:Filter和Listener。我们先来看一下这两个概念的官方手册:拦截器监听器:引用监听器和暴露监听器老实说,依赖之前的源码分析经验,导致我饶了很大的弯路,一直找不到filter和listener被使用的位置。看过前几篇文章的朋友应该也有这个疑惑,为什么按照url参数去匹配框架的执行流程,死活找不到dubbo注入拦截器和监听器的位置呢?ReferenceConfig --> RegistryProtocol --> DubboProtocol --> invoker --> exporter 按照这个调用流程,没错啊,可每一个环节都没有使用filter和listener属性的痕迹,有点抓瞎了啊。要说用好IDE确实很重要啊,光靠脑子想真的很伤身,下面来看一下谜底。先来回忆一下dubbo的SPI机制,根据接口类型,dubbo会去读取并解析对应的配置文件,从中拿到对应的扩展点实现,好,我们先来看一下Protocol接口对应的配置文件:registry=com.alibaba.dubbo.registry.integration.RegistryProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper #注意这一行 listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper #注意这一行 mock=com.alibaba.dubbo.rpc.support.MockProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol 我们已经找到了filter和listener对应的扩展点了。接下来看一下它们是怎么一步一步的被注入到上面的流程里的。在ReferenceConfig类中我们会引用和暴露对应的服务,我们以服务引用为场景来分析:get() --> init() --> createProxy() | +---> invoker = refprotocol.refer(interfaceClass, urls.get(0)); 注意上面提到的这一行代码,这里的refprotocol是引用的Protocol$Adpative,这个类是dubbo的SPI机制动态创建的自适应扩展点,我们在之前的文章中已经介绍过,看一下它的refer方法细节:public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //注意这一行,根据url的协议名称选择对应的扩展点实现 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } 乍一看,并没有感觉有什么蹊跷,不过在单步调试中就会出现"诡异"现象(由于该类是动态创建的,所以该方法并不会被单步到,所以为分析带来了一定的干扰),我们得再往回倒一下,之前在dubbo中SPI的基础中曾经分析过ExtensionLoader的源码,但是当时由于了解的不够确实忽略了一些细节。我们再来看一下它的执行流程:getExtension() --> createExtension() | +--> ...... Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { //装饰器模式 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } ...... 一看到这行代码,就知道关键点在这里,这种写法刚好就是和常见的拦截器和监听器的实现方法吻合,而且事实证明也确实是在这个地方完成的注入,那么我们就需要看一下这个cachedWrapperClasses到到底存了什么?我们最后看一下ExtensionLoader.loadFile方法,它是负责解析我们开头提到的那个SPI扩展点配置文件的,它会依次扫描配置文件的每一行,然后根据配置内容完成等号两边的键值对应关系,例如:test=com.alibaba.dubbo.rpc.filter.TestFilter loadFile的任务就是把test和解析过以后的TestFilter类关系对应上,供以后的getExtension查找使用。注意看其中的这几行代码:...... clazz.getConstructor(type); //判断是否为wrapper实现 Set<Class<?>> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet<Class<?>>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); ...... 这里就完成了cachedWrapperClasses的初始化,它根据查看配置文件中定义的扩展点实现是否包含一个带有当前类型的构造方法为条件,确定哪些是wrapper,这样我们就可以发现:filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 这两行命中了。这也是之后在真正获取protocol扩展点时会动态注入的两个重要包装类,前者完成拦截器,后者完成监听器。至于拦截器和监听器的使用方法,我实在不知道除了官方提到的内容以外还有什么好补充的了,那就写到这里吧~
文章
Dubbo  ·  IDE  ·  应用服务中间件  ·  开发工具
2023-02-07
dubbo的缓存实现
这次的目标是缓存,没错,绝壁常用的一个知识点,我们怎么能不了解一下它的内部实现源码呢?!dubbo的官方描述很简洁,好的封装就是这么强大, 让你用起来丝毫不费力。我们今天就费力的看一下dubbo是如何提供cache功能的。有想直接使用的童鞋,就可以跳过下面内容直面看官方提供的简单例子。按照SPI的要求,我们从配置文件中可以看到dubbo提供的三种缓存接口的入口:threadlocal=com.alibaba.dubbo.cache.support.threadlocal.ThreadLocalCacheFactory lru=com.alibaba.dubbo.cache.support.lru.LruCacheFactory jcache=com.alibaba.dubbo.cache.support.jcache.JCacheFactory 先来看一下dubbo提供的AbstractCacheFactory的细节:public abstract class AbstractCacheFactory implements CacheFactory { private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>(); public Cache getCache(URL url) { String key = url.toFullString(); Cache cache = caches.get(key); if (cache == null) { caches.put(key, createCache(url)); cache = caches.get(key); } return cache; } protected abstract Cache createCache(URL url); } 很直观的看得出,该类完成了具体cache实现的实例化工作(注意getCache的返回类型Cache,该接口规范了不同缓存的实现),接下来我们就分三部分来具体看一下不同的缓存接口的具体实现。ThreadLocal如果你的配置如下: <dubbo:reference interface="com.foo.BarService" cache="threadlocal" /> 那就表明你使用的是该类型的缓存,根据SPI机制,会执行下面这个工厂类:public class ThreadLocalCacheFactory extends AbstractCacheFactory { protected Cache createCache(URL url) { return new ThreadLocalCache(url); } } 注意该类继承了上面提到的AbstractCacheFactory。可以看出,真正实例化的具体缓存层实现是ThreadLocalCache类型。由于此类型是基于线程本地变量的,所以非常简单:public class ThreadLocalCache implements Cache { private final ThreadLocal<Map<Object, Object>> store; public ThreadLocalCache(URL url) { this.store = new ThreadLocal<Map<Object, Object>>() { @Override protected Map<Object, Object> initialValue() { return new HashMap<Object, Object>(); } }; } public void put(Object key, Object value) { store.get().put(key, value); } public Object get(Object key) { return store.get().get(key); } } 这里注意的是,为了遵循接口定义才需要初始化时传入url参数,但其实该类型的缓存实现是完全不需要额外参数的。最后要叮嘱的是,该缓存应用场景为:比如一个页面渲染,用到很多portal,每个portal都要去查用户信息,通过线程缓存,可以减少这种多余访问。场景描述的核心内容是当前请求的上下文,可以结合dubbo的线程模型来更好的消化这一点。也许我们以后还会单独来分析这个主题。LRU类似ThreadLocal,我们就不再重复列举对应的工厂方法了,直接看LruCache类的实现:public class LruCache implements Cache { private final Map<Object, Object> store; public LruCache(URL url) { final int max = url.getParameter("cache.size", 1000); //定义了缓存的容量 this.store = new LinkedHashMap<Object, Object>() { private static final long serialVersionUID = -3834209229668463829L; @Override protected boolean removeEldestEntry(Entry<Object, Object> eldest) { //jdk提供的接口,用于移除最旧条目的需求 return size() > max; } }; } public void put(Object key, Object value) { synchronized (store) { //注意这里的同步条件 store.put(key, value); } } public Object get(Object key) { synchronized (store) { //注意这里的同步条件 return store.get(key); } } } 相比ThreadLocal,可以看出,该类型的缓存是跨线程的,也匹配我们常见的缓存场景。JCache对于我这种java新手,什么是JCache,显然需要科普一下,这里给出了我找到的几篇不错的文章:官府,草根,小栗子,注解篇,中文完美篇。由于内容太多,我就不胡乱翻译了~~由于这部分的代码太简单,节省篇幅就不列源码了。不过我们的项目缓存是基于redis的,而我并没有找到支持JCache的redis客户端,不知道大家有没有推荐的啊~??如何解析“cache”属性那么,cache层的逻辑是如何一步一步“注入”到我们的业务逻辑里呢?这还是要追溯到dubbo的过滤器上,我们知道在dubbo初始化指定protocol的时候,会使用装饰器模式把所有需要加载的过滤器封装到目标protocol上,这个细节指引我来查看ProtocolFilterWrapper类:refer() ---> buildInvokerChain() | V private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } 注意ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);这一行,单步调试可以得知它会返回所有需要“注入”的Filter逻辑,当然也包含我们关注的缓存:com.alibaba.dubbo.cache.filter.CacheFilter。注意看该类声明的开头:@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY) 这一行是关键哟,上面提到的getActivateExtension方法就是靠这一行注解工作的。dubbo以这种设计风格完成了大多数的功能,所以对于研究dubbo源码的童鞋,一定要多多注意。经历了这一圈下来,所有过滤器就已经注入到我们的服务当中了。业务层如何使用cache最后再来仔细看一下com.alibaba.dubbo.cache.filter.CacheFilter类的invoke方法:public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) { Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName())); if (cache != null) { String key = StringUtils.toArgumentString(invocation.getArguments()); if (cache != null && key != null) { Object value = cache.get(key); if (value != null) { return new RpcResult(value); } Result result = invoker.invoke(invocation); if (! result.hasException()) { cache.put(key, result.getValue()); } return result; } } } return invoker.invoke(invocation); } 可以看出,这里根据不同的配置会初始化并使用不同的缓存实现,好了,关于缓存的分析就到此为止。
文章
缓存  ·  Dubbo  ·  NoSQL  ·  Java  ·  应用服务中间件  ·  Redis
2023-02-07
dubbo的编解码,序列化和通信
dubbo的调研已经快完结了(按照我自己拟定的计划),计划内剩下的内容就只有:序列化编解码通信实现打算写在一篇里,年前彻底搞定dubbo[x]的调研,过完年来了就要投入使用了,好紧张哇~~哟呵呵呵呵!其实前两块的内容并没有啥好讲的,毕竟咱目的是了解源码来辅佐如何使用,而非像当当网的团队那样做dubbo的升级开发。按照源码的阅读习惯,我们按照上面列表的逆序来一个一个的分析。废话不多说,走着~通信实现我们主要基于dubbo推荐默认使用的通信框架:netty,来了解一下dubbo是如何完成两端通信的。我们直接从DubboProtocol类开始看起:export() --> openServer() --> createServer() | +--> server = Exchangers.bind(url, requestHandler); //创建服务 dubbo从要暴漏的服务的URL中取得相关的配置(host,port等)进行服务端server的创建,并且保证相同的配置(host+port)下只会开启一个server,这和netty提供的模型有关(NIO),这个我们后面再说。我们先来继续看Exchangers的相关部分,...... public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //这里尝试配置了编解码的方式 return getExchanger(url).bind(url, handler); } ...... public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); //默认使用HeaderExchanger return getExchanger(type); } ...... public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); } ...... 可以看出,Exchangers只是根据URL的参数提供了策略模式。我们依然以dubbo默认的处理方式为主,接下来代码执行到HeaderExchanger类:public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } } 这些代码看起来非常的设计模式:return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); | | | | | V | | | V 1.提供统一的服务操作接口 | | | 利用装饰器模式,这个才是最靠近业务的逻辑(直接调用相关的invoker) 2.创建心跳定时任务 V | | 1.利于扩展点机制选择通信框架 | | 2.格式化回调函数 | | V V 消息的解码??? 处理dubbo的通信模型:单向,双向,异步等通信模型 要想理解现在的内容,就得先搞清楚JAVA NIO channel概念,搞清楚netty的NIO线程模型。了解了这两个基础知识点,那么我们就可以继续分析源码了,上面那一行代码中Transporters.bind()默认会调用NettyTransporter:public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } } 接下来我们就真正进入到了netty的世界,我们先来看一下NettyServer的家谱:要时刻记着,dubbo是一个非常灵活的框架,我们不仅可以使用netty作为底层通信组件,也可以仅靠url参数就可以改变底层通信的实现,这种架构设计彰显了开发人员对代码的驾驭能力。AbstractServer抽象父类把创建server所需的公共逻辑抽离出来集中完成,而需要根据特定通信框架的逻辑则交给特定子类(NettyServer)利用重载(doOpen)完成,这样的代码结构在dubbo中随处可见。@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); //Upstream pipeline.addLast("encoder", adapter.getEncoder()); //Downstream pipeline.addLast("handler", nettyHandler); //Upstream & Downstream return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); } 如果是熟悉netty的童鞋,肯定早已习惯这个方法的写法,就是创建了netty的server嘛,不过需要注意的是,netty本身是基于事件的,留意一下上面的NettyServer的继承关系,其中ChannelHandler并不是netty的那个ChannelHandler,这就意味着要让前者转换成后者,才可以供netty使用,这也就是NettyHandler的意义,同样,类似这样的做法也可以在dubbo中找到多处。同时也要注意,NettyServer和NettyHandler都有同一个用于记录打开中的channel的集合:private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>,其中ip:port指的是调用端的ip和端口号 其中的Channel类型也并非netty的Channel,而是dubbo的NettyChannel,该类负责把netty的Channel,dubbo自身的url和handler映射起来,依赖这样的设计思想,就可以完全把业务和底层基础实现很好的隔离开来,灵活性大大提高,当然,复杂度也随之增加了,这是架构师需要权衡的一个哲学问题。dubbo封装netty就介绍到这里,我们的分析并没有深入到netty太多,因为小弟我对netty的了解也是非常的皮毛,为了避免误人子弟,所以更多的细节就留给高手来分享吧。编解码socket通信中有一个很好玩儿的部分,就是定义消息头,作用非常重大,例如解决粘包问题。dubbo借助netty这样的第三方框架来完成底层通信,这样一部分工作就委托出去了,不过还是有一些工作是需要dubbo好好规划的,我们来看一张官方提供的消息头格式:只有搞清楚了消息头结构设计,才能完成消息体的编码解码,才能交给底层通信框架去收发。上图中我们其实只需要关注Dubbo部分,其部分意义已经在这篇文章中阐述过了,我们这里只关注代码实现,再来看一下NettyServer类:@Override protected void doOpen() throws Throwable { ...... NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); //Upstream pipeline.addLast("encoder", adapter.getEncoder()); //Downstream pipeline.addLast("handler", nettyHandler); //Upstream & Downstream return pipeline; ...... } 注意看我在每一行后面加的注释,参见这一篇关于netty的流处理顺序的文章,我们就可以理解dubbo的编码解码是如何配置的。下面接着看一下getCodec()方法: protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); //这里的codecName值为dubbo if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { //应该是向下兼容 或者 阿里内部才会执行的代码 return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } } 这里又一次尝试根据url中的codec参数来确定最终使用的编解码类,不过我们可以在DubboProtocol类的定义中看到,其实这个参数已经被硬编码了://这里强行设置编码方式,有点硬啊 url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); 注意这里Version.isCompatibleVersion()会去查找是否存在"com/taobao/remoting/impl/ConnectionRequest.class",但我们知道,这是taobao内部的实现。根据参数,我们看一下对应的配置文件:transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec exchange=com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec #使用的是这个 thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftCodec 再看回来,NettyCodecAdapter完成了把netty和dubbo隔离的任务,使在后面进行编码解码时使用的channel不再是特定的通信框架提供的,而是dubbo提供的抽象实现。再往下深挖,就会看到dubbo是如何处理数据包的拆装,由于过于琐碎,我决定暂时不继续下去了,日后如果在使用时出现问题,会单独拿出来讲讲。序列化dubbo本身支持多种序列化方式,当当的duubox也在序列化方面做了新的工作,PRC中要解决跨进程通信的一个首要问题就是对象的系列化问题,业界各大佬公司和开源组织也都开源了很多优秀的项目,而要了解所有的序列化库是需要花大量时间的,我们依旧只关注dubbo是如何在代码层面触发序列化工作的。只有序列化算法本身,还是交给大家去对应官网进行深度学习吧。序列化是在向对端发送数据前的重要工作,事实上我是在DubboCodec类中发现序列化工作的入口的:protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { ...... Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); //获取对应的序列化库 ...... decodeEventData(channel, deserialize(s, channel.getUrl(), is)); ...... } private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException { return serialization.deserialize(url, is); } //该方法继承自ExchangeCodec父类 protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); ...... ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); ...... } 而从dubbo的配置文件中看,dubbo[x]支持的序列化方式包括:dubbo=com.alibaba.dubbo.common.serialize.support.dubbo.DubboSerialization hessian2=com.alibaba.dubbo.common.serialize.support.hessian.Hessian2Serialization java=com.alibaba.dubbo.common.serialize.support.java.JavaSerialization compactedjava=com.alibaba.dubbo.common.serialize.support.java.CompactedJavaSerialization json=com.alibaba.dubbo.common.serialize.support.json.JsonSerialization fastjson=com.alibaba.dubbo.common.serialize.support.json.FastJsonSerialization nativejava=com.alibaba.dubbo.common.serialize.support.nativejava.NativeJavaSerialization kryo=com.alibaba.dubbo.common.serialize.support.kryo.KryoSerialization fst=com.alibaba.dubbo.common.serialize.support.fst.FstSerialization jackson=com.alibaba.dubbo.common.serialize.support.json.JacksonSerialization 好吧,到此为止,我们就算了解dubbo啦,如果有什么遗漏的地方,可以留言提醒小弟,一起学习进步。
文章
机器学习/深度学习  ·  设计模式  ·  编解码  ·  Dubbo  ·  算法  ·  架构师  ·  Java  ·  应用服务中间件
2023-02-07
事务消息应用场景、实现原理与项目实战(附全部源码)
作者:丁威活动中心场景介绍在电商系统上线初期,往往会进行一些“拉新”活动,例如活动部门提出新用户注册送积分、送优惠券活动。基于分布式、微服务的设计理念,通常的架构设计(子系统交互)如下图所示:其核心系统介绍如下:账户中心:提供用户登录、用户注册等服务,一个新用户注册时,向 MQ 服务器中的 USER_REGISTER 主题发送一条消息,主流程结束,与送积分,送优惠券等过程解耦。优惠券(券系统):提供发放优惠券、使用优惠券等与券相关的基础服务。积分中心:提供积分相关的服务,例如积分赠送、积分消费、积分查询等基础服务。送积分服务(消费者):订阅 MQ,按照规则决定是否需要赠送积分,如果需要则调用积分相关的基础接口,完成积分的发放。送优惠券(消费者):订阅 MQ,按照规则决定是否需要赠送优惠券,如果需要则调用券系统相关的基础接口,完成优惠券的发放。上面的架构设计非常优雅,但并不是无懈可击,如果新用户注册成功,但消息发送到 MQ 失败,或者消息成功发送到 MQ,但发送完 MQ 后系统出现异常导致用户注册失败又该如何呢?上面的问题其实就是典型的分布式事务问题:即如何保证用户注册(数据库操作)与 MQ 消息发送这两个分布式操作的一致性。RocketMQ 事务消息闪亮登场。事务消息实现原理一言以蔽之:RocketMQ 事务消息要解决的问题是消息发送与业务的一致性,其解决思路:二阶段提交与事务状态回查,其具体实现流程如下图所示:其核心设计理念:应用程序开启一个数据库事务,进行数据库操作,并且在事务中发送一条 PREPARE 消息,PREPARE 消息发送成功后通知应用程序记录本地事务状态,然后提交本地事务。RocketMQ 在收到类型为 PREPARE 的消息时,首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为 RMQ_SYS_TRANS_HALF_TOPIC 的消息队列中,故 PREPARE 的消息是不会被客户端消费的。Broker 消息服务器开启一个定时任务处理 RMQ_SYS_TRANS_HALF_TOPIC 中的消息,会每隔指定时间向消息发送者发起事务状态查询请求 ,询问消息发送者客户端本地事务是否成功,然后根据回查状态决定是提交还是回滚,即对处于 PREPARE 状态进行提交或回滚操作。发送者如果明确得知事务成功,则可以返回 COMMIT,服务端会提交该条消息,具体操作是恢复原消息的主题与队列,重新发送到 Broker,消费端感知后消费。发送者如果无法明确得知事务状态,则返回 UNOWN,此时服务端会等待一定时间后再次向发送者询问,默认询问 15 次。发送者如果非常明确得知事务失败,则可以返回 ROLLBACK。在具体实践中,消息发送者在无法获取事务状态时不要武断的返回 ROLLBACK,而是要返回 UNOWN,让服务端定时重试回查,说明如下:在将 PREPARE 消息发送到 Broker 后,服务端发起事务查询时本地事务可能还未提交,为了避免无效的事务回查机制,RocketMQ 通常至少在收到 PREPARE 消息 6s 后才会发起第一次事务回查,可通过 transactionTimeOut 配置。故客户端在实现事务回查时无法证明事务状态时不应该返回 ROLLBACK,而是返回 UNOWN。事务消息实战光说不练假把式,接下来以一个新用户注册送优惠券的场景来详细介绍如何使用事务消息。项目模块职责说明如下:事务消息的核心代码组装在 transaction-service,其核心类图如下:其中核心要点如下:UserServiceImpl:Dubbo 接口业务实现类,类似 MVC 的控制层,在这里做一些参数验证,但不执行具体的业务逻辑,只是发送一条事务消息到 MQ。UserRegTransactionListener:事务监听器,在 executeLocalTransaction 方法中执行业务逻辑,数据库本地事务加在该方法。温馨提示:之所以不在 UserServicveImpl 中执行本地事务,是因为 executeLocalTransaction 中抛出的异常会被 RocketMQ 框架捕捉,及异常无法被 UserServiceImpl 感知,即无法实现其事务的一致性。接下来展示其核心代码,全部源码已上传到 github 仓库。仓库地址:https://github.com/dingwpmz/rocketmq-learning。UserServiceImpl 核心实现UserServiceImpl 的核心要点如下:首先应该对参数进行校验、业务逻辑进行校验,如果不满足业务条件,会发送一些无效消息到 MQ,虽然不会造成业务异常,但会消耗性能。发送事务消息,建议对消息设置 Key,Key 的值可以用业务处理流水号(可唯一表示该业务操作)或者核心业务字段(例如订单编号)。业务入口类可通过事务消息发送状态来判断业务是否失败。UserRegTransactionListener 核心实现事务监听器需要实现执行本地事务与事务回查两个接口。1、实现 executeLocalTransaction首先需要实现 executeLocalTransaction 方法,执行本地事务,其代码如下图所示:其中几个关键点说明如下:在该方法上添加数据库事务标签。执行业务逻辑,示例 Demo 只是将用户数据存储到数据库。如果业务执行失败,可明确告知需要回滚,上层调用方也可根据 ROLLBACK_MESSAGE 进行相应的处理。如果业务成功,不建议直接返回 COMMIT,而是建议返回 UNKNOW,因为该方法尽管在方法最后一行,但可能发生断电等异常情况,数据库并没有成功。2、实现 checkLocalTransaction其次需要实现事务状态回查,用来 RocketMQ 服务端感知事务是否成功,其实现原理如下图所示:其实现关键点如下:如果能明确得知本地事务成功,则返回 COMMIT_MESSAGE如该不能明确得知本地事务成功,不能返回 ROLLBACK_MESSAGE,而是返回 UNKNOW,等待服务端下一次事务回查(不会立即触发),服务端默认回查 15 次,如果 15 次都得到 UNKNOW,则会回滚该消息。代码获取上文只是将事务消息的核心代码加以解读,并重点阐述每个步骤的实现关键点,笔者基于 SpringBoot,尝试结合场景学习 RocketMQ 的使用技巧,其代码上传到了 github 仓库:https://github.com/dingwpmz/rocketmq-learning。点击跳转到代码仓库。扫码了解更多中间件技术干货和案例实践:
文章
消息中间件  ·  存储  ·  Dubbo  ·  前端开发  ·  中间件  ·  Java  ·  应用服务中间件  ·  数据库  ·  RocketMQ  ·  微服务
2023-02-07
【Netty】Netty高性能原理剖析
1、前言我们在实际项目中必然会遇到网络间的通信,也就是RPC,大家肯定都用过Dubbo,那么你对Dubbo底层---Netty了解多少呢?对于它为什么性能如此之高又了解多少呢?这篇文章就简单的介绍下Netty高性能原理。Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API 实现。它提供了对 TCP、UDP和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。2、Netty高性能实际上在 IO 编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者 IO 多路复用技术进行处理。其中IO 多路复用技术通过把多个 IO 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O 多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。与 Socket 类和 ServerSocket 类相对应,NIO 也提供了 SocketChannel 和ServerSocketChannel两种不同的套接字通道实现。2.1 多路复用通讯方式Netty 架构按照 Reactor 模式设计和实现,它的服务端通信序列图如下:Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector, 可以同时并发处理成百上千个客户端 Channel,由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 IO 阻塞导致的线程挂起。2.2 异步通讯 NIO由于 Netty 采用了异步通信模式,一个 IO 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 IO 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。2.3 零拷贝(DIRECT BUFFERS 使用堆外直接内存)Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写, JVM 会将堆内存Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存, 消息在发送过程中多了一次缓冲区的内存拷贝。Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样 方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题。2.4 内存池(基于内存池的缓冲区重用机制)随着 JVM 虚拟机和 JIT 即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓 冲区 Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽 量重用缓冲区,Netty 提供了基于内存池的缓冲区重用机制。2.5 高效的 Reactor 线程模型常用的 Reactor 线程模型有三种,Reactor 单线程模型, Reactor 多线程模型, 主从 Reactor 多线程模型。2.5.1 Reactor 单线程模型Reactor 单线程模型,指的是所有的 IO 操作都在同一个 NIO 线程上面完成,NIO 线程的职责如下: 1) 作为 NIO 服务端,接收客户端的 TCP 连接;2) 作为 NIO 客户端,向服务端发起 TCP 连接;3) 读取通信对端的请求或者应答消息;4) 向通信对端发送消息请求或者应答消息。由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO 操作都不会导致阻塞,理论上一个线程可以独 立处理所有 IO 相关的操作。从架构层面看,一个 NIO 线程确实可以完成其承担的职责。例如,通过 Acceptor 接收客户端的 TCP 连接请求消息,链路建立成功之后,通过 Dispatch 将对应的 ByteBuffer 派发到指定的 Handler 上进行消息解码。用户 Handler 可以通过 NIO 线程将消息发送给客户端。2.5.2 Reactor 多线程模型Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO 操作。 有专门一个NIO 线程-Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求; 网络 IO 操作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送;2.5.3 主从 Reactor 多线程模型服务端用于接收客户端连接的不再是个 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。 Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅只用于客户端的登陆、握手和安全 认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负 责后续的 IO 操作。2.6 无锁设计、线程绑定Netty 采用了串行无锁化设计,在 IO 线程内部进行串行操作,避免多线程竞争导致的性能下降。 表面上看,串行化设计似乎 CPU 利用率不高,并发程度不够。但是,通过调整 NIO 线程池的线程 参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列- 多个工作线程模型性能更优。Netty 的 NioEventLoop 读取到消息之后,直接调用 ChannelPipeline 的 fireChannelRead(Object msg),只要用户不主动切换线程,一直会由 NioEventLoop 调用 到用户的 Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。2.7 高性能的序列化框架Netty 默认提供了对 Google Protobuf 的支持,通过扩展 Netty 的编解码接口,用户可以实现其它的高性能序列化框架,例如 Thrift 的压缩二进制编解码框架。SO_RCVBUF 和 SO_SNDBUF:通常建议值为 128K 或者 256K。SO_TCPNODELAY:NAGLE 算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率。但是对于时延敏感的应用场景需要关闭该优化算法。软中断:开启 RPS 后可以实现软中断,提升网络吞吐量。RPS 根据数据包的源地址,目的地址以 及目的和源端口,计算出一个 hash 值,然后根据这个 hash 值来选择软中断运行的 cpu,从上层 来看,也就是说将每个连接和 cpu 绑定,并通过这个 hash 值,来均衡软中断在多个 cpu 上,提升 网络并行处理性能。3、NettyRPC实现3.1 概念RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一 样。RPC 可以很好的解耦系统,如 WebService 就是一种基于 Http 协议的 RPC。这个 RPC 整体框架 如下:3.2 关键技术服务发布与订阅:服务端使用 Zookeeper 注册服务地址,客户端从 Zookeeper 获取可用的服务 地址。通信:使用 Netty 作为通信框架。Spring:使用 Spring 配置服务,加载 Bean,扫描注解。动态代理:客户端使用代理模式透明化服务调用。消息编解码:使用 Protostuff 序列化和反序列化消息。3.3 核心流程服务消费方(client)调用以本地调用方式调用服务;client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;client stub 找到服务地址,并将消息发送到服务端;server stub 收到消息后进行解码;server stub 根据解码结果调用本地的服务;本地服务执行并将结果返回给 server stub;server stub 将返回结果打包成消息并发送至消费方;client stub 接收到消息,并进行解码;服务消费方得到最终结果。
文章
编解码  ·  弹性计算  ·  网络协议  ·  Dubbo  ·  算法  ·  安全  ·  Java  ·  应用服务中间件  ·  API  ·  Spring
2023-02-07
gRPC(一)入门:什么是RPC?
前言本文作为Grpc的开篇,通过文档先了解一下rpc。个人网站:https://linzyblog.netlify.app/示例代码已经上传到github:点击跳转一、RPC1、什么是RPC?RPC(Remote Procedure Call 远程过程调用)是一种软件通信协议,一个程序可以使用该协议向位于网络上另一台计算机中的程序请求服务,而无需了解网络的详细信息。RPC 用于调用远程系统上的其他进程,如本地系统。过程调用有时也称为 函数调用或 子程序调用。RPC是一种客户端-服务器交互形式(调用者是客户端,执行者是服务器),通常通过请求-响应消息传递系统实现。与本地过程调用一样,RPC 是一种 同步 操作,需要阻塞请求程序,直到返回远程过程的结果。但是,使用共享相同地址空间的轻量级进程或 线程 可以同时执行多个 RPC。通俗的解释:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。接口定义语言(IDL)——用于描述软件组件的应用程序编程接口(API)的规范语言——通常用于远程过程调用软件。在这种情况下,IDL 在链路两端的机器之间提供了一座桥梁,这些机器可能使用不同的操作系统 (OS) 和计算机语言。实际场景:有两台服务器,分别是服务器 A、服务器 B。在 服务器 A 上的应用 想要调用服务器 B 上的应用,它们可以直接本地调用吗?答案是不能的,但走 RPC 的话,十分方便。因此常有人称使用 RPC,就跟本地调用一个函数一样简单。2、HTTP和RPC的区别1)概念区别RPC是一种方法,而HTTP是一种协议。两者都常用于实现服务,在这个层面最本质的区别是RPC服务主要工作在TCP协议之上(也可以在HTTP协议),而HTTP服务工作在HTTP协议之上。由于HTTP协议基于TCP协议,所以RPC服务天然比HTTP更轻量,效率更胜一筹。两者都是基于网络实现的,从这一点上,都是基于Client/Server架构。2)从协议上区分RPC是远端过程调用,其调用协议通常包含:传输协议 和 序列化协议。传输协议:著名的 grpc,它底层使用的是 http2 协议;还有 dubbo 一类的自定义报文的 tcp 协议。序列化协议:基于文本编码的 json 协议;也有二进制编码的 protobuf、hession 等协议;还有针对 java 高性能、高吞吐量的 kryo 和 ftc 等序列化协议。HTTP服务工作在HTTP协议之上,而且HTTP协议基于TCP协议。3、RPC如何工作的?当调用 RPC 时,调用环境被挂起,过程参数通过网络传送到过程执行的环境,然后在该环境中执行过程。当过程完成时,结果将被传送回调用环境,在那里继续执行,就像从常规过程调用返回一样。在 RPC 期间,将执行以下步骤:1.客户端调用客户端存根。该调用是本地过程调用,参数以正常方式压入堆栈。2.客户端存根将过程参数打包到消息中并进行系统调用以发送消息。过程参数的打包称为编组。3.客户端的本地操作系统将消息从客户端机器发送到远程服务器机器。4.服务器操作系统将传入的数据包传递给服务器存根。5.服务器存根从消息中解包参数——称为解编组。6.当服务器过程完成时,它返回到服务器存根,它将返回值编组为一条消息。然后服务器 存根将消息交给传输层。7.传输层将生成的消息发送回客户端传输层,传输层将消息返回给客户端存根。8.客户端存根解组返回参数,然后执行返回给调用者。Client (客户端):服务调用方。Server(服务端):服务提供方。Client Stub(客户端存根):存放服务端的地址消息,负责将客户端的请求参数打包成网络消息,然后通过网络发送给服务提供方。Server Stub(服务端存根):接收客户端发送的消息,再将客户端请求参数打包成网络消息,然后通过网络远程发送给服务方。4、RPC的优缺点尽管它拥有广泛的好处,但使用 RPC 的人肯定应该注意一些陷阱。RPC 为开发人员和应用程序管理人员提供的一些优势:帮助客户端通过传统使用高级语言中的过程调用与服务器进行通信。可以在分布式环境中使用,也可以在本地环境中使用。支持面向进程和面向线程的模型。对用户隐藏内部消息传递机制。只需极少的努力即可重写和重新开发代码。提供抽象,即网络通信的消息传递特性对用户隐藏。省略许多协议层以提高性能。另一方面,RPC 的一些缺点包括:客户端和服务器各自的例程使用不同的执行环境,资源(如文件)的使用也更加复杂。因此,RPC 系统并不总是适合传输大量数据。RPC 极易发生故障,因为它涉及一个通信系统、另一台机器和另一个进程。RPC没有统一的标准;它可以通过多种方式实现。RPC 只是基于交互的,因此它在硬件架构方面没有提供任何灵活性。5、常见的RPC框架1)跟语言绑定框架Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。Motan:微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。Tars:腾讯内部使用的 RPC 框架,于 2017 年对外开源,仅支持 C++ 语言。Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC 框架,仅支持 Java 语言。2)跨语言开源框架gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。Thrift:最初是由Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为 Apache 开源项目之一,支持多种语言。Rpcx:是一个类似阿里巴巴 Dubbo和微博 Motan的 RPC 框架,开源,支持多种语言。二、RPC快速入门Go语言标准包(net/rpc)已经提供了对RPC的支持,而且支持三个级别的RPC:TCP、HTTP和JSONRPC。但Go语言的RPC包是独一无二的RPC,它和传统的RPC系统不同,它只支持Go语言开发的服务器与客户端之间的交互,因为在内部,它们采用了Gob来编码。1、简单的RPC示例1)服务端实现我们先构造一个 HelloService 类型,其中的 SayHi方法用于实现打印功能:type HelloService struct{} func (h *HelloService) SayHi(request string, response *string) error { format := time.Now().Format("2006-01-02 15:04:05") *response = "hi " + request + "---" + format return nil }Go 语言的 RPC 规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error 类型,同时必须是公开的方法。将 HelloService 类型的对象注册为一个 RPC 服务:func main() { //注册服务 _ = rpc.RegisterName("HiLinzy", new(HelloService)) //监听接口 lis, err := net.Listen("tcp", ":8888") if err != nil { log.Fatal(err) return } for { //监听请求 accept, err := lis.Accept() if err != nil { log.Fatalf("Accept Error: %s", err) } //用goroutine为每个TCP连接提供RPC服务 go rpc.ServeConn(accept) } }RegisterName类似于Register,但使用提供的名称作为类型,Register 函数调用会将对象类型中所有满足 RPC 规则的对象方法注册为 RPC 函数,所有注册的方法会放在 “HelloService” 服务空间之下。rpc.ServeConn 函数在该 TCP 连接上为对方提供 RPC 服务。我们的服务支持多个 TCP 连接,然后为每个 TCP 连接提供 RPC 服务。2)客户端实现在客户端请求 HelloService 服务的代码:func main() { //注册服务 _ = rpc.RegisterName("HiLinzy", new(HelloService)) //监听接口 lis, err := net.Listen("tcp", ":8888") if err != nil { log.Fatal(err) return } for { //监听请求 accept, err := lis.Accept() if err != nil { log.Fatalf("Accept Error: %s", err) } //用goroutine为每个TCP连接提供RPC服务 go rpc.ServeConn(accept) } }rpc.Dial 拨号 RPC 服务,然后通过 dial.Call 调用具体的 RPC 方法。在调用 dial.Call 时,第一个参数是用点号连接的 RPC 服务名字和方法名字,第二和第三个参数分别我们定义 RPC 方法的两个参数,第一个是客服端传递的消息,第二个是由服务端产生返回的结果。# 启动服务 ➜ go run server.go API server listening at: 127.0.0.1:54096 # 启动客户端 ➜ go run client.go API server listening at: 127.0.0.1:54100 rpc service result: hi linzy---2022-10-30 15:52:39 rpc service result: hi linzy---2022-10-30 15:52:40 rpc service result: hi linzy---2022-10-30 15:52:41 rpc service result: hi linzy---2022-10-30 15:52:42 rpc service result: hi linzy---2022-10-30 15:52:432、更安全的RPC接口在涉及 RPC 的应用中,作为开发人员一般至少有三种角色:首先是服务端实现 RPC 方法的开发人员,其次是客户端调用 RPC 方法的人员,最后也是最重要的是制定服务端和客户端 RPC 接口规范的设计人员。在前面的例子中我们为了简化将以上几种角色的工作全部放到了一起,虽然看似实现简单,但是不利于后期的维护和工作的切割。1)服务端重构如果要重构 HelloService 服务,第一步需要明确服务的名字和接口:const HelloServiceName = "server/tcp-server/server.HiLinzy" type HelloServiceInterface interface { SayHi(request string, response *string) error } //封装Register func RegisterHelloService(svc HelloServiceInterface) error { return rpc.RegisterName(HelloServiceName, svc) }我们将 RPC 服务的接口规范分为三个部分:首先是服务的名字,然后是服务要实现的详细方法列表,最后是注册该类型服务的函数。为了避免名字冲突,我们在 RPC 服务的名字中增加了包路径前缀(这个是 RPC 服务抽象的包路径,并非完全等价 Go 语言的包路径)。RegisterHelloService 注册服务时,编译器会要求传入的对象满足 HelloServiceInterface 接口。基于 RPC 接口规范编写真实的服务端代码:type HelloService struct{} func (h *HelloService) SayHi(request string, response *string) error { format := time.Now().Format("2006-01-02 15:04:05") *response = "hi " + request + "---" + format return nil } func main() { //注册服务 //_ = rpc.RegisterName("HiLinzy", new(HelloService)) RegisterHelloService(new(HelloService)) //监听接口 lis, err := net.Listen("tcp", "127.0.0.1:8888") if err != nil { log.Fatal(err) return } for { //监听请求 accept, err := lis.Accept() if err != nil { log.Fatalf("Accept Error: %s", err) } go rpc.ServeConn(accept) } }2)客户端重构为了简化客户端用户调用 RPC 函数,我们在可以在接口规范部分增加对客户端的简单包装:const HelloServiceName = "server/tcp-server/server.HiLinzy" type HelloServiceClient struct { *rpc.Client } func DialHelloService(network, address string) (*HelloServiceClient, error) { c, err := rpc.Dial(network, address) if err != nil { return nil, err } return &HelloServiceClient{Client: c}, nil } func (h *HelloServiceClient) SayHi(request string, response *string) error { //client.Call 的第一个参数用 HelloServiceName+".SayHi" 代替了 "HiLinzy.SayHi"。 return h.Client.Call(HelloServiceName+".SayHi", request, &response) }提供了一个 DialHelloService 方法,直接拨号 HelloService 服务。基于新的客户端接口,我们可以简化客户端用户的代码:func main() { //建立连接 //dial, err := rpc.Dial("tcp", "127.0.0.1:8888") client, err := DialHelloService("tcp", "127.0.0.1:8888") if err != nil { log.Fatal("dialing:", err) } var result string for i := 0; i < 5; i++ { //发起请求 //_ = dial.Call("HiLinzy.SayHi", "linzy", &result) err = client.SayHi("linzy", &result) if err != nil { log.Fatal(err) } fmt.Println("rpc service result:", result) time.Sleep(time.Second) } } 现在客户端用户不用再担心 RPC 方法名字或参数类型不匹配等低级错误的发生。# 启动服务 ➜ go run server.go API server listening at: 127.0.0.1:56990 # 启动客户端 ➜ go run client.go API server listening at: 127.0.0.1:57188 rpc service result: hi linzy---2022-10-30 16:55:12 rpc service result: hi linzy---2022-10-30 16:55:13 rpc service result: hi linzy---2022-10-30 16:55:14 rpc service result: hi linzy---2022-10-30 16:55:15 rpc service result: hi linzy---2022-10-30 16:55:16在新的 RPC 服务端实现中,我们用 RegisterHelloService 函数来注册函数,这样不仅可以避免命名服务名称的工作,同时也保证了传入的服务对象满足了 RPC 接口的定义。3、跨语言的 RPC标准库的RPC默认采用 Go 语言特有的 gob 编码,因此从其他语言调用 Go 语言实现的 RPC 服务将比较困难。在互联网的微服务时代,每个 RPC 以及服务的使用者都可能采用不同的编程语言,因此跨语言是互联网时代 RPC 的一个首要条件。得益于 RPC 的框架设计,Go 语言的 RPC 其实也是很容易实现跨语言支持的。Go 语言的 RPC 框架有两个比较有特色的设计:RPC 数据打包时可以通过插件实现自定义的编码和解码。RPC 建立在抽象的 io.ReadWriterCloser 接口之上的,我们可以将 RPC 架设在不同的通信协议之上。这里我们使用Go官方自带的 net/rpc/jsonrpc 扩展实现一个跨语言的rpc。1)服务端实现首先是基于 json 编码重新实现 RPC 服务:func main() { //注册服务 //_ = rpc.RegisterName("HiLinzy", new(HelloService)) RegisterHelloService(new(HelloService)) //监听接口 lis, err := net.Listen("tcp", "127.0.0.1:8888") if err != nil { log.Fatal(err) return } for { //监听请求 accept, err := lis.Accept() if err != nil { log.Fatalf("Accept Error: %s", err) } //go rpc.ServeConn(accept) go rpc.ServeCodec(jsonrpc.NewServerCodec(accept)) } }代码中最大的变化是用 rpc.ServeCodec 函数替代了 rpc.ServeConn 函数,传入的参数是针对服务端的 json 编解码器。2)客户端实现实现 json 版本的客户端:func main() { //建立TCP连接 conn, err := net.Dial("tcp", "127.0.0.1:8888") if err != nil { log.Fatal("net.Dial:", err) } //建立针对客户端的json编解码器 client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) var result string for i := 0; i < 5; i++ { //发起请求 //err = client.SayHi("linzy", &result) client.Call(HelloServiceName+".SayHi", "linzy", &result) if err != nil { log.Fatal(err) } fmt.Println("rpc service result:", result) time.Sleep(time.Second) } }# 启动服务 ➜ go run server.go API server listening at: 127.0.0.1:59409 # 启动客户端 ➜ go run client.go API server listening at: 127.0.0.1:59514 rpc service result: hi linzy---2022-10-30 19:09:52 rpc service result: hi linzy---2022-10-30 19:09:53 rpc service result: hi linzy---2022-10-30 19:09:54 rpc service result: hi linzy---2022-10-30 19:09:55 rpc service result: hi linzy---2022-10-30 19:09:56我们先手工调用 net.Dial 函数建立 TCP 连接,然后基于该连接建立针对客户端的 json 编解码器。在确保客户端可以正常调用 RPC 服务的方法之后,我们用一个普通的 TCP 服务代替 Go 语言版本的 RPC 服务,这样可以查看客户端调用时发送的数据格式。3)分析数据格式我们用Wireshark抓个包看看我们直接传递的数据格式:这是一个 json 编码的数据,其中 method 部分对应要调用的 rpc 服务和方法组合成的名字,params 部分的第一个元素为参数,id 是由调用端维护的一个唯一的调用编号。{"method":"server/tcp-server/server.HiLinzy.SayHi","params":["linzy"],"id":0}请求的 json 数据对象在内部对应两个结构体:客户端是 clientRequest,服务端是 serverRequest。clientRequest 和 serverRequest 结构体的内容基本是一致的:type clientRequest struct { Method string `json:"method"` Params [1]interface{} `json:"params"` Id uint64 `json:"id"` } type serverRequest struct { Method string `json:"method"` Params *json.RawMessage `json:"params"` Id *json.RawMessage `json:"id"` }我们再来查看服务端响应的结果的数据结构:返回的结果也是一个 json 格式的数据:{"id":0,"result":"hilinzy---2022-10-30 19:09:52","error":null}.其中 id 对应输入的 id 参数,result 为返回的结果,error 部分在出问题时表示错误信息。对于顺序调用来说,id 不是必须的。但是 Go 语言的 RPC 框架支持异步调用,当返回结果的顺序和调用的顺序不一致时,可以通过 id 来识别对应的调用。返回的 json 数据也是对应内部的两个结构体:客户端是 clientResponse,服务端是 serverResponse。两个结构体的内容同样也是类似的:type clientResponse struct { Id uint64 `json:"id"` Result *json.RawMessage `json:"result"` Error interface{} `json:"error"` } type serverResponse struct { Id *json.RawMessage `json:"id"` Result interface{} `json:"result"` Error interface{} `json:"error"` }因此无论采用何种语言,只要遵循同样的 json 结构,以同样的流程就可以和 Go 语言编写的 RPC 服务进行通信。这样我们就实现了跨语言的 RPC。4、HTTP 上的 RPCGo 语言内在的 RPC 框架已经支持在 HTTP 协议上提供 RPC 服务。但是框架的 HTTP 服务同样采用了内置的 gob 协议,并且没有提供采用其它协议的接口,因此从其它语言依然无法访问的。在前面的例子中,我们已经实现了在 TCP 协议之上运行 jsonrpc 服务,并且通过Wireshark抓包分析传递的数据 json 数据格式。现在我们尝试在 http 协议上提供 jsonrpc 服务。新的 RPC 服务其实是一个类似 REST 规范的接口,接收请求并采用相应处理流程:const HelloServiceName = "server/tcp-server/server.HiLinzy" type HelloService struct{} func (h *HelloService) SayHi(request string, response *string) error { format := time.Now().Format("2006-01-02 15:04:05") *response = "hi " + request + "---" + format return nil } func main() { //注册服务 rpc.RegisterName(HelloServiceName, new(HelloService)) http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) { var conn io.ReadWriteCloser = struct { io.Writer io.ReadCloser }{ ReadCloser: r.Body, Writer: w, } rpc.ServeRequest(jsonrpc.NewServerCodec(conn)) }) http.ListenAndServe(":8888", nil) }RPC 的服务架设在 “/jsonrpc” 路径,在处理函数中基于 http.ResponseWriter 和 http.Request 类型的参数构造一个 io.ReadWriteCloser 类型的 conn 通道。然后基于 conn 构建针对服务端的 json 编码解码器。最后通过 rpc.ServeRequest 函数为每次请求处理一次 RPC 方法调用。用Postman模拟RPC调用过程,向连接localhost:8888/jsonrpc发送一条 json 字符串{"method":"server/tcp-server/server.HiLinzy.SayHi","params":["linzy"],"id":0}这样我们就可用很方便的从不同的语言或者不同的方式来访问RPC服务了。参考文章:https://www.techtarget.com/searchapparchitecture/definition/Remote-Procedure-Call-RPC​https://mp.weixin.qq.com/s__biz=MzI5MDAzNTAxMQ==&mid=2455917150&idx=1&sn=8a8325b09e6e2a0e34bf86609967f28c&scene=19#wechat_redirect​https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-01-rpc-intro.html
文章
JSON  ·  网络协议  ·  Dubbo  ·  Java  ·  应用服务中间件  ·  Go  ·  Apache  ·  数据格式  ·  网络架构  ·  C++
2023-02-06
Dubbo 3 之 Triple 流控反压原理解析
​作者:顾欣Triple 是 Dubbo3 提出的基于 HTTP2 的开放协议,旨在解决 Dubbo2 私有协议带来的互通性问题。Triple 基于 HTTP2 定制自己的流控,支持通过特定的异常通知客户端业务层服务端负载高情况,保护了服务端被大流量击垮,提高系统高可用能力。流控反压现状客户端和服务器端在接收数据的时候有一个缓冲区来临时存储数据,但是缓冲区的大小是有限制的,所以有可能会出现缓冲区溢出的情况, Http 通过流控保护数据溢出丢失风险。Http1 流控在 HTTP1.1 中,流量的控制依赖的是底层 TCP 协议,在客户端和服务器端建立连接的时候,会使用系统默认的设置来建立缓冲区。在数据进行通信的时候,会告诉对方它的接收窗口的大小,这个接收窗口就是缓冲区中剩余的可用空间。如果接收窗口大小为零,则说明接收方缓冲区已满,则发送方将不再发送数据,直到客户端清除其内部缓冲区,然后请求恢复数据传输。Http2 流控Http2 使用了多路复用机制,一个 TCP 连接可以有多个 Http2 连接,故在 Http2 中,有更加精细的流控制机制,允许服务端实现自己数据流和连接级的流控制。服务端与客户端初次见了连接时,会通过发送 Http2SettingsFrame 设置初始化的流控窗口大小,用于 Stream 级别流控,默认为65,535字节。定好流控窗口后,每次客户端发送数据就会减少流控窗口的大小,服务端收到数据后会发送窗口更新包(WINDOW_UPDATE frame)通知客户端更新窗口。客户端收到窗口更新包后就会增加对应值的流控窗口,从而达到动态控制的目的。Triple 流控反压Netty 基于 Http2 实现了基础的流控,当服务端负载过高,客户端发送窗口为0时,新增请求就无法被发送出去,会在缓存到客户端待发送请求队列中,缓存数据过大,就会造成客户端内存溢出,影响业务程序。Triple 基于 netty 实现了 Http2 协议,通过 Http2FlowController 接口统一封装,在实现分为进站(inbound)和出站(outbound)两个维度的实现。Triple 在 inbound 流量上使用了 netty 的默认流控实现,在 outbound 上实现了自己流控,基于服务端负载,将服务端流量压力透传到客户端业务层,实现客户端的业务反压,暂停业务继续发送请求,保护服务端不被大流量击垮。连接初始化Triple 在初次建立连接时,通过 TripleHttp2Protocol 初始化 http2 配置,默认流控窗口 DEFAULT_WINDOW_INIT_SIZE = MIB_8,并在服务端和客户端加入自己的 outbound 流控接口。Inbound 流控Inbound 流量会通过 DefaultHttp2LocalFlowController 的 consumeBytes 方法实现流控窗口更新与发送。1.入口传入 Http 流与更新数据大小2.找到对应连接实现数据消费3.更新流控窗口 4.发送流控更新数据包(window_update)Outbound 流控Outbound 通过 Triple 自己的流控实现 TriHttp2RemoteFlowController,将服务端压力反馈到业务层,保护服务端被大流量击垮。1.发送数据时判断是否还有窗口2.窗口为0时抛出特定异常3.反馈客户端流控异常总结Triple 通过将底层客户端发送窗口为0场景封装为特定流控异常,透传至客户端上层业务,阻止客户端业务继续数据发送,有效的保护了服务端被大流量击垮和客户端的内存溢出的问题。未来展望目前 Triple 已经基本实现了流控反压能力,未来我们将深度联动业务,基于业务负载自适应调整反压流控,一是在 inbound 上将流控窗口包发送时机调整到服务端业务处理完成后,二是在 outbound 流量上关联客户端业务层,动态调整客户端发送速率。从而实现基于服务端业务负载动态反压流控机制。点击此处查看原文文档
文章
存储  ·  缓存  ·  网络协议  ·  Dubbo  ·  应用服务中间件
2023-02-06
1 2 3 4 5 6 7 8 9
...
20
跳转至:
阿里中间件
164654 人关注 | 1945 讨论 | 1034 内容
+ 订阅
  • 谈谈我工作中的23个设计模式
  • Java Agent 踩坑之 appendToSystemClassLoaderSearch 问题
  • 展望架构的2023:Serverless 兴起,下一代微服务的雏形和标准化开始呈现
查看更多 >
消息队列
1 人关注 | 0 讨论 | 16 内容
+ 订阅
  • 15年云原生实践,在关键节点我们做对了什么? | 云原生大咖说
  • 事务消息应用场景、实现原理与项目实战(附全部源码)
  • 关于热力图数据上报清洗,我们做了一个有意思的尝试
查看更多 >
微服务
23004 人关注 | 11351 讨论 | 33478 内容
+ 订阅
  • 网络的几个问题
  • 【Linux进程间通信】一、什么是IPC
  • 大型系统应用边界设计原则与实践
查看更多 >
开发与运维
5637 人关注 | 131540 讨论 | 303677 内容
+ 订阅
  • 【Linux守护进程】二、守护进程详解
  • 【Linux守护进程】一、进程组与会话
  • 【Linux信号专题】五、SIGCHLD信号详解
查看更多 >
数据库
252655 人关注 | 50918 讨论 | 95248 内容
+ 订阅
  • 网络的几个问题
  • 数据库知识容易忽略
  • redis基础要会
查看更多 >