dubbo - Analysis of the server bind process

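Introduction

This article walks through the full call chain on the dubbo server side during the bind process. An earlier article on the dubbo service export flow explained that service export ultimately starts listening through the bind() method.
Here the bind process is examined in more detail, including the core classes Exchangers, HeaderExchanger, and HeaderExchangeServer.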


Call chain

[Image: call chain diagram]

Notes:

  • The classes to focus on are Exchangers, HeaderExchanger, HeaderExchangeServer, Transporters, NettyTransporter, and NettyServer.

DubboProtocol

public class DubboProtocol extends AbstractProtocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // related code omitted
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

    private ExchangeServer createServer(URL url) {
        // related code omitted
        ExchangeServer server;
        try {
            // the core call: bind the server
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}

Notes:

  • DubboProtocol.createServer() calls Exchangers.bind(), which hands control to the Exchangers class.
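
The create-or-reset logic in openServer() can be sketched in isolation. The following is a minimal illustration, not dubbo code: FakeServer, ServerRegistry, and OpenServerDemo are hypothetical names, and the address string is made up. It only shows that one server is kept per address key and that a repeated export for the same address resets the existing server instead of creating a new one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ExchangeServer; it only counts how often it was reset.
class FakeServer {
    final String address;
    int resetCount = 0;

    FakeServer(String address) {
        this.address = address;
    }

    void reset() {
        resetCount++;
    }
}

class ServerRegistry {
    // One server per "host:port" key, mirroring serverMap in the excerpt.
    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created = 0;

    // Create the server on the first export for an address; reset it on later exports.
    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            server = new FakeServer(address);
            created++;
            serverMap.put(address, server);
        } else {
            server.reset();
        }
        return server;
    }
}

class OpenServerDemo {
    public static void main(String[] args) {
        ServerRegistry registry = new ServerRegistry();
        FakeServer first = registry.openServer("10.0.0.1:20880");
        FakeServer second = registry.openServer("10.0.0.1:20880");
        System.out.println(first == second);    // whether the same instance was reused
        System.out.println(registry.created);   // how many servers were created
        System.out.println(second.resetCount);  // how many resets the reuse triggered
    }
}
```

Like the excerpt, the sketch does a plain get followed by put, so the two steps are not atomic if several threads export at the same time.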


Exchangers & HeaderExchanger

public class Exchangers {

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

        // entry point of the bind process
        return getExchanger(url).bind(url, handler);
    }


    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    // header=com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
}



public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

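Notes:

  • Exchangers.bind() calls getExchanger(url).bind() internally.
  • getExchanger() returns a HeaderExchanger object, looked up by type name through the SPI mechanism (the header entry shown in the code comment).
  • getExchanger(url).bind() therefore executes HeaderExchanger.bind().
  • HeaderExchanger.bind() returns a HeaderExchangeServer object.
  • The HeaderExchangeServer constructor takes the Server returned by Transporters.bind() as its argument.
  • Transporters.bind() returns a NettyServer object when the netty transporter is in use.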
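
HeaderExchanger.bind() also wraps the handler twice before passing it on: new DecodeHandler(new HeaderExchangeHandler(handler)). The sketch below illustrates that decorator nesting with hypothetical, heavily simplified types (SimpleHandler, DecodeLayer, ExchangeLayer, BusinessHandler); the real ChannelHandler interface has more callbacks, and the real layers do actual decoding and request/response handling. The point is only the order in which a received message passes through the layers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified handler interface; the trace list records the call order.
interface SimpleHandler {
    void received(String message, List<String> trace);
}

// Innermost handler: stands in for the ExchangeHandler passed to bind().
class BusinessHandler implements SimpleHandler {
    public void received(String message, List<String> trace) {
        trace.add("business:" + message);
    }
}

// Middle layer: stands in for HeaderExchangeHandler.
class ExchangeLayer implements SimpleHandler {
    private final SimpleHandler next;

    ExchangeLayer(SimpleHandler next) {
        this.next = next;
    }

    public void received(String message, List<String> trace) {
        trace.add("exchange");
        next.received(message, trace);
    }
}

// Outermost layer: stands in for DecodeHandler; trimming is a placeholder for decoding.
class DecodeLayer implements SimpleHandler {
    private final SimpleHandler next;

    DecodeLayer(SimpleHandler next) {
        this.next = next;
    }

    public void received(String message, List<String> trace) {
        trace.add("decode");
        next.received(message.trim(), trace);
    }
}

class HandlerChainDemo {
    public static void main(String[] args) {
        // Same nesting shape as in HeaderExchanger.bind().
        SimpleHandler chain = new DecodeLayer(new ExchangeLayer(new BusinessHandler()));
        List<String> trace = new ArrayList<>();
        chain.received(" hello ", trace);
        System.out.println(trace);
    }
}
```

The outermost wrapper sees the message first, so the layer that is constructed last runs first.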


HeaderExchangeServer

public class HeaderExchangeServer implements ExchangeServer {

    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory(
                    "dubbo-remoting-server-heartbeat",
                    true));
    private final Server server;
    // heartbeat timer
    private ScheduledFuture<?> heatbeatTimer;
    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
    private int heartbeat;
    private int heartbeatTimeout;
    private AtomicBoolean closed = new AtomicBoolean(false);

    public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }

    public void reset(URL url) {
        server.reset(url);
        try {
            if (url.hasParameter(Constants.HEARTBEAT_KEY)
                    || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                if (t < h * 2) {
                    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                }
                if (h != heartbeat || t != heartbeatTimeout) {
                    heartbeat = h;
                    heartbeatTimeout = t;
                    startHeatbeatTimer();
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    @Deprecated
    public void reset(com.alibaba.dubbo.common.Parameters parameters) {
        reset(getUrl().addParameters(parameters.getParameters()));
    }

    public void send(Object message) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message);
    }

    public void send(Object message, boolean sent) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message, sent);
    }
}

Notes:

  • HeaderExchangeServer holds a field Server server; in this call chain it is the NettyServer object.
  • HeaderExchangeServer is a wrapper around Server: operations on the Server layer go through HeaderExchangeServer, which exposes them to callers.
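
The heartbeat handling in the constructor and in reset() follows one rule: the timeout defaults to three times the interval and must be at least twice the interval. The sketch below reproduces that rule with plain JDK classes. The body of startHeatbeatTimer() is not shown in the excerpt, so the restart() method here is an assumption: it cancels any running timer and only schedules a new one when the interval is greater than 0, which matches the "default value is 0, won't execute a heartbeat" comment. HeartbeatSketch and its members are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    // Single daemon thread, similar in spirit to the scheduled pool in the excerpt.
    private final ScheduledExecutorService scheduled =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true);
                return t;
            });
    private ScheduledFuture<?> timer;
    final AtomicInteger ticks = new AtomicInteger();

    // Timeout defaults to 3x the interval and must be at least 2x the interval.
    static int resolveTimeout(int heartbeat, Integer configuredTimeout) {
        int timeout = configuredTimeout != null ? configuredTimeout : heartbeat * 3;
        if (timeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        return timeout;
    }

    // Assumed behavior: cancel the old timer, start a new one only if the interval is positive.
    synchronized void restart(int heartbeatMillis) {
        stop();
        if (heartbeatMillis > 0) {
            timer = scheduled.scheduleWithFixedDelay(ticks::incrementAndGet,
                    heartbeatMillis, heartbeatMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (timer != null) {
            timer.cancel(true);
            timer = null;
        }
    }

    synchronized boolean isRunning() {
        return timer != null;
    }
}
```

In the sketch the counter increment is a placeholder for whatever the real heartbeat task does on each tick.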


Transporters

public class Transporters {

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }

        return getTransporter().bind(url, handler);
    }

    // netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
    // netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
    // mina=com.alibaba.dubbo.remoting.transport.mina.MinaTransporter
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}

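Notes:

  • Transporters.bind() obtains a Transporter through getTransporter(), which returns the adaptive extension (getAdaptiveExtension()); in this call chain the SPI mechanism resolves it to NettyTransporter.
  • getTransporter().bind() thus reaches NettyTransporter.bind(), which returns a NettyServer object.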
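
The idea behind the adaptive extension can be sketched as a name-based lookup that is driven by a URL parameter. This is not how ExtensionLoader is implemented; it is a simplified illustration under stated assumptions: the registry map stands in for the SPI configuration entries listed in the code comments, the parameter name "transporter" and the default name "netty" are assumptions made for the sketch, and MiniTransporter and AdaptiveTransporterSketch are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified contract; the real Transporter returns Server and Client objects.
interface MiniTransporter {
    String bind(String address);
}

class AdaptiveTransporterSketch {
    // Name -> implementation, standing in for the SPI configuration entries.
    private static final Map<String, MiniTransporter> EXTENSIONS = new HashMap<>();
    // Assumed default extension name for this sketch.
    private static final String DEFAULT_NAME = "netty";

    static {
        EXTENSIONS.put("netty", address -> "NettyServer@" + address);
        EXTENSIONS.put("grizzly", address -> "GrizzlyServer@" + address);
    }

    // Choose the implementation named by the URL parameter, falling back to the default.
    static String bind(Map<String, String> urlParams, String address) {
        String name = urlParams.getOrDefault("transporter", DEFAULT_NAME);
        MiniTransporter transporter = EXTENSIONS.get(name);
        if (transporter == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return transporter.bind(address);
    }
}
```

With this shape, the caller never names a concrete transporter class; the choice is made per call from the URL parameters.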


NettyTransporter

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}



public class NettyServer extends AbstractServer implements Server {

    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    @Override
    protected void doOpen() throws Throwable {

        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();

                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    public Collection<Channel> getChannels() {
        Collection<Channel> chs = new HashSet<Channel>();
        for (Channel channel : this.channels.values()) {
            if (channel.isConnected()) {
                chs.add(channel);
            } else {
                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
            }
        }
        return chs;
    }

    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    public boolean isBound() {
        return channel.isBound();
    }

}

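Notes:

  • NettyTransporter.bind() returns a NettyServer object.
  • NettyTransporter.connect() returns a NettyClient object.
  • The binding logic inside NettyServer (doOpen()) consists entirely of Netty API calls: it creates a ServerBootstrap, sets the pipeline factory, and calls bootstrap.bind().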
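
NettyServer only overrides doOpen() and doClose(); its constructor just delegates to super(...). The sketch below shows the template-method shape this implies, using a plain java.net.ServerSocket instead of Netty so that it stays self-contained. AbstractServer itself is not shown in the excerpt, so the assumption that the base-class constructor is what invokes doOpen() is labeled as such; BaseServerSketch and SocketServerSketch are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical base class. Assumption: the constructor drives the open step.
abstract class BaseServerSketch {
    protected final InetSocketAddress bindAddress;

    protected BaseServerSketch(InetSocketAddress bindAddress) {
        this.bindAddress = bindAddress;
        try {
            doOpen();
        } catch (Throwable t) {
            throw new IllegalStateException("Failed to bind " + bindAddress, t);
        }
    }

    protected abstract void doOpen() throws Throwable;

    protected abstract void doClose() throws Throwable;

    public abstract boolean isBound();

    public void close() {
        try {
            doClose();
        } catch (Throwable t) {
            System.err.println("close failed: " + t.getMessage());
        }
    }
}

// Concrete transport: only supplies the open and close steps.
class SocketServerSketch extends BaseServerSketch {
    // No initializer on purpose: doOpen() runs inside the super constructor,
    // and an explicit initializer here would overwrite the value it assigns.
    private ServerSocket socket;

    SocketServerSketch(InetSocketAddress bindAddress) {
        super(bindAddress);
    }

    @Override
    protected void doOpen() throws IOException {
        socket = new ServerSocket();
        socket.bind(bindAddress);
    }

    @Override
    protected void doClose() throws IOException {
        if (socket != null) {
            socket.close();
        }
    }

    @Override
    public boolean isBound() {
        return socket != null && socket.isBound() && !socket.isClosed();
    }

    public int port() {
        return socket.getLocalPort();
    }
}
```

A caller would construct it with something like new SocketServerSketch(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)), where port 0 asks the operating system for a free port.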


GrizzlyTransporter

public class GrizzlyTransporter implements Transporter {

    public static final String NAME = "grizzly";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new GrizzlyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new GrizzlyClient(url, listener);
    }

}



public class GrizzlyServer extends AbstractServer {

    private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);

    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

    private TCPNIOTransport transport;

    public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());

        filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
        filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
        TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
        config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
        String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
        if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
            int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            config.setCorePoolSize(threads).setMaxPoolSize(threads)
                    .setKeepAliveTime(0L, TimeUnit.SECONDS);
        } else if ("cached".equals(threadpool)) {
            int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            config.setCorePoolSize(0).setMaxPoolSize(threads)
                    .setKeepAliveTime(60L, TimeUnit.SECONDS);
        } else {
            throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
        }
        builder.setKeepAlive(true).setReuseAddress(false)
                .setIOStrategy(SameThreadIOStrategy.getInstance());
        transport = builder.build();
        transport.setProcessor(filterChainBuilder.build());
        transport.bind(getBindAddress());
        transport.start();
    }

    @Override
    protected void doClose() throws Throwable {
        try {
            transport.stop();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    public boolean isBound() {
        return !transport.isStopped();
    }

    public Collection<Channel> getChannels() {
        return channels.values();
    }

    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    @Override
    public void connected(Channel ch) throws RemotingException {
        channels.put(NetUtils.toAddressString(ch.getRemoteAddress()), ch);
        super.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {
        channels.remove(NetUtils.toAddressString(ch.getRemoteAddress()));
        super.disconnected(ch);
    }

}

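Notes:

  • GrizzlyTransporter follows the same implementation logic as NettyTransporter: bind() returns a server object and connect() returns a client object.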
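
One part of GrizzlyServer.doOpen() that differs from the Netty version is the worker pool setup: one branch sets core and max size to the same thread count with a keep-alive of 0, the "cached" branch uses a core size of 0 and a 60-second keep-alive, and anything else is rejected. The sketch below maps those two shapes onto the JDK's ThreadPoolExecutor rather than Grizzly's ThreadPoolConfig. The type name "fixed" is a placeholder for the default type in the excerpt, and the queue choices are assumptions of the sketch, not taken from the source.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolSketch {
    // Placeholder name for the default pool type; "cached" appears in the excerpt.
    static final String FIXED = "fixed";
    static final String CACHED = "cached";

    static ThreadPoolExecutor create(String type, int threads) {
        if (threads <= 0) {
            throw new IllegalArgumentException("threads must be positive");
        }
        if (FIXED.equals(type)) {
            // core == max, keep-alive 0: a constant number of workers, unbounded queue (assumed).
            return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
        } else if (CACHED.equals(type)) {
            // core 0, max = threads, idle workers retire after 60 seconds; direct handoff (assumed).
            return new ThreadPoolExecutor(0, threads, 60L, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
        }
        throw new IllegalArgumentException("Unsupported threadpool type " + type);
    }
}
```

Calling create("cached", 8) yields a pool that starts with no threads and grows to at most eight, while create("fixed", 4) keeps four workers once they have been started.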



[Figure: Transport dependency diagram]

[Figure: Server implementation class diagram]

[Figure: Client implementation class diagram]
