dubbo源码学习(四):暴露服务的过程

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: dubbo采用的nio异步的通信,通信协议默认为 netty,当然也可以选择 mina,grizzy。在服务端(provider)在启动时主要是开启netty监听,在zookeeper上注册服务节点,处理消费者请求,返回处理后的消息给消费者,消费者使用服务时主要是订阅服务的节点,监听zookeeper节点目录,服务端的变化时zookeeper会推送给消费者,消费者重新缓存服务地址等。

dubbo采用的nio异步的通信,通信协议默认为 netty,当然也可以选择 mina,grizzy。在服务端(provider)在启动时主要是开启netty监听,在zookeeper上注册服务节点,处理消费者请求,返回处理后的消息给消费者,消费者使用服务时主要是订阅服务的节点,监听zookeeper节点目录,服务端的变化时zookeeper会推送给消费者,消费者重新缓存服务地址等。服务者、消费者、zookeeper三者之间都是长连接。

 

下面看dubbo源码来看服务暴露的过程,服务暴露的入口为:


com.alibaba.dubbo.config.ServiceConfig#export 方法,


代码如下:

 

//是否延时暴露  
        if (delay != null && delay > 0) {  
            Thread thread = new Thread(new Runnable() {  
                public void run() {  
                    try {  
                        Thread.sleep(delay);  
                    } catch (Throwable e) {  
                    }  
                    doExport();  
                }  
            });  
            thread.setDaemon(true);  
            thread.setName("DelayExportServiceThread");  
            thread.start();  
        } else {  
            //不延时暴露,则直接暴露  
            doExport();  
        }


上在代码无论是延时暴露或直接暴露调用的方法是:doExport(),doExport会对解析完的配置再做一次检查,核心代码大家可以查看dubbo的源码,下面列出一小部分


/* 
            检查默认设置,如果xml中没有配置<dubbo:provider 
            主要是从系统环境变量中寻找是否有相应的provider的配置 
*/  
        checkDefault();  
        //下面设置的内容如果没有配置<dubbo:provider时基本上都是Null  
        if (provider != null) {  
            if (application == null) {  
                application = provider.getApplication();  
            }  
            if (module == null) {  
                module = provider.getModule();  
            }  
            if (registries == null) {  
                registries = provider.getRegistries();  
            }  
            if (monitor == null) {  
                monitor = provider.getMonitor();  
            }  
            if (protocols == null) {  
                protocols = provider.getProtocols();  
            }  
        }  
        if (module != null) {  
            //registries一般都会配置  
            if (registries == null) {  
                registries = module.getRegistries();  
            }  
            if (monitor == null) {  
                monitor = module.getMonitor();  
            }  
        }  
        if (application != null) {  
            //application一般也会配置  
            if (registries == null) {  
                registries = application.getRegistries();  
            }  
            if (monitor == null) {  
                monitor = application.getMonitor();  
            }  
        }  
        //是否泛化调用  
        if (ref instanceof GenericService) {  
            interfaceClass = GenericService.class;  
            if (StringUtils.isEmpty(generic)) {  
                generic = Boolean.TRUE.toString();  
            }  
        } else {  
            try {  
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()  
                        .getContextClassLoader());  
            } catch (ClassNotFoundException e) {  
                throw new IllegalStateException(e.getMessage(), e);  
            }  
            /* 
                检查即将暴露的接口的方法配置,检查方法是否在接口中存在 
                一般不会配置所以一般情况下methods为null 
                <dubbo:service  > <dubbo:method /> </dubbo:serivce> 
             */  
            checkInterfaceAndMethods(interfaceClass, methods);  
            /* 
                检查接口的引用不为空,并且必须实现的是要暴露的接口 
             */  
            checkRef();  
            generic = Boolean.FALSE.toString();  
        }

 

所有的检查通过之后,会调用 :

