WebFlux原理浅尝
基于Netty作为服务器来讲解WebFlux的实现原理。
Reactor Netty概述
Netty作为服务器时,其底层是基于Reactor Netty来进行反应式流支持的。Reactor Netty提供基于Netty框架的无阻塞和回压的TCP/HTTP/UDP客户端和服务器。在WebFlux中主要使用其创建的HTTP服务器,Reactor Netty提供易于使用且易于配置的HttpServer类。它隐藏了创建HTTP服务器所需的大部分Netty功能,并添加了Reactive Streams回压。
想要使用Reactor Netty库提供的功能,首先需要通过以下代码将库添加到pom.xml中来导入BOM:
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Californium-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
然后需要像往常一样将依赖项添加到相关的reactor项目中(不需要加version标签)。以下代码显示了如何执行此操作:
<dependencies> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> </dependencies>
要启动HTTP服务器,必须要创建和配置HttpServer实例。默认情况下,主机(Host)配置为任何本地地址,并且系统在调用绑定操作时可选取临时端口(port)。以下示例显示如何创建HttpServer实例:
import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; public class ReactorNetty { public static void main(String[] args) { DisposableServer server = HttpServer.create()//1.创建http服务器 .host("localhost")//2.设置host .port(8080)//3.设置监听端口 .route(routes -> routes//4.设置路由规则 .get("/hello", (request, response) -> response.sendString(Mono.just("Hello World!"))) .post("/echo", (request, response) -> response.send(request.receive().retain())) .get("/path/{param}", (request, response) -> response.sendString(Mono.just(request.param("param")))) .ws("/ws", (wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain()))) .bindNow(); server.onDispose().block();//5.阻塞方式启动服务器,同步等待服务停止 } }
由上述代码可知:
- 代码1创建了一个待配置的HttpServer。
- 代码2配置HTTP服务的主机。
- 代码3配置HTTP服务的监听端口号。
·代码4配置HTTP服务路由,为访问路径/hello提供GET请求并返回“Hello World!”;为访问路径/echo提供POST请求,并将收到的请求正文作为响应返回;为访问路径/path/{param}提供GET请求并返回path参数的值;将websocket提供给/ws并将接收的传入数据作为传出数据返回。
·代码5调用代码1返回的DisposableServer的onDispose()方法并以阻塞的方式等待服务器关闭。
运行上面代码,在浏览器中输入http://127.0.0.1:8080/hello,若在页面上显示出“Hello World!”,说明我们的HTTP服务器生效了。
WebFlux服务器启动流程
我们结合SpringBoot的启动流程讲解WebFlux服务启动流程,首先我们看一下启动时序图
【WebFlux服务启动时序图】
图中的步骤1通过createApplicationContext创建了应用程序上下文AnnotationConfigReactiveWebServerApplicationContext,其代码如下:
protected ConfigurableApplicationContext createApplicationContext() { Class<?> contextClass = this.applicationContextClass; if (contextClass == null) { try { //a 环境类型 switch (this.webApplicationType) { case SERVLET://a.1 Web servlet环境 contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS); break; case REACTIVE://a.2 Web Reactive环境 contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS); break; default://a.3 非Web环境 contextClass = Class.forName(DEFAULT_CONTEXT_CLASS); } } catch (ClassNotFoundException ex) { throw new IllegalStateException( "Unable create a default ApplicationContext, " + "please specify an ApplicationContextClass", ex); } } return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass); } //默认非Web环境时 public static final String DEFAULT_CONTEXT_CLASS = "org.springframework.context." + "annotation.AnnotationConfigApplicationContext"; //web Servlet环境时默认的上下文 public static final String DEFAULT_SERVLET_WEB_CONTEXT_CLASS = "org.springframework.boot." + "web.servlet.context.AnnotationConfigServletWebServerApplicationContext"; //反应式Web环境时默认的上下文 public static final String DEFAULT_REACTIVE_WEB_CONTEXT_CLASS = "org.springframework." + "boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext";
如上述代码所示,创建容器应用程序上下文时应根据环境类型的不同而创建不同的应用程序上下文。这里我们使用的是反应式Web环境,所以创建的应用程序上下文是AnnotationConfigReactiveWebServerApplicationContext的实例。
那么环境类型webApplicationType是如何确定的呢?其实是在创建SpringApplication的构造函数内确定的:
public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) { ... this.webApplicationType = WebApplicationType.deduceFromClasspath(); ... } 下面我们看WebApplicationType的deduceFromClasspath方法: static WebApplicationType deduceFromClasspath() { //b.判断是不是REACTIVE类型 if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null) && !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) { return WebApplicationType.REACTIVE; } //c.判断是不是非Web类型 for (String className : SERVLET_INDICATOR_CLASSES) { if (!ClassUtils.isPresent(className, null)) { return WebApplicationType.NONE; } } //SERVLET环境 return WebApplicationType.SERVLET; } //spring mvc 分派器 private static final String WEBMVC_INDICATOR_CLASS = "org.springframework." + " web.servlet.DispatcherServlet"; // reactive web分派器 private static final String WEBFLUX_INDICATOR_CLASS = "org." + "springframework.web.reactive.DispatcherHandler"; //Jersey Web 项目容器类 private static final String JERSEY_INDICATOR_CLASS = "org.glassfish.jersey.servlet.ServletContainer"; //Servlet容器所需要的类 private static final String[] SERVLET_INDICATOR_CLASSES = { "javax.servlet.Servlet", "org.springframework.web.context.ConfigurableWebApplicationContext" };
如上述代码所示,deduceFromClasspath方法是根据classpath下是否有对应的Class字节码文件存在来决定当前是什么环境的。
下面我们看图中步骤3是如何创建并启动HTTP服务器的。在Spring上下文刷新的onRefresh阶段调用了createWebServer方法来创建Web服务器,其内部调用getWebServerFactory来获取Web服务器工厂。getWebServerFactory代码如下:
protected ReactiveWebServerFactory getWebServerFactory() { //d 从bean工厂中获取所有ReactiveWebServerFactory类型的Bean实例的名字 String[] beanNames = getBeanFactory() .getBeanNamesForType(ReactiveWebServerFactory.class); //e 不存在则抛出异常 if (beanNames.length == 0) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to missing " + "ReactiveWebServerFactory bean."); } if (beanNames.length > 1) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to multiple " + "ReactiveWebServerFactory beans : " + StringUtils.arrayToCommaDelimitedString(beanNames)); } //f 存在则获取第一个实例 return getBeanFactory().getBean(beanNames[0], ReactiveWebServerFactory.class); }
如上述代码所示,从应用程序上下文对应的Bean工厂中获取ReactiveWebServerFactory的实例,以便后面创建Web服务器。那么ReactiveWebServerFactory的实现类的实例什么时候注入上下文容器中呢?其实这是借助了Springboot的autoconfigure机制,autoconfigure机制会自动把ReactiveWebServerFactory的实现类NettyReactiveWebServer Factory注入容器内。
具体注入哪个ReactiveWebServerFactory的实现类,是ReactiveWebServerFactoryConfiguration根据autoconfigure机制来做的,其代码如下:
class ReactiveWebServerFactoryConfiguration { //f.1将NettyReactiveWebServerFactory注入容器 @Configuration @ConditionalOnMissingBean(ReactiveWebServerFactory.class) @ConditionalOnClass({ HttpServer.class }) static class EmbeddedNetty { @Bean public NettyReactiveWebServerFactory nettyReactiveWebServerFactory() { return new NettyReactiveWebServerFactory(); } } //f.2注入TomcatReactiveWebServerFactory实例 @Configuration @ConditionalOnMissingBean(ReactiveWebServerFactory.class) @ConditionalOnClass({ org.apache.catalina.startup.Tomcat.class }) static class EmbeddedTomcat { @Bean public TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory() { return new TomcatReactiveWebServerFactory(); } } //f.3注入JettyReactiveWebServerFactory实例 @Configuration @ConditionalOnMissingBean(ReactiveWebServerFactory.class) @ConditionalOnClass({ org.eclipse.jetty.server.Server.class }) static class EmbeddedJetty { @Bean public JettyReactiveWebServerFactory jettyReactiveWebServerFactory() { return new JettyReactiveWebServerFactory(); } } //f.4注入UndertowReactiveWebServerFactory实例 @ConditionalOnMissingBean(ReactiveWebServerFactory.class) @ConditionalOnClass({ Undertow.class }) static class EmbeddedUndertow { @Bean public UndertowReactiveWebServerFactory undertowReactiveWebServerFactory() { return new UndertowReactiveWebServerFactory(); } } }
比如代码f.1,如果当前容器上下文中不存在ReactiveWebServerFactory的实例,并且classpath下存在HttpServer的class文件,则说明当前环境为Reactive环境,则注入NettyReactiveWebServerFactory到容器。
比如代码f.2,如果当前容器上下文中不存在ReactiveWebServerFactory的实例,并且classpath下存在org.apache.catalina.startup.Tomcat的class文件,则说明当前环境为Servlet环境,并且Servlet容器为Tomcat,则将TomcatReactiveWebServerFactory实例注入容器。
找到对应的ReactiveWebServerFactory工厂实例后,如图所示,步骤8创建了ServerManager的实例,代码如下:
public static ServerManager get(ReactiveWebServerFactory factory) { return new ServerManager(factory); }
其中ServerManager的构造函数如下:
private ServerManager(ReactiveWebServerFactory factory) { this.handler = this::handleUninitialized; this.server = factory.getWebServer(this); }
由上可知,调用NettyReactiveWebServerFactory的getWebServer方法创建了Web服务器,其代码如下:
public WebServer getWebServer(HttpHandler httpHandler) { //I HttpServer httpServer = createHttpServer(); //II ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter( httpHandler); //III return new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout); }
如上代码I所示,其通过createHttpServer创建了HTTPServer,其代码如下(使用reactor Netty的API创建了HTTP Server):
private HttpServer createHttpServer() { return HttpServer.builder().options((options) -> { options.listenAddress(getListenAddress()); if (getSsl() != null && getSsl().isEnabled()) { SslServerCustomizer sslServerCustomizer = new SslServerCustomizer( getSsl(), getSslStoreProvider()); sslServerCustomizer.customize(options); } if (getCompression() != null && getCompression().getEnabled()) { CompressionCustomizer compressionCustomizer = new CompressionCustomizer( getCompression()); compressionCustomizer.customize(options); } applyCustomizers(options); }).build(); }
代码II创建了与Netty对应的适配器类ReactorHttpHandlerAdapter。
代码III创建了一个NettyWebServer的实例,其包装了适配器和HTTPserver实例。
到这里我们如何创建HTTPServer就讲解完了。
下面我们看图7-3中所示步骤11是如何启动服务的。在应用程序上下文刷新的finishRefresh阶段调用了startReactiveWebServer方法来启动服务,其代码如下:
private WebServer startReactiveWebServer() { ServerManager serverManager = this.serverManager; ServerManager.start(serverManager, this::getHttpHandler); return ServerManager.getWebServer(serverManager); }
如上代码所示,首先调用了getHttpHandler来获取处理器:
protected HttpHandler getHttpHandler() { // Use bean names so that we don't consider the hierarchy String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class); if (beanNames.length == 0) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean."); } if (beanNames.length > 1) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : " + StringUtils.arrayToCommaDelimitedString(beanNames)); } return getBeanFactory().getBean(beanNames[0], HttpHandler.class); }
如上代码所示,其中获取了应用程序上下文中HttpHandler的实现类,这里为HttpWebHandlerAdapter。然后调用ServerManager.start启动了服务,其代码如下:
public static void start(ServerManager manager, Supplier<HttpHandler> handlerSupplier) { if (manager != null && manager.server != null) { manager.handler = handlerSupplier.get();//执行getHttpHandler方法 manager.server.start();//启动服务 } }
如上代码所示,首先把HttpWebHandlerAdapter实例保存到了ServerManager内部,然后启动ServerManager中的NettyWebServer服务器。NettyWebServer的start方法代码如下:
public void start() throws WebServerException { //IV具体启动服务 if (this.nettyContext == null) { try { this.nettyContext = startHttpServer(); } ... //开启deamon线程同步等待服务终止 NettyWebServer.logger.info("Netty started on port(s): " + getPort()); startDaemonAwaitThread(this.nettyContext); } } private BlockingNettyContext startHttpServer() { if (this.lifecycleTimeout != null) { return this.httpServer.start(this.handlerAdapter, this.lifecycleTimeout); } return this.httpServer.start(this.handlerAdapter); }
如上代码IV所示,其调用了startHttpServer启动服务,然后返回了BlockingNetty Context对象,接着调用了startDaemonAwaitThread开启deamon线程同步等待服务终止,其代码如下:
private void startDaemonAwaitThread(BlockingNettyContext nettyContext) { //启动线程 Thread awaitThread = new Thread("server") { @Override public void run() { //同步阻塞服务停止 nettyContext.getContext().onClose().block(); } }; //设置线程为demaon,并启动 awaitThread.setContextClassLoader(getClass().getClassLoader()); awaitThread.setDaemon(false); awaitThread.start(); }
这里之所以开启线程来异步等待服务终止,是因为这样不会阻塞调用线程,如果调用线程被阻塞了,则整个SpringBoot应用就运行不起来了。
WebFlux一次服务调用流程
前面我们说了WebFlux服务启动流程,本节我们讲解一次服务调用流程,以controller PersonHandler中的getPerson方法调用流程为例。
当我们在浏览器敲入http://127.0.0.1:8080/getPerson时,会向WebFlux中的Netty服务器发起请求,服务器中的Boss监听线程会接收该请求,并在完成TCP三次握手后,把连接套接字通道注册到worker线程池的某个NioEventLoop中来处理,然后该NioEventLoop中对应的线程就会轮询该套接字上的读写事件并进行处理。下面我们来看其时序图,如下图所示。
【WebFlux一次服务调用流程】
如图所示,当注册到worker线程池的NioEventLoop上的连接套接字有读事件后,会调用processSelectedKeys方法进行处理,然后把读取的数据通过与该通道对应的管道DefaultChannelPipeline传播到注册的事件处理器进行处理。这里处理器HttpServerCodec负责把二进制流解析为HTTP请求报文,然后传递到管道后面的处理器HttpServerHandler中,HttpServerHandler会调用ServerContextHandler的createOperations方法,通过代码“channel.eventLoop().execute(op::onHandlerStart);”把ChannelOperations的onHandlerStart方法作为任务提交到与当前通道对应的NioEventLoop管理的队列中。下面我们看NioEventLoop中的线程是如何执行该任务的。onHandlerStart代码如下:
protected void onHandlerStart() { applyHandler(); } protected final void applyHandler() { try { //1.调用适配器ReactorHttpHandlerAdapter的apply方法 Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this)) .subscribe(this); } catch (Throwable t) { channel.close(); } }
如上述代码1所示,调用适配器ReactorHttpHandlerAdapter的apply方法来具体处理请求,其代码如下:
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) { ServerHttpRequest adaptedRequest; ServerHttpResponse adaptedResponse; try { //2.创建与reactor对应的请求、响应对象 adaptedRequest = new ReactorServerHttpRequest(request, BUFFER_FACTORY); adaptedResponse = new ReactorServerHttpResponse(response, BUFFER_FACTORY); } catch (URISyntaxException ex) { ... response.status(HttpResponseStatus.BAD_REQUEST); return Mono.empty(); } ... //3. 这里httpHandler为ServerManager return this.httpHandler.handle(adaptedRequest, adaptedResponse) .doOnError(ex -> logger.warn("Handling completed with error: " + ex.getMessage())) .doOnSuccess(aVoid -> logger.debug("Handling completed with success")); }
然后我们看代码3所示的ServerManager的handle方法:
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { //4.这里handler为HttpWebHandlerAdapter return this.handler.handle(request, response); } 接着调用HttpWebHandlerAdapter的handle方法,其代码如下: public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { //5.创建服务交换对象 ServerWebExchange exchange = createExchange(request, response); //6.这里getDelegate()为DispatcherHandler return getDelegate().handle(exchange) .onErrorResume(ex -> handleFailure(request, response, ex)) .then(Mono.defer(response::setComplete)); } protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) { return new DefaultServerWebExchange(request, response, this.sessionManager, getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext); }
最后调用分派器DispatcherHandler的handle方法进行路由:
public Mono<Void> handle(ServerWebExchange exchange) { ... if (this.handlerMappings == null) { return Mono.error(HANDLER_NOT_FOUND_EXCEPTION); } //7.查找对应的controller进行处理 return Flux.fromIterable(this.handlerMappings)//7.1获取所有处理器映射 .concatMap(mapping -> mapping.getHandler(exchange))//7.2转换映射,获取处理器 .next()//7.3获取第一个元素 .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))//7.4不存在处理器 .flatMap(handler -> invokeHandler(exchange, handler))//7.5使用处理器进行处理 .flatMap(result -> handleResult(exchange, result));//7.6处理处理器处理的结果 }
上述代码使用所有请求处理器映射作为Flux流的数据源,查找与指定请求对应的处理器。
如果没有找到,则使用Mono.error(HANDLER_NOT_FOUND_EXCEPTION)创建一个错误信息作为元素;
如果找到了,则调用invokeHandler方法进行处理,处理完毕调用handleResult对结果进行处理。这里我们找到了与getPerson对应的处理器PersonHandler,则invokeHandler内会反射调用PersonHandler的getPerson方法进行执行,然后把结果交给handleResult写回响应对象。
WebFlux的适用场景
既然Spring 5中推出了WebFlux,那么我们做项目时到底选择使用Spring MVC还是WebFlux?
这是一个自然会想到的问题,但却是不合理的。因为两者的存在并不是矛盾的,利用两者可扩大我们开发时可用选项的范围。两者的设计是为了保持连续性和一致性,它们可以并排使用,每一方的反馈都有利于双方。下图所示显示了两者之间的关系、共同点以及各自的特性。
【WebFlux与Servlet对比】
建议
关于是选择Spring MVC还是WebFlux,Spring5官方文档给出了几点建议:
如果你的Spring MVC应用程序运行正常,则无须更改。命令式编程是编写、理解和调试代码的最简单方法。
如果你已使用非阻塞Web栈,则可以考虑使用WebFlux。因为Spring WebFlux提供与此相同的执行模型优势,并且提供了可用的服务器选择(Netty、Tomcat、Jetty、Undertow和Servlet 3.1+容器),还提供了可选择的编程模型(带注解的controller和函数式Web端点),以及可选择的反应库(Reactor、RxJava或其他)。
如果你对与Java 8 Lambdas或Kotlin一起使用的轻量级、功能性Web框架感兴趣,则可以使用Spring WebFlux函数式Web端点。对于较小的应用程序或具有较低复杂要求的微服务而言,这也是一个不错的选择,可以让你从更高的透明度和控制中受益。
在微服务架构中,你可以将应用程序与Spring MVC、Spring WebFlux控制器、Spring WebFlux函数式端点混合使用。在两个框架中支持相同的基于注解的编程模型,可以更轻松地重用知识,同时为正确的工作选择合适的工具。
评估应用程序的一种简单方法是检查其依赖性。如果你要使用阻塞持久性API(JPA,JDBC)或网络API,则Spring MVC至少是常见体系结构的最佳选择。从技术上讲,Reactor和RxJava都可以在单独的线程上执行阻塞调用,但是你无法充分利用非阻塞的Web技术栈。
如果你有一个调用远程服务的Spring MVC应用程序,则可尝试使用反应式WebClient。你可以直接从Spring MVC控制器方法返回反应式类型(Reactor、RxJava或其他)。每次调用的延迟或调用之间的相互依赖性越大,其益处就越大。Spring MVC控制器也可以调用其他反应式组件。
小结
Spring 5.0引入的新的异步非阻塞的WebFlux技术栈,其与Servlet技术栈是并行存在的。WebFlux从规范上支持异步处理,基于Reactor库天然支持反应式编程,并且其使用少量固定线程来实现系统可伸缩性