Spring Boot 集成 WebFlux 开发 Reactive Web 应用
《Spring Boot 实战开发》—— 基于 Gradle + Kotlin的企业级应用开发最佳实践
IBM的研究称,整个人类文明所获得的全部数据中,有90%是过去两年内产生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在内的大批新技术应运而生。其中以RxJava和Reactor为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V( Volume,Variety,Velocity,Value)中的Velocity,即高并发问题,而在Spring 5中,引入了响应式编程的支持。
本章介绍 Spring Boot 如何集成Spring 5 中的WebFlux 开发响应式 Web 应用。
1.1 响应式宣言
响应式宣言和敏捷宣言一样,说起响应式编程,必先提到响应式宣言——
We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto
响应式宣言中包含了4组关键词:
Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。
Resilient: 可恢复的。要求系统即使出错了,也能保持可响应性。
Elastic: 可伸缩的。要求系统在各种负载下都能保持可响应性。
Message Driven: 消息驱动的。要求系统通过异步消息连接各个组件。
可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。
1.2 Spring 5 响应式Web框架架构图
引用一张来自 Spring 5框架官方文档中的图:
图13-1 Spring 5框架
左侧是传统的基于Servlet的Spring Web MVC框架。右侧是Spring 5.0新引入的基于Reactive Streams的Spring WebFlux框架。从上到下依次是如下三个新组件:
Router Functions
WebFlux
Reactive Streams
下面分别作简要介绍。
Router Functions
对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件
协调上下游各个组件提供响应式编程支持。
Reactive Streams
一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>格式,以便进行统一处理。
值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。
1.3 项目实战
本节通过实例工程具体介绍开发一个Reactive Web 应用程序的过程。
1.3.1 创建项目
使用http://start.spring.io/ 创建项目,选择 Reactive Web起步依赖。如下图:
图13-2 选择 Reactive Web起步依赖
生成好项目 zip 包后,解压导入 IDEA 中, 选择 Gradle 构建项目。如下图:
图13-3 选择 Gradle 构建
配置 Gradle 本地环境,如下图:
图13-4 配置 Gradle 本地环境
完成导入 IDEA,等待项目构建初始化完毕,可以看到项目依赖树如下图:
图13-5 项目依赖树
可以看到,在 webflux的 starter 中依赖了 reactor、reactive-streams、netty 等。
Spring Initializr 将会帮我们自动生成一个样板工程。下面我们分别来加入 model 层 、dao层、 service层、 handler层等模块的代码。完整的项目的代码目录结构设计如下:
├── src
│ ├── main
│ │ ├── java
│ │ ├── kotlin
│ │ │ └── com
│ │ │ └── easy
│ │ │ └── kotlin
│ │ │ └── webflux
│ │ │ ├── WebfluxApplication.kt
│ │ │ ├── dao
│ │ │ │ └── PersonRepository.kt
│ │ │ ├── handler
│ │ │ │ └── PersonHandler.kt
│ │ │ ├── model
│ │ │ │ └── Person.kt
│ │ │ ├── router
│ │ │ │ └── RouterConfig.kt
│ │ │ ├── server
│ │ │ │ └── HttpServerConfig.kt
│ │ │ └── service
│ │ │ └── PersonService.kt
│ │ └── resources
│ │ └── application.properties
1.3.2 项目代码
本节具体介绍具体的代码实现。
模型层
Person 对象模型代码是
class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {
override fun toString(): String {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}'
}
}
服务层
接口PersonRepository.kt 的代码如下:
interface PersonRepository {
fun getPerson(id: Int): Mono<Person>
fun allPeople(): Flux<Person>
fun savePerson(person: Mono<Person>): Mono<Void>
}
服务层的实现类PersonService.kt 代码如下
@Service
class PersonService : PersonRepository {
var persons: MutableMap<Int, Person> = hashMapOf()
constructor() {
this.persons[1] = Person("Jack", 20)
this.persons[2] = Person("Rose", 16)
}
// 根据 id 获取 Mono 对象包装的 Person数据
override fun getPerson(id: Int): Mono<Person> {
return Mono.justOrEmpty(this.persons[id])
}
// 返回所有 Person数据,包装在 Flux 对象中
override fun allPeople(): Flux<Person> {
return Flux.fromIterable(this.persons.values)
}
override fun savePerson(person: Mono<Person>): Mono<Void> {
return person.doOnNext {
val id = this.persons.size + 1
persons.put(id, it)
println("Saved ${person} with ${id}")
}.thenEmpty(Mono.empty())
}
}
其中, Mono 和 Flux 是由 Reactor 提供的两个 Reactor的类型。Reactor有两种类型,Flux<T>和Mono<T>。
Flux
Flux 单词的意思是“流”。Flux类似RaxJava的Observable,它可以触发零个或者多个事件,并根据实际情况结束处理或触发错误。
Mono
Mono这个单词本身的意思是“单子”的意思。Mono最多只触发一个事件,它跟RxJava的Single和Maybe类似,所以可以把Mono<Void>用于在异步任务完成时发出通知。
Spring 同时支持其他 Reactive 流实现,如 RXJava。
控制器层PersonHandler.kt 代码如下:
@Service
class PersonHandler {
@Autowired lateinit var repository: PersonRepository
fun getPerson(request: ServerRequest): Mono<ServerResponse> {
val personId = Integer.valueOf(request.pathVariable("id"))!!
val notFound = ServerResponse.notFound().build()
val personMono = this.repository.getPerson(personId)
return personMono
.flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
.switchIfEmpty(notFound)
}
fun createPerson(request: ServerRequest): Mono<ServerResponse> {
val person = request.bodyToMono(Person::class.java)
return ServerResponse.ok().build(this.repository.savePerson(person))
}
fun listPeople(request: ServerRequest): Mono<ServerResponse> {
val people = this.repository.allPeople()
return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
}
}
这里我们没有真实去连接数据库进行操作,只是在内存中模拟了数据的返回。
请求路由
RouterConfig.kt 配置请求路由,把请求映射到相应的 Handler 处理方法。代码如下:
@Configuration
class RouterConfig {
@Autowired lateinit var personHandler: PersonHandler
@Bean
fun routerFunction(): RouterFunction<*> {
return route(GET("/api/person").and(accept(APPLICATION_JSON)),
HandlerFunction { personHandler.listPeople(it) })
.and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
HandlerFunction { personHandler.getPerson(it) }))
}
}
这里我们配置/api/person的 GET 请求映射到personHandler.listPeople()方法处理;/api/person/{id}的 GET 请求映射到personHandler.getPerson() 方法来处理。
Reactive Web服务器配置类HttpServerConfig.kt 配置基于 netty 的 Reactive Web Server。我们配置端口号为application.properties 文件中server.port的值。代码如下
@Configuration
class HttpServerConfig {
@Autowired
lateinit var environment: Environment
@Bean
fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
val adapter = ReactorHttpHandlerAdapter(httpHandler)
val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
server.newHandler(adapter)
return server
}
}
项目入口类
项目入口类 WebfluxApplication代码如下:
@SpringBootApplication
class WebfluxApplication
fun main(args: Array<String>) {
runApplication<WebfluxApplication>(*args)
}
运行测试
直接在 IDEA 中启动运行应用,在控制台启动日志中,可以看到路由映射的信息:
Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
……
2017-11-04 00:39:50.459 INFO 2884 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:9000
2017-11-04 00:39:50.459 INFO 2884 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 9000
2017-11-04 00:39:50.466 INFO 2884 --- [ main] c.e.kotlin.webflux.WebfluxApplicationKt : Started WebfluxApplicationKt in 5.047 seconds (JVM running for 6.276)
直接在命令行执行curl 请求相应的 url,可以看到对应的输出:
$ curl http://127.0.0.1:9000/api/person
[{"name":"Jack","age":20},{"name":"Rose","age":16}]
$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}
$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}
1.4 本章小结
Spring Web MVC是一个命令式的编程框架,可以很方便的进行开发和调试。在很多情况下,命令式的编程风格就可以满足,但当我们的应用需要高可伸缩性,那么 Reactive 非堵塞方式是最适合的。 所以,需要根据实际情况去决定采用 Spring 5 Reactive 或者是 Spring Web MVC命令式框架。
提示:本章工程源代码:https://github.com/EasyKotlin/kotlin-with-webflux