什么是响应式编程,Java 如何实现

简介: 什么是响应式编程,Java 如何实现

我们这里用通过唯一 id 获取知乎的某个回答作为例子,首先我们先明确下,一次HTTP请求到服务器上处理完之后,将响应写回这次请求的连接,就是完成这次请求了,如下:

public void request(Connection connection, HttpRequest request) {
    //处理request,省略代码
    connection.write(response);//完成响应
}

假设获取回答需要调用两个接口,获取评论数量还有获取回答信息,传统的代码可能会这么去写:

//获取评论数量
public void getCommentCount(Connection connection, HttpRequest request) {
    Integer commentCount = null;
    try {
        //从缓存获取评论数量,阻塞IO
        commentCount = getCommnetCountFromCache(id);
    } catch(Exception e) {
        try {
            //缓存获取失败就从数据库中获取,阻塞IO
            commentCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
        }
    }
    connection.write(commentCount);
}
//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    //获取点赞数量
    Integer voteCount = null;
    try {
        //从缓存获取点赞数量,阻塞IO
        voteCount = getVoteCountFromCache(id);
    } catch(Exception e) {
        try {
            //缓存获取失败就从数据库中获取,阻塞IO
            voteCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
        }
    }
    //从数据库获取回答信息,阻塞IO
    Answer answer = getAnswerFromDB(id);
    //拼装Response
    ResultVO response = new ResultVO();
    if (voteCount != null) {
        response.setVoteCount(voteCount);
    }
    if (answer != null) {
        response.setAnswer(answer);
    }
    connection.write(response);//完成响应
}

在这种实现下,你的进程只需要一个线程池,承载了所有请求。这种实现下,有两个弊端:

  1. 线程池 IO 阻塞,导致某个存储变慢或者缓存击穿的话,所有服务都堵住了。假设现在评论缓存突然挂了,全都访问数据库,导致请求变慢。由于线程需要等待 IO 响应,导致唯一一个线程池被堆满,无法处理获取回答的请求。
  2. 对于获取回答信息,获取点赞数量其实和获取回答信息是可以并发进行的。不用非得先获取点赞数量之后再获取回答信息。

现在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我们可以通过响应式编程,来让我们的线程不会阻塞,而是一直在处理请求。这是如何实现的呢?

传统的 BIO,是线程将数据写入 Connection 之后,当前线程进入 Block 状态,直到响应返回,之后接着做响应返回后的动作。NIO 则是线程将数据写入 Connection 之后,将响应返回后需要做的事情以及参数缓存到一个地方之后,直接返回。在有响应返回后,NIO 的 Selector 的 Read 事件会是 Ready 状态,扫描 Selector 事件的线程,会告诉你的线程池数据好了,然后线程池中的某个线程,拿出刚刚缓存的要做的事情还有参数,继续处理。

那么,怎样实现缓存响应返回后需要做的事情以及参数的呢?Java 本身提供了两种接口,一个是基于回调的 Callback 接口(Java 8 引入的各种Functional Interface),一种是 Future 框架。

基于 Callback 的实现:

//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
    getVoteCountFromCache(id, (count, throwable) -> {
        //异常不为null则为获取失败
        if (throwable != null) {
            //读取缓存失败就从数据库获取
            getVoteCountFromDB(id, (count2, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setVoteCount(voteCount);
                }
                //从数据库读取回答信息
                getAnswerFromDB(id, (answer, throwable3) -> {
                    if (throwable3 == null) {
                        resultVO.setAnswer(answer);
                        connection.write(resultVO);
                    } else {
                        connection.write(throwable3);
                    }
                });
            });
        } else {
            //获取成功,设置voteCount
            resultVO.setVoteCount(voteCount);
            //从数据库读取回答信息
            getAnswerFromDB(id, (answer, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setAnswer(answer);
                    //返回响应
                    connection.write(resultVO);
                } else {
                    //返回错误响应
                    connection.write(throwable2);
                }
            });
        }
    });
}

可以看出,随着调用层级的加深,callback 层级越来越深,越来越难写,而且啰嗦的代码很多。并且,基于 CallBack 想实现获取点赞数量其实和获取回答信息并发是很难写的,这里还是先获取点赞数量之后再获取回答信息。

那么基于 Future 呢?我们用 Java 8 之后引入的 CompletableFuture 来试着实现下。

//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
        //所有的异步任务都执行完之后要做的事情
        CompletableFuture.allOf(
                getVoteCountFromCache(id)
                        //发生异常,从数据库读取
                        .exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
                        //读取完之后,设置VoteCount
                        .thenAccept(voteCount -> {
                    resultVO.setVoteCount(voteCount);
                }),
                getAnswerFromDB(id).thenAccept(answer -> {
                    resultVO.setAnswer(answer);
                })
        ).exceptionallyAsync(throwable -> {
            connection.write(throwable);
        }).thenRun(() -> {
            connection.write(resultVO);
        });
}

这种实现就看上去简单多了,并且读取点赞数量还有读取回答内容是同时进行的。 Project Reactor 在 Completableuture 这种实现的基础上,增加了更多的组合方式以及更完善的异常处理机制,以及面对背压时候的处理机制,还有重试机制

相关文章
|
9月前
|
缓存 Java 调度
Java 响应式编程 Reactor 框架
Java 响应式编程 Reactor 框架
513 0
|
Java API
java9 响应式编程支持
java9中的响应式编程
2078 0
|
前端开发 Java API
【Java开发者专场】阿里专家杜万:Java响应式编程,一文全面解读
响应式宣言如何解读,Java中如何进行响应式编程,Reactor Streams又该如何使用?热衷于整合框架与开发工具的阿里云技术专家杜万,为大家全面解读响应式编程,分享Spring Webflux的实践。
3892 0
|
3天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
20 0
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0
|
2天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
|
2天前
|
安全 Java 编译器
是时候来唠一唠synchronized关键字了,Java多线程的必问考点!
本文简要介绍了Java中的`synchronized`关键字,它是用于保证多线程环境下的同步,解决原子性、可见性和顺序性问题。从JDK1.6开始,synchronized进行了优化,性能得到提升,现在仍可在项目中使用。synchronized有三种用法:修饰实例方法、静态方法和代码块。文章还讨论了synchronized修饰代码块的锁对象、静态与非静态方法调用的互斥性,以及构造方法不能被同步修饰。此外,通过反汇编展示了`synchronized`在方法和代码块上的底层实现,涉及ObjectMonitor和monitorenter/monitorexit指令。
14 0
|
2天前
|
监控 安全 Java
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
9 2
|
2天前
|
Java 调度
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
28 1
|
2天前
|
安全 Java
Java基础教程(15)-多线程基础
【4月更文挑战第15天】Java内置多线程支持,通过Thread类或Runnable接口实现。线程状态包括New、Runnable、Blocked、Waiting、Timed Waiting和Terminated。启动线程调用start(),中断线程用interrupt(),同步用synchronized关键字。线程安全包如java.util.concurrent提供并发集合和原子操作。线程池如ExecutorService简化任务管理,Callable接口允许返回值,Future配合获取异步结果。Java 8引入CompletableFuture支持回调。