从源码全面解析 dubbo 服务端服务调用的来龙去脉

简介: 从源码全面解析 dubbo 服务端服务调用的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务端调用流程

对于整个服务端调用来说,主要分为两部分:

  • 服务端启动时:封装的 HandlerFilter
  • 服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法

1、启动封装

我们启动的时候,会经过 Exporter exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制

1.1 过滤器的封装

首先是我们的过滤器的封装:ProtocolFilterWrapper

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    // buildInvokerChain:
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
    // 获取所有的过滤器
    filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
    // 如果当前的过滤器不为空
    if (!CollectionUtils.isEmpty(filters)) {
        // 将所有的过滤器封装成链表(last)的形式
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
        }
        // 将过滤器链表封装成CallbackRegistrationInvoker类型
        return new CallbackRegistrationInvoker<>(last, filters);
    } 
}

到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式

我们直接跳到 DubboProtocol.export 中:

public <T> Exporter<T> export(Invoker<T> invoker){
    // 得到当前注册Zookeeper的URL
    URL url = invoker.getUrl();
    // 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883
    // 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)
    String key = serviceKey(url);
    // 
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
}
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
    // 将当前的过滤器放至exporterMap中,方便我们后面的获取
    exporterMap.put(key, this);
    // 打开服务
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊

1.2 Hander的封装

在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务

这里的Handler 一共封装成下面的流程:

NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler  -> HeaderExchangeHandler -> ExchangeHandlerAdapter

最终会走到 NettyServerdoOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码

// 典型的Netty启动的流程
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = createBossGroup();
    workerGroup = createWorkerGroup();
    final NettyServerHandler nettyServerHandler = createNettyServerHandler();
    channels = nettyServerHandler.getChannels();
    // 初始化我们的服务端启动器
    initServerBootstrap(nettyServerHandler);
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
    boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
                }
                // 这里添加Netty的Handler责任链
                ch.pipeline()
                    // 解码器
                    .addLast("decoder", adapter.getDecoder())
                    // 编码器
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                    // 把上面封装的Handler放到Netty中,便于我们的调用执行
                    .addLast("handler", nettyServerHandler);
            }
        });
}

这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

2、服务调用

我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉

这篇我们来看下服务端是如何进行服务调用的

2.1 Handler调用

我们直接跳到 NettyServerHandlerchannelRead 的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    // 主要看这个Handler做的事情
    handler.received(channel, msg);
    ctx.fireChannelRead(msg);
}

我们从上图看到,重点是这五个 Handler

  • MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”
  • HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常
  • AllChannelHandler:管理所有通道的打开和关闭
  • DecodeHandler:将二进制数据解码为消息对象。
  • HeaderExchangeHandler:负责协议头部的交换和处理

我们挨个去看看他们实际的作用

2.1 MultiMessageHandler
  • 如果当前的请求是多个的话,需要进行切割成单个请求往下传递
  • 如果是单个请求的话,直接向下传递即可
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            try {
                handler.received(channel, obj);
            }
        }
    } else {
        handler.received(channel, message);
    }
}
2.2 HeartbeatHandler
  • 服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求
  • 消费端:判断当前的请求是不是心跳请求,处理心跳请求
public void received(Channel channel, Object message) throws RemotingException {
    // 记录最近读取的时间
    setReadTimestamp(channel);
    // 判断当前的请求是不是心跳请求
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            // 如果当前是同一个心跳检测则返回同一个响应
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isDebugEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            }
        }
        return;
    }
    // 消费端使用:处理心跳响应
    if (isHeartbeatResponse(message)) {
        return;
    }
    handler.received(channel, message);
}
2.3 AllChannelHandler
  • 为每一个服务端的请求从线程池中分配一个线程执行
public void received(Channel channel, Object message) throws RemotingException {
    // 获取线程
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // 将当前的Handler丢进线程池里面执行
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
}

2.4 DecodeHandler
  • 根据当前的响应请求进行解码操作
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}
2.5 HeaderExchangeHandler
  • 调用我们的过滤器并等待数据的返回
  • 将数据通过 Channel 返回至客户端
