从源码全面解析 dubbo 服务端服务调用的来龙去脉

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 从源码全面解析 dubbo 服务端服务调用的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务端调用流程

对于整个服务端调用来说,主要分为两部分:

  • 服务端启动时:封装的 HandlerFilter
  • 服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法

1、启动封装

我们启动的时候,会经过 Exporter exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制

1.1 过滤器的封装

首先是我们的过滤器的封装:ProtocolFilterWrapper

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    // buildInvokerChain:
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
    // 获取所有的过滤器
    filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
    // 如果当前的过滤器不为空
    if (!CollectionUtils.isEmpty(filters)) {
        // 将所有的过滤器封装成链表(last)的形式
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
        }
        // 将过滤器链表封装成CallbackRegistrationInvoker类型
        return new CallbackRegistrationInvoker<>(last, filters);
    } 
}

到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式

我们直接跳到 DubboProtocol.export 中:

public <T> Exporter<T> export(Invoker<T> invoker){
    // 得到当前注册Zookeeper的URL
    URL url = invoker.getUrl();
    // 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883
    // 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)
    String key = serviceKey(url);
    // 
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
}
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
    // 将当前的过滤器放至exporterMap中,方便我们后面的获取
    exporterMap.put(key, this);
    // 打开服务
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊

1.2 Hander的封装

在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务

这里的Handler 一共封装成下面的流程:

NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler  -> HeaderExchangeHandler -> ExchangeHandlerAdapter

最终会走到 NettyServerdoOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码

// 典型的Netty启动的流程
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = createBossGroup();
    workerGroup = createWorkerGroup();
    final NettyServerHandler nettyServerHandler = createNettyServerHandler();
    channels = nettyServerHandler.getChannels();
    // 初始化我们的服务端启动器
    initServerBootstrap(nettyServerHandler);
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
    boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
                }
                // 这里添加Netty的Handler责任链
                ch.pipeline()
                    // 解码器
                    .addLast("decoder", adapter.getDecoder())
                    // 编码器
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                    // 把上面封装的Handler放到Netty中,便于我们的调用执行
                    .addLast("handler", nettyServerHandler);
            }
        });
}

这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

2、服务调用

我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉

这篇我们来看下服务端是如何进行服务调用的

2.1 Handler调用

我们直接跳到 NettyServerHandlerchannelRead 的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    // 主要看这个Handler做的事情
    handler.received(channel, msg);
    ctx.fireChannelRead(msg);
}

我们从上图看到,重点是这五个 Handler

  • MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”
  • HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常
  • AllChannelHandler:管理所有通道的打开和关闭
  • DecodeHandler:将二进制数据解码为消息对象。
  • HeaderExchangeHandler:负责协议头部的交换和处理

我们挨个去看看他们实际的作用

2.1 MultiMessageHandler
  • 如果当前的请求是多个的话,需要进行切割成单个请求往下传递
  • 如果是单个请求的话,直接向下传递即可
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            try {
                handler.received(channel, obj);
            }
        }
    } else {
        handler.received(channel, message);
    }
}
2.2 HeartbeatHandler
  • 服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求
  • 消费端:判断当前的请求是不是心跳请求,处理心跳请求
public void received(Channel channel, Object message) throws RemotingException {
    // 记录最近读取的时间
    setReadTimestamp(channel);
    // 判断当前的请求是不是心跳请求
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            // 如果当前是同一个心跳检测则返回同一个响应
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isDebugEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            }
        }
        return;
    }
    // 消费端使用:处理心跳响应
    if (isHeartbeatResponse(message)) {
        return;
    }
    handler.received(channel, message);
}
2.3 AllChannelHandler
  • 为每一个服务端的请求从线程池中分配一个线程执行
public void received(Channel channel, Object message) throws RemotingException {
    // 获取线程
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // 将当前的Handler丢进线程池里面执行
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
}

2.4 DecodeHandler
  • 根据当前的响应请求进行解码操作
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}
2.5 HeaderExchangeHandler
  • 调用我们的过滤器并等待数据的返回
  • 将数据通过 Channel 返回至客户端
public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    // 服务端:当前的消息是请求
    if (message instanceof Request) {
        Request request = (Request) message;
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            // 进行请求的解析
            if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
        }
    } else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
    } else {
        handler.received(exchangeChannel, message);
    }
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]
    Object msg = req.getData();
    // 执行过滤器的操作
    CompletionStage<Object> future = handler.reply(channel, msg);
    // 等待数据的返回
    future.whenComplete((appResult, t) -> {
        try {
            if (t == null) {
                res.setStatus(Response.OK);
                res.setResult(appResult);
            } else {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            // 没有问题的话,将我们的数据返回
            channel.send(res);
        } 
    });
}

服务端解码之后的消息:

2.2 过滤器调用

我们直接跳到 DubboProtocolreply 方法

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message){
    Invocation inv = (Invocation) message;
    // 得到过滤器的invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    // 执行过滤器
    Result result = invoker.invoke(inv);
    // 返回结果
    return result.thenApply(Function.identity());
}
Invoker<?> getInvoker(Channel channel, Invocation inv){
    // 根据组+版本号+接口确定唯一的key
    String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));
    // 得到过滤器(我们在上面进行过对应的添加)
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    // 返回过滤器
    return exporter.getInvoker();
}

这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可

  • ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件
  • ProfilerServerFilter:负责性能分析和监视
  • EchoFilter:将请求消息作为响应消息返回
  • ClassLoaderFilter:负责加载和管理类加载器
  • GenericFilter:提供一种通用的请求处理机制
  • ExceptionFilter:负责处理异常并生成错误响应消息
  • MonitorFilter:负责监视请求处理过程中的状态和信息
  • TimeoutFilter:负责处理请求处理超时
  • TraceFilter:跟踪请求处理流程中的各个阶段和信息
  • ClassLoaderCallbackFilter:提供了回调函数的机制
2.3 方法调用

最终我们会到 AbstractProxyInvokerinvoke 方法

public Result invoke(Invocation invocation){
    // 执行我们动态代理的方法
     Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
    // 等待返回的结果
     CompletableFuture<Object> future = wrapWithFuture(value, invocation);
    // 封装请求
    CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
        AppResponse result = new AppResponse(invocation);
        if (t != null) {
            if (t instanceof CompletionException) {
                result.setException(t.getCause());
            } else {
                result.setException(t);
            }
        } else {
            result.setValue(obj);
        }
        // 返回数据
        // AppResponse [value=User(id=2, name=天涯, age=12), exception=null]
        return result;
    });
    return new AsyncRpcResult(appResponseFuture, invocation);
}

这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    try {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            // proxy:实现类
            // methodName:getUserById
            // parameterTypes:class java.lang.Long
            // arguments:2
            // 执行相对应的方法即可
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

三、流程图

高清图片可私信博主

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
10天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
29天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
11天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
68 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
73 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
96 0

推荐镜像

更多