Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等。在 Java 9, Java 也引入了自己的 响应式编程的一种标准接口,即java.util.concurrent.Flow这个类。这个类里面规定了 Java 响应式编程所要实现的接口与抽象。我们这个系列要讨论的就是Project Reactor这个实现。


这里也提一下,为了能对于没有升级到 Java 9 的用户也能兼容,java.util.concurrent.Flow这个类也被放入了一个 jar 供 Java 9 之前的版本,依赖是:


<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.3</version>
</dependency>

本系列所讲述的 Project Reactor 就是 reactive-streams 的一种实现。 首先,我们先来了解下,什么是响应式编程,Java 如何实现


什么是响应式编程,Java 如何实现


我们这里用通过唯一 id 获取知乎的某个回答作为例子,首先我们先明确下,一次HTTP请求到服务器上处理完之后,将响应写回这次请求的连接,就是完成这次请求了,如下:

public void request(Connection connection, HttpRequest request) {
    //处理request,省略代码
    connection.write(response);//完成响应
}


假设获取回答需要调用两个接口,获取评论数量还有获取回答信息,传统的代码可能会这么去写:

//获取评论数量
public void getCommentCount(Connection connection, HttpRequest request) {
    Integer commentCount = null;
    try {
        //从缓存获取评论数量,阻塞IO
        commentCount = getCommnetCountFromCache(id);
    } catch(Exception e) {
        try {
            //缓存获取失败就从数据库中获取,阻塞IO
            commentCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
        }
    }
    connection.write(commentCount);
}
//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    //获取点赞数量
    Integer voteCount = null;
    try {
        //从缓存获取点赞数量,阻塞IO
        voteCount = getVoteCountFromCache(id);
    } catch(Exception e) {
        try {
            //缓存获取失败就从数据库中获取,阻塞IO
            voteCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
        }
    }
    //从数据库获取回答信息,阻塞IO
    Answer answer = getAnswerFromDB(id);
    //拼装Response
    ResultVO response = new ResultVO();
    if (voteCount != null) {
        response.setVoteCount(voteCount);
    }
    if (answer != null) {
        response.setAnswer(answer);
    }
    connection.write(response);//完成响应
}


在这种实现下,你的进程只需要一个线程池,承载了所有请求。这种实现下,有两个弊端:

  1. 线程池 IO 阻塞,导致某个存储变慢或者缓存击穿的话,所有服务都堵住了。假设现在评论缓存突然挂了,全都访问数据库,导致请求变慢。由于线程需要等待 IO 响应,导致唯一一个线程池被堆满,无法处理获取回答的请求。
  2. 对于获取回答信息,获取点赞数量其实和获取回答信息是可以并发进行的。不用非得先获取点赞数量之后再获取回答信息。


现在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我们可以通过响应式编程,来让我们的线程不会阻塞,而是一直在处理请求。这是如何实现的呢?


传统的 BIO,是线程将数据写入 Connection 之后,当前线程进入 Block 状态,直到响应返回,之后接着做响应返回后的动作。NIO 则是线程将数据写入 Connection 之后,将响应返回后需要做的事情以及参数缓存到一个地方之后,直接返回。在有响应返回后,NIO 的 Selector 的 Read 事件会是 Ready 状态,扫描 Selector 事件的线程,会告诉你的线程池数据好了,然后线程池中的某个线程,拿出刚刚缓存的要做的事情还有参数,继续处理。


那么,怎样实现缓存响应返回后需要做的事情以及参数的呢?Java 本身提供了两种接口,一个是基于回调的 Callback 接口(Java 8 引入的各种Functional Interface),一种是 Future 框架。


基于 Callback 的实现:

//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
    getVoteCountFromCache(id, (count, throwable) -> {
        //异常不为null则为获取失败
        if (throwable != null) {
            //读取缓存失败就从数据库获取
            getVoteCountFromDB(id, (count2, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setVoteCount(voteCount);
                }
                //从数据库读取回答信息
                getAnswerFromDB(id, (answer, throwable3) -> {
                    if (throwable3 == null) {
                        resultVO.setAnswer(answer);
                        connection.write(resultVO);
                    } else {
                        connection.write(throwable3);
                    }
                });
            });
        } else {
            //获取成功,设置voteCount
            resultVO.setVoteCount(voteCount);
            //从数据库读取回答信息
            getAnswerFromDB(id, (answer, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setAnswer(answer);
                    //返回响应
                    connection.write(resultVO);
                } else {
                    //返回错误响应
                    connection.write(throwable2);
                }
            });
        }
    });
}


可以看出,随着调用层级的加深,callback 层级越来越深,越来越难写,而且啰嗦的代码很多。并且,基于 CallBack 想实现获取点赞数量其实和获取回答信息并发是很难写的,这里还是先获取点赞数量之后再获取回答信息。


那么基于 Future 呢?我们用 Java 8 之后引入的 CompletableFuture 来试着实现下。

//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
        //所有的异步任务都执行完之后要做的事情
        CompletableFuture.allOf(
                getVoteCountFromCache(id)
                        //发生异常,从数据库读取
                        .exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
                        //读取完之后,设置VoteCount
                        .thenAccept(voteCount -> {
                    resultVO.setVoteCount(voteCount);
                }),
                getAnswerFromDB(id).thenAccept(answer -> {
                    resultVO.setAnswer(answer);
                })
        ).exceptionallyAsync(throwable -> {
            connection.write(throwable);
        }).thenRun(() -> {
            connection.write(resultVO);
        });
}

这种实现就看上去简单多了,并且读取点赞数量还有读取回答内容是同时进行的。 Project Reactor 在 Completableuture 这种实现的基础上,增加了更多的组合方式以及更完善的异常处理机制,以及面对背压时候的处理机制,还有重试机制


