dubbo协议下的单一长连接与多线程并发如何协同工作

简介: dubbo协议下的单一长连接与多线程并发如何协同工作

上班的路上突然就冒出了这么个问题:既然在dubbo中描述消费者和提供者之间采用的是单一长连接,那么如果消费者端是高并发多线程模型的web应用,单一长连接如何解决多线程并发请求问题呢?

其实如果不太了解socket或者多线程编程的相关知识,不太容易理解这个问题。传统的最简单的RPC方式,应该是为每次远程调用请求创建一个对应的线程,我们先不说这种方式的缺点。至少优点很明显,就是简单。简单体现在哪儿?

通信双方一对一(相比NIO来说)。

通俗点来说,socket通信的双方发送和接受数据不会被其它(线程)干扰,这种干扰不同于数数据包的“粘包问题”。其实说白了就相当于电话线路的场景:

试想一下如果多个人同时对着同一个话筒大喊,对方接受到的声音就会是重叠且杂乱的。

对于单一的socket通道来说,如果发送方多线程的话,不加控制就会导致通道中的数据乱七八糟,接收端无法区分数据的单位,也就无法正确的处理请求。

乍一看,似乎dubbo协议所说的单一长连接与客户端多线程并发请求之间,是水火不容的。但其实稍加设计,就可以让它们和谐相处。

socket中的粘包问题是怎么解决的?用的最多的其实是定义一个定长的数据包头,其中包含了完整数据包的长度,以此来完成服务器端拆包工作。

那么解决多线程使用单一长连接并发请求时包干扰的方法也有点雷同,就是给包头中添加一个标识id,服务器端响应请求时也要携带这个id,供客户端多线程领取对应的响应数据提供线索。

其实如果不考虑性能的话,dubbo完全也可以为每个客户端线程创建一个对应的服务器端线程,但这是海量高并发场景所不能接受的~~

那么脑补一张图:
image.png

下面咱们试图从代码中找到痕迹。

一路追踪,我们来到这个类:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java,先来看看其中的request方法,大概在第101行左右:

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);

    //这个future就是前面我们提到的:客户端并发请求线程阻塞的对象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);  //非阻塞调用
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

注意这个方法返回的ResponseFuture对象,当前处理客户端请求的线程在经过一系列调用后,会拿到ResponseFuture对象,最终该线程会阻塞在这个对象的下面这个方法调用上,如下:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {    //无限连
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

上面我已经看到请求线程已经阻塞,那么又是如何被唤醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java,其实所有实现了ChannelHandler接口的类都被设计为装饰器模式,所以你可以看到类似这样的代码:

 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(
            new HeartbeatHandler(
                    ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url)
            ));
}

现在来仔细看一下HeaderExchangeHandler类的定义,先看一下它定义的received方法,下面是代码片段:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
          .....
        } else if (message instanceof Response) {   
            //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
           .....
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

我们主要看中间的那个条件分支,它是用来处理响应消息的,也就是说当dubbo客户端接收到来自服务端的响应后会执行到这个分支,它简单的调用了handleResponse方法,我们追过去看看:

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {  //排除心跳类型的响应
        DefaultFuture.received(channel, response);
    }
}

熟悉的身影:DefaultFuture,它是实现了我们上面说的ResponseFuture接口类型,实际上细心的童鞋应该可以看到,上面request方法中其实实例化的就是这个DefaultFutrue对象:

DefaultFuture future = new DefaultFuture(channel, req, timeout);

那么我们可以继续来看一下DefaultFuture.received方法的实现细节:

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

留一下我们之前提到的id的作用,这里可以看到它已经开始发挥作用了。通过idDefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,它就是上面我们提到的,阻塞请求线程的那个对象。好,找到目标后,调用它的doReceived方法,这就是标准的java多线程编程知识了:

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

这样我们就可以证实上图中左边的绿色箭头所标注的两点。


相关文章
|
1月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
150 0
|
20天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
127 59
|
11天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
lua面向对象(类)和lua协同线程与协同函数、Lua文件I/O
Lua的面向对象编程、协同线程与协同函数的概念和使用,以及Lua文件I/O操作的基本方法。
28 4
lua面向对象(类)和lua协同线程与协同函数、Lua文件I/O
|
1月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
26 1
|
2月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
2月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
57 0
|
2月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
3月前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
59 0
|
3月前
|
C# 开发者 Windows
勇敢迈出第一步:手把手教你如何在WPF开源项目中贡献你的第一行代码,从选择项目到提交PR的全过程解析与实战技巧分享
【8月更文挑战第31天】本文指导您如何在Windows Presentation Foundation(WPF)相关的开源项目中贡献代码。无论您是初学者还是有经验的开发者,参与这类项目都能加深对WPF框架的理解并拓展职业履历。文章推荐了一些适合入门的项目如MvvmLight和MahApps.Metro,并详细介绍了从选择项目、设置开发环境到提交代码的全过程。通过具体示例,如添加按钮点击事件处理程序,帮助您迈出第一步。此外,还强调了提交Pull Request时保持专业沟通的重要性。参与开源不仅能提升技能,还能促进社区交流。
44 0