Dubbo-多线程通信原理

简介: 根据dubbo的源代码,讲述跨服务器场景下,多线程如何通信


本文代码摘录的时候,将一些与本流程无关的内容去掉了,如有需要请看源码。

如果大家对Dubbo RPC原理原理感兴趣,可以看我之前写过的另外一篇博客《 Dubbo RPC》。

 

一、 思考与目标

1. 思考

并发情况下,dubbo的RPC模型如下图所示:

b560b47f97fde73084638823a66e49f43506bc2c

如图所示,Consumer端可能同时有多个线程调用Provider的服务,此时Provider会启动多个线程来分别处理这些并发调用,处理完以后将数据返回。在这种多线程环境中,Dubbo是如何做到Consumer Thread A不会拿到其他线程的请求结果?

其实这是一个非常普遍的问题,例如我们的服务器与MQ服务器、数据库、缓存等等第三方服务器通信时,都需要确保并发环境下数据不会错乱,并且要尽可能降低服务器的性能损耗。

 

 

2. 目标

l   探究Dubbo RPC多线程通信原理。

l   写一个简单的示例,实现多线程数据交换

 

二、   Dubbo多线程通信原理

1. 流程图


8ad7d4ad5a6ec8a2ae2ff741a490eb351043df7c

如上图所示,消费端多线程并行消费服务的场景,主要流程如下

  • 获取DubboInvoker对象;
  • 将请求体信息封装在一个Request对象中,Request中会包括一个自增的id;然后将Request存到一个ConcurrentHashMap中(key=id,value= DefaultFuture)
  • 将request数据写入Channel,
  • Consumer Thread执行Defaultfuture#get()方法等待返回结果;
  • 服务提供方创建多线程处理用户请求,并将放回结果封装在Response中(包括Request#id)
  • 将response写入Channel
  • 消费方从Channel中收到数据以后,解析出id,从Map中解析出DefaultFuture
  • 唤醒Consumer Thread,返回结果;
  • DefaultFuture也会启动一个定时程序,检查在timeout内,结果是否返回,如果未返回,将DefaultFuture从map中移除,并抛出超时异常。

 

 

2. 多线程下的通信

DubboInvoker#doInvoke方法中,在ExchangeClient#request(inv, timeout)调用时,返回一个DefaultFuture对象,接着会调用DefaultFuture.get()方法(等待返回结果)。

对于consumer端而言,服务器会为每一个请求创建一个线程,因为rpc操作是一个慢动作,为了节省资源,当线程发送rpc请求后,需要让当前线程释放资源、进入等待队列,当获取到返回结果以后,再唤醒这个线程。

RPC请求的过程为:每一个RPC请求都有一个唯一的id,RPC请求的时候,会将此id也发送给provider;provider处理完请求后会将此id和返回结果一同返回给consumer;consumer收到返回信息以后解析出id,然后从FUTURES中找到相对应的DefaultFuture,并通过DefaultFuture.done#signal()唤醒之前等待线程。

下面根据源码详细讨论一下多线程情况下rpc请求的细节,即dubbo多线程模型的实现。

1) DefaultFuture#field

这里列出了与多线程相关的几个重要的属性

 private final Lock                            lock = new ReentrantLock();
    private final Condition                       done = lock.newCondition();
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();


 

2)   DefaultFuture#构造函数

创建好DefaultFuture对象以后,将DefaultFuture存入了FUTURES中。其实每一次请求,多会生成一个唯一的id,即对于每个服务器而言,id唯一。

public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

 

3) DefaultFuture#get

主要逻辑是:获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,要么被唤醒;如果被唤醒,则返回rpc的结果。

  public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

 

4) DefaultFuture#received

收到返回结果时,调用此方法。首先从FUTURES中根据id获取DefaultFuture,如果不存在,打印一条日志;如果存在则通过signal释放一个唤醒信号,将线程从等待队列中唤醒。

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at ")。
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
 
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }



5) DefaultFuture#received

以下代码是用来从FUTURES清理rpc请求超时的DefaultFuture

   private static class RemotingInvocationTimeoutScan implements Runnable {
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }
    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

 

6) HeaderExchangeHandler#handleRequest

创建Response对象时,带上请求id

   Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
 
            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
 
            return res;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }


三、  示例

根据dubbo的实现方案,我写了一个简单的示例,以方便理解,见https://gitee.com/wuzhengfei/great-truthConcurrentRWTester代码

 

 

 

 

相关文章
|
20天前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
44 2
|
10天前
|
负载均衡 监控 Dubbo
Dubbo 原理和机制详解(非常全面)
本文详细解析了 Dubbo 的核心功能、组件、架构设计及调用流程,涵盖远程方法调用、智能容错、负载均衡、服务注册与发现等内容。欢迎留言交流。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Dubbo 原理和机制详解(非常全面)
|
14天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
31 1
[Java]线程生命周期与线程通信
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
131 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
15天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
12 1
|
15天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
26 1
|
15天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
21 1
|
26天前
|
Java
|
2月前
|
Dubbo 应用服务中间件 Apache
Star 4w+,Apache Dubbo 3.3 全新发布,Triple X 领衔,开启微服务通信新时代
在 Apache Dubbo 突破 4w Star 之际,Apache Dubbo 团队正式宣布,Dubbo 3.3 正式发布!作为全球领先的开源微服务框架,Dubbo 一直致力于为开发者提供高性能、可扩展且灵活的分布式服务解决方案。此次发布的 Dubbo 3.3,通过 Triple X 的全新升级,突破了以往局限,实现了对南北向与东西向流量的全面支持,并提升了对云原生架构的友好性。
134 7
|
26天前
多线程通信和同步的方式有哪些?
【10月更文挑战第6天】
84 0