响应式编程里面遇到的问题 - 背压


由于响应式编程,不阻塞,所以把之前因为基本不会发生而忽视的一个问题带了上来,就是背压(Back Pressure)。

背压是指,当上游请求过多,下游服务来不及响应,导致 Buffer 溢出的这样一个问题。在响应式编程,由于线程不阻塞,遇到 IO 就会把当前参数和要做的事情缓存起来,这样无疑增大了很多吞吐量,同时内存占用也大了起来,如果不限制的话,很可能 OutOfMemory,这就是背压问题。

在这个问题上,Project Reactor 基于的模型,是有处理方式的,Completableuture 这个体系里面没有。


为何现在响应式编程在业务开发微服务开发不普及


主要因为数据库 IO,不是 NIO。

不论是Java自带的Future框架,还是 Spring WebFlux,还是 Vert.x,他们都是一种非阻塞的基于Ractor模型的框架(后两个框架都是利用netty实现)。

在阻塞编程模式里,任何一个请求,都需要一个线程去处理,如果io阻塞了,那么这个线程也会阻塞在那。但是在非阻塞编程里面,基于响应式的编程,线程不会被阻塞,还可以处理其他请求。举一个简单例子:假设只有一个线程池,请求来的时候,线程池处理,需要读取数据库 IO,这个 IO 是 NIO 非阻塞 IO,那么就将请求数据写入数据库连接,直接返回。之后数据库返回数据,这个链接的 Selector 会有 Read 事件准备就绪,这时候,再通过这个线程池去读取数据处理(相当于回调),这时候用的线程和之前不一定是同一个线程。这样的话,线程就不用等待数据库返回,而是直接处理其他请求。这样情况下,即使某个业务 SQL 的执行时间长,也不会影响其他业务的执行。


但是,这一切的基础,是 IO 必须是非阻塞 IO,也就是 NIO(或者 AIO)。官方JDBC没有 NIO,只有 BIO 实现。这样无法让线程将请求写入链接之后直接返回,必须等待响应。但是也就解决方案,就是通过其他线程池,专门处理数据库请求并等待返回进行回调,也就是业务线程池 A 将数据库 BIO 请求交给线程池B处理,读取完数据之后,再交给 A 执行剩下的业务逻辑。这样A也不用阻塞,可以处理其他请求。但是,这样还是有因为某个业务 SQL 的执行时间长,导致B所有线程被阻塞住队列也满了从而A的请求也被阻塞的情况,这是不完美的实现。真正完美的,需要 JDBC 实现 NIO。


Java 自带的 Future框架可以这么用JDBC:

@GetMapping
public DeferredResult<Result> get() {
DeferredResult<Result> deferredResult = new DeferredResult<>();
CompletableFuture.supplyAsync(() -> {
            return 阻塞数据库IO;
        //dbThreadPool用来处理阻塞的数据库IO
        }, dbThreadPool).thenComposeAsync(result -> {
    //spring 的 DeferredResult 来实现异步回调写入结果返回
    deferredResult.setResult(result);
});
return deferredResult;
}


WebFlux 也可以使用阻塞JDBC,但是同理:

@GetMapping
public Mono<Result> get() {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
            return 阻塞数据库IO;
        //dbThreadPool用来处理阻塞的数据库IO
        }, dbThreadPool));
}

Vert.x 也可以使用阻塞的JDBC,也是同理:

@GetMapping
public  DeferredResult<Result> get() {
DeferredResult<Result> deferredResult = new DeferredResult<>();
getResultFromDB().setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                deferredResult.setResult(asyncResult.result());
            } else {
                deferredResult.setErrorResult(asyncResult.cause());
            }
        });
return deferredResult;
}
private WorkerExecutor dbThreadPool = vertx.createSharedWorkerExecutor("DB", 16);
private Future<Result> getResultFromDB() {
    Future<Result> result = Future.future();
    dbThreadPool.executeBlocking(future -> {
            return 阻塞数据库IO;
        }, false, asyncResult -> {
            if (asyncResult.succeeded()) {
                result.complete(asyncResult.result());
            } else {
                result.fail(asyncResult.cause());
            }
        });
    return result;
}

相当于通过另外的线程池(当然也可以通过原有线程池,反正就是要用和请求不一样的线程,才能实现回调,而不是当次就阻塞等待),封装了阻塞 JDBC IO。

但是,这样几乎对数据库IO主导的应用性能没有提升,还增加了线程切换,得不偿失。所以,需要使用真正实现了 NIO 的数据库客户端。目前有这些 NIO 的 JDBC 客户端,但是都不普及:

  1. Vert.x 客户端:https://vertx.io/docs/vertx-jdbc-client/java/
  2. r2jdbc 客户端:http://r2dbc.io/
  3. Jasync-sql 客户端:https://github.com/jasync-sql/jasync-sql
相关文章
|
6月前
|
Java 调度 Android开发
深入解析Android应用开发中的响应式编程与RxJava应用
在现代Android应用开发中,响应式编程及其核心框架RxJava正逐渐成为开发者的首选。本文将深入探讨响应式编程的基本概念、RxJava的核心特性以及如何在Android应用中利用RxJava提升代码的可读性和性能。 【7月更文挑战第7天】
52 1
|
机器学习/深度学习 分布式计算 数据处理
Reactor模型深度解析
Reactor模型深度解析
271 0
|
Arthas 缓存 测试技术
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(上)
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(上)
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(上)
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(下)
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(下)
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
96 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
89 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
69 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
75 0
|
17天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
17天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

推荐镜像

更多