com.alibaba.dubbo.config.ServiceConfig#doExportUrls 
/* 
            将注册协议转化成url 
            registry://45.119.68.23:2181/com.alibaba.dubbo.registry.RegistryService? 
            application=test-dubbo&dubbo=2.5.3&pid=7648&registry=zookeeper×tamp=1462349748801 
         */  
        List<URL> registryURLs = loadRegistries(true);  
        //配置多通信协议时,都进行暴露  
        for (ProtocolConfig protocolConfig : protocols) {  
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);  
        }


doExportUrlsFor1Protocol中主要将所有的配置转化成map,然后将map转化成dubbo的统一URL,最终暴露的dubbo服务也就是这个统一的url,这个url也会注册到zookeeper的节点上,部分代码如下:


/* 
    将不为null的配置对象中的属性设置到 map 中 
    即将 xml 配置文件中的配置设置的值全转化成为map 
    {side=provider, application=alijk-dubbo, accepts=1000, 
        dubbo=2.5.3, threads=100, pid=7236, interface=cn.eoncloud.account.sdk.export.AccountService, 
        threadpool=fixed, version=1.0.0, timeout=500, anyhost=true, timestamp=1462347843960} 
 */  
appendParameters(map, application);  
appendParameters(map, module);  
appendParameters(map, provider, Constants.DEFAULT_KEY);  
appendParameters(map, protocolConfig);  
appendParameters(map, this);  
...... 
/* 
    将配置信息转化成 url ,主要根据之前map里的数据组装成url 
    调用 URL#buildString方法 
    dubbo://10.6.13.137:9998/cn.eoncloud.account.sdk.export.AccountService 
    ?accepts=1000&anyhost=true&application=test-dubbo&dubbo=2.5.3 
    &interface=cn.eoncloud.account.sdk.export.AccountService 
    &methods=getAccountName,getAllTest&pid=7236&revision=1.0.0&side=provider 
    &threadpool=fixed&threads=100&timeout=500×tamp=1462347843960&version=1.0.0 
 */  
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);  
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)  
        .hasExtension(url.getProtocol())) {  
    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)  
            .getExtension(url.getProtocol()).getConfigurator(url).configure(url);  
}  
......  
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));  
//com.alibaba.dubbo.registry.integration.RegistryProtocol#export 即将进行暴露  
Exporter<?> exporter = protocol.export(invoker);


上面的代码核心暴露的一行代码为:protocol.export(invoker); 这个protocol的值为:RegistryProtocol,也就是暴露会跳到:RegistryProtocol.exprot中去处理,


RegistryProtocol.exprot主要做两件事情:


1、开启netty服务端 。


2、创建zookeeper服务节点。


下面来看RegistryProtocol.export方法,代码如下:


public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {  
        //export invoker doLocalExport调用dubboProtocol.export开启netty服务监听  
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  
        //registry provider  
        final Registry registry = getRegistry(originInvoker);  
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  
        //调用zodoRegister的doRegister 创建zookeeper的服务节点  
        registry.register(registedProviderUrl);  
        // 订阅override数据  
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。  
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);  
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  
        //订阅  
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  
        //保证每次export都返回一个新的exporter实例  
        return new Exporter<T>() {  
            public Invoker<T> getInvoker() {  
                return exporter.getInvoker();  
            }  
            public void unexport() {  
                try {  
                    exporter.unexport();  
                } catch (Throwable t) {  
                    logger.warn(t.getMessage(), t);  
                }  
                try {  
                    registry.unregister(registedProviderUrl);  
                } catch (Throwable t) {  
                    logger.warn(t.getMessage(), t);  
                }  
                try {  
                    overrideListeners.remove(overrideSubscribeUrl);  
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);  
                } catch (Throwable t) {  
                    logger.warn(t.getMessage(), t);  
                }  
            }  
        };  
    }


上面的代码里有一段特别重要,关键性的代码在doLocalExport中:


final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));  
//此处protol为dubboProtocol  
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);


从上面的代码中可以看到会调用dubboProtocol的export对服务进行暴露,这个export最终目的就是开启netty的监听,下面来看dubbo是如何一步一步开启netty的


