现在Reactive在Java开发者中依然还是个雾里看花的状态,都知道不是银弹,但是也都不知道在一般的业务场景它能不能用?能用到什么程度?代价是什么?因此为了破除迷信,近距离了解这项技术,从Spring Webflux开始学习,关键心得记录如下。
异步思维
首先全异步编程是Reactive的核心,虽然一直作业务产品开发,但是对多线程操作也不陌生,直观感觉上即使是全异步编程也不会有太大的不适应。但是,全异步真的是需要另一套代码习惯。
下面这段代码,按照我一开始的理解应该是要输出都能输出,但是事实上直接就报了“java.io.IOException: Pipe closed”,也就是说由于使用了JDK8 Closeable方式的try catch写法,导致在实际写入前就在主线程被关闭。
在这里传统的优雅的写法成为了全异步化编程模型的“陋习”,不得不在onFinally或者后续的阻塞场景写一些样板代码将其close。同时这里也暴露了另一个问题,那就是在传统开发模式和Reactive混合使用时,很容易由于前置的同步代码潜在的预期语义,导致异步化后出现各种难以捉摸的异常,并且如果没写doOnError还会丢失异常信息。
@RestController
@RequestMapping("/test")
public class GreetingHandler {
@GetMapping("/hello")
public Flux<String> hello(ServerHttpRequest request) {
return Flux.just("a", "b", "c");
}
}
public static void main(String[] args) throws Exception {
String url = "http://localhost:8080/test/hello";
Flux<DataBuffer> response = WebClient.create()
.get().uri(url).exchange()
.flatMapMany(b -> b.bodyToFlux(DataBuffer.class));
try(PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream)) {
DataBufferUtils.write(response, outputStream)
.doFinally(s -> {
System.out.println("doFinally: " + s.toString() + Thread.currentThread().getId());
try {
System.out.println(inputStream.available());
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnNext(dataBuffer -> {
System.out.println("doOnNext" + Thread.currentThread().getId());
})
.doOnError(t -> {
logger.error("", t);
})
.subscribe(DataBufferUtils.releaseConsumer());
} catch (Exception e) {
logger.error("", e);
}
}
客户端限制
Webflux将Controller的返回固定成另Mono和Flux两种,通过使用浏览器接收到的和普通接口无二,那Webflux真的不限制客户端类型吗?
了解这个问题最好的办法一是看源码,其次就是抓包看一下通信协议。本次使用Wireshark抓包,截图如下。从服务端返回可以看出发送了三次数据,因此说明FLux接口会分成多个chunk返回客户端,同时整体上还是一个完整的HTTP协议,所以客户端不受服务端选型影响。
小结
从目前的初步了解来看,在一般的业务中实战Reactive就是个高风险的选型,因为有性能压力的往往只会是少数几个接口,而开发模型混合之后维护难度会加大不少,这或许就是节约成本的代价。