public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    // 服务端:当前的消息是请求
    if (message instanceof Request) {
        Request request = (Request) message;
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            // 进行请求的解析
            if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
        }
    } else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
    } else {
        handler.received(exchangeChannel, message);
    }
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]
    Object msg = req.getData();
    // 执行过滤器的操作
    CompletionStage<Object> future = handler.reply(channel, msg);
    // 等待数据的返回
    future.whenComplete((appResult, t) -> {
        try {
            if (t == null) {
                res.setStatus(Response.OK);
                res.setResult(appResult);
            } else {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            // 没有问题的话,将我们的数据返回
            channel.send(res);
        } 
    });
}

服务端解码之后的消息:

2.2 过滤器调用

我们直接跳到 DubboProtocolreply 方法

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message){
    Invocation inv = (Invocation) message;
    // 得到过滤器的invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    // 执行过滤器
    Result result = invoker.invoke(inv);
    // 返回结果
    return result.thenApply(Function.identity());
}
Invoker<?> getInvoker(Channel channel, Invocation inv){
    // 根据组+版本号+接口确定唯一的key
    String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));
    // 得到过滤器(我们在上面进行过对应的添加)
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    // 返回过滤器
    return exporter.getInvoker();
}

这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可

  • ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件
  • ProfilerServerFilter:负责性能分析和监视
  • EchoFilter:将请求消息作为响应消息返回
  • ClassLoaderFilter:负责加载和管理类加载器
  • GenericFilter:提供一种通用的请求处理机制
  • ExceptionFilter:负责处理异常并生成错误响应消息
  • MonitorFilter:负责监视请求处理过程中的状态和信息
  • TimeoutFilter:负责处理请求处理超时
  • TraceFilter:跟踪请求处理流程中的各个阶段和信息
  • ClassLoaderCallbackFilter:提供了回调函数的机制
2.3 方法调用

最终我们会到 AbstractProxyInvokerinvoke 方法

public Result invoke(Invocation invocation){
    // 执行我们动态代理的方法
     Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
    // 等待返回的结果
     CompletableFuture<Object> future = wrapWithFuture(value, invocation);
    // 封装请求
    CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
        AppResponse result = new AppResponse(invocation);
        if (t != null) {
            if (t instanceof CompletionException) {
                result.setException(t.getCause());
            } else {
                result.setException(t);
            }
        } else {
            result.setValue(obj);
        }
        // 返回数据
        // AppResponse [value=User(id=2, name=天涯, age=12), exception=null]
        return result;
    });
    return new AsyncRpcResult(appResponseFuture, invocation);
}

这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    try {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            // proxy:实现类
            // methodName:getUserById
            // parameterTypes:class java.lang.Long
            // arguments:2
            // 执行相对应的方法即可
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

三、流程图

高清图片可私信博主

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
3天前
|
Java Android开发
Android12 双击power键启动相机源码解析
Android12 双击power键启动相机源码解析
12 0
|
4天前
|
分布式计算 Java API
Java8 Lambda实现源码解析
Java8的lambda应该大家都比较熟悉了,本文主要从源码层面探讨一下lambda的设计和实现。
|
5天前
|
算法 Java Go
ArrayList源码解析
ArrayList源码解析
9 1
|
5天前
|
存储 安全 Java
【HashMap源码解析(一)(佬你不来看看?)】
【HashMap源码解析(一)(佬你不来看看?)】
10 1
|
11天前
|
缓存 Java 开发者
10个点介绍SpringBoot3工作流程与核心组件源码解析
Spring Boot 是Java开发中100%会使用到的框架,开发者不仅要熟练使用,对其中的核心源码也要了解,正所谓知其然知其所以然,V 哥建议小伙伴们在学习的过程中,一定要去研读一下源码,这有助于你在开发中游刃有余。欢迎一起交流学习心得,一起成长。
|
15天前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
6月前
|
负载均衡 Dubbo 应用服务中间件
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
56 0
|
6月前
|
Dubbo Java 应用服务中间件
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
47 0
|
5月前
|
Dubbo Java 应用服务中间件
阿里巴巴资深架构师深度解析微服务架构设计之SpringCloud+Dubbo
软件架构是一个包含各种组织的系统组织,这些组件包括Web服务器,应用服务器,数据库,存储,通讯层),它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。
|
13天前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo: 微服务通信的高效解决方案
【4月更文挑战第28天】在微服务架构的发展中,服务间的高效通信至关重要。Spring Cloud Dubbo 提供了一种基于 RPC 的通信方式,使得服务间的调用就像本地方法调用一样简单。本篇博客将探讨 Spring Cloud Dubbo 的核心概念,并通过具体实例展示其在项目中的实战应用。
15 2

推荐镜像

更多