private void openServer(URL url) {  
       // find server. ip:port  
       String key = url.getAddress();  
       //client 也可以暴露一个只有server可以调用的服务。  
       boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);  
       if (isServer) {  
        ExchangeServer server = serverMap.get(key);  
        if (server == null) {  
               //创建 Server  
            serverMap.put(key, createServer(url));  
        } else {  
            //server支持reset,配合override功能使用  
            server.reset(url);  
        }  
       }  
   }  
   private ExchangeServer createServer(URL url) {  
       //默认开启server关闭时发送readonly事件  
       url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());  
       //默认开启heartbeat  
       url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));  
       //默认使用netty  
       String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  
       if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))  
           throw new RpcException("Unsupported server type: " + str + ", url: " + url);  
       //默认使用dubbo协议编码  
       url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);  
       ExchangeServer server;  
       try {  
           //HeaderExchangeServer 在此处已经开启了Netty Server 进行监听  
           server = Exchangers.bind(url, requestHandler);  
       } catch (RemotingException e) {  
           throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);  
       }  
       str = url.getParameter(Constants.CLIENT_KEY);  
       if (str != null && str.length() > 0) {  
           Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();  
           if (!supportedTypes.contains(str)) {  
               throw new RpcException("Unsupported client type: " + str);  
           }  
       }  
       return server;  
   }


在上面的代码中:Exchangers.bind(url, requestHandler)  默认为:


HeaderExchanger.bind()
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {  
        //Transporters默认为NettyTransporter  
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));  
    }


代码运行到这里可以看到传输方式了,dubbo默认采用的通信方式为 NettyTransporter ,再来看NettyTransporter.bind方法


public static final String NAME = "netty";  
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {  
        return new NettyServer(url, listener);  
    }


已经能看到NettyServer了,dubbo在暴露服务最终开启的netty服务监听,监听消费者发送的请求,通过反射调用方法得到结果通过 tcp/ip 网络传输返回给消费者。再进入到NettyServer中我们就能看到非常传统的开启Netty服务的代码了


protected void doOpen() throws Throwable {  
        NettyHelper.setNettyLoggerFactory();  
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));  
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));  
        //最后一个参数为 NIO 最大工作线程数  
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));  
        //netty server 启动器  
        bootstrap = new ServerBootstrap(channelFactory);  
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);  
        channels = nettyHandler.getChannels();  
        // https://issues.jboss.org/browse/NETTY-365  
        // https://issues.jboss.org/browse/NETTY-379  
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
            public ChannelPipeline getPipeline() {  
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);  
                ChannelPipeline pipeline = Channels.pipeline();  
                /*int idleTimeout = getIdleTimeout();  
                if (idleTimeout > 10000) {  
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));  
                }*/  
                pipeline.addLast("decoder", adapter.getDecoder());  
                pipeline.addLast("encoder", adapter.getEncoder());  
                pipeline.addLast("handler", nettyHandler);  
                return pipeline;  
            }  
        });  
        // 创建一个绑定到指定地址的新通道,也就是绑定IP、端口供客户端连接  
        channel = bootstrap.bind(getBindAddress());  
    }


上面的代码执行完成后,netty的服务端就已经开启了,可以接收客户端的连接了,但客户端连接上来要怎么处理呢?消息接收、发送怎么处理呢?所有的处理都在上面代码的 NettyHandler类中,Nettyhandler继承了Netty包中的的SimpleChannelHandler

 

NettyHandler extends SimpleChannelHandler  


重写了 channelConnected、channelDisconnected、messageReceived等方法,而我们比较关注的可能是messagereceived方法,在收到消息时如何处理,但今天暂时先不看dubbo如果处理消息,只看暴露,消息处理如何实现异步通信下一节再讲。

 

/**  
     * 收到消息时触发  
     * @param ctx  
     * @param e  
     * @throws Exception  
     */  
    @Override  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {  
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);  
        try {  
            handler.received(channel, e.getMessage());  
        } finally {  
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());  
        }  
    }


从前面知道,开启netty服务是在RegistryProtocol.export 的 doLocalExport 中,在开启了netty服务后,就是在zookeeper上注册服务节点了,消费者在消费服务时会根据消费的接口名找到对应的zookeeper节点目录,对目录进行监听,接收推送

 

//registry provider  
final Registry registry = getRegistry(originInvoker);  
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  
//调用zodoRegister的doRegister 创建zookeeper的服务节点  
registry.register(registedProviderUrl);  
// 订阅override数据  
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。  
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);  
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  
//订阅  
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  
dubbo服务在zookeeper上的节点注册是:com.alibaba.dubbo.registry.support.FailbackRegistry#register
@Override  
    public void register(URL url) {  
        super.register(url);  
        failedRegistered.remove(url);  
        failedUnregistered.remove(url);  
        try {  
            // 向服务器端发送注册请求  
            doRegister(url);


因为doRegister是一个抽象的方法,查看他的实现可以看到:


微信图片_20220430222434.png

 

从上图可以看到doRegister实现有 dubbo、redis、zookeeper,这也是在我们配置时经常看到的 注册协议的配置 ,最为常用的就是 zookeeper了,所以再看ZookeeperRegistry的代码,看他的doRegistry干什么了如下


protected void doRegister(URL url) {  
       try {  
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));  
       } catch (Throwable e) {  
           throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);  
       }  
   }


其实从上面已经可以看到 在zookeeper上面创建 节点了,默认不分组的情况下,服务结构如下:

/dubbo/XXXXservice/consumers、providers


微信图片_20220430222437.png


至此,dubbo的暴露基本上已经完成,开启了netty服务,注册了zookeeper的节点,就等着消费者连接上来使用了。下一节将介绍dubbo的消息发送和接收,NIO异步通讯的实现。

 

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
7月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
7月前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
159 0
|
2月前
|
存储 负载均衡 监控
dubbo学习一:zookeeper与dubbo的关系,下载安装启动zookeeper(解决启动中报错)
这篇文章是关于Apache Dubbo框架与Zookeeper的关系,以及如何下载、安装和启动Zookeeper的教程,包括解决启动过程中可能遇到的报错问题。
66 3
dubbo学习一:zookeeper与dubbo的关系,下载安装启动zookeeper(解决启动中报错)
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
2月前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
100 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
2月前
|
Dubbo IDE Java
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
这篇文章是关于如何下载和部署Dubbo管理控制台(dubbo-admin)的教程,并分析了2.6.1版本及以后版本的变化。
51 0
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
|
4月前
|
JSON Dubbo Java
【Dubbo协议指南】揭秘高性能服务通信,选择最佳协议的终极攻略!
【8月更文挑战第24天】在分布式服务架构中,Apache Dubbo作为一款高性能的Java RPC框架,支持多种通信协议,包括Dubbo协议、HTTP协议及Hessian协议等。Dubbo协议是默认选择,采用NIO异步通讯,适用于高要求的内部服务通信。HTTP协议通用性强,利于跨语言调用;Hessian协议则在数据传输效率上有优势。选择合适协议需综合考虑性能需求、序列化方式、网络环境及安全性等因素。通过合理配置,可实现服务性能最优化及系统可靠性提升。
60 3
|
4月前
|
缓存 Dubbo Java
Dubbo服务消费者启动与订阅原理
该文章主要介绍了Dubbo服务消费者启动与订阅的原理,包括服务消费者的启动时机、启动过程以及订阅和感知最新提供者信息的方式。
Dubbo服务消费者启动与订阅原理
|
4月前
|
Dubbo 网络协议 Java
深入掌握Dubbo服务提供者发布与注册原理
该文章主要介绍了Dubbo服务提供者发布与注册的原理,包括服务发布的流程、多协议发布、构建Invoker、注册到注册中心等过程。
深入掌握Dubbo服务提供者发布与注册原理
|
4月前
|
负载均衡 Dubbo Java
Dubbo服务Spi机制和原理
该文章主要介绍了Dubbo中的SPI(Service Provider Interface)机制和原理,包括SPI的基本概念、Dubbo中的SPI分类以及SPI机制的实现细节。
Dubbo服务Spi机制和原理