使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程

简介: 使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程

在前一篇文章《使用 Kotlin + Spring Boot 进行后端开发》中,曾介绍过尝试使用 Kotlin 来做后端开发。这一次,尝试 WebFlux 以及协程。


首先,在build.gradle中添加插件和依赖的库。

plugins {
    id 'java'
    id 'org.jetbrains.kotlin.jvm' version '1.3.10'
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.10"
}
ext {
    libraries = [
            rxjava                    : "2.2.2",
            logback                   : "1.2.3",
            spring_boot               : "2.1.0.RELEASE",
            kotlinx_coroutines_core   : "1.0.1"
    ]
}
group 'com.kotlin.tutorial'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
def libs = rootProject.ext.libraries // 库
repositories {
    mavenCentral()
}
dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
    compile "org.jetbrains.kotlin:kotlin-reflect:1.3.10"
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation "io.reactivex.rxjava2:rxjava:${libs.rxjava}"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:${libs.kotlinx_coroutines_core}"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${libs.kotlinx_coroutines_core}"
    implementation "ch.qos.logback:logback-classic:${libs.logback}"
    implementation "ch.qos.logback:logback-core:${libs.logback}"
    implementation "ch.qos.logback:logback-access:${libs.logback}"
    implementation "org.springframework.boot:spring-boot-starter-web:${libs.spring_boot}"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive:${libs.spring_boot}"
}
compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}


此次,使用了 allopen 插件。它是官方提供的插件详见:

https://kotlinlang.org/docs/reference/compiler-plugins.html


Kotlin 的类默认是final的,一般需要使用open关键字。使用了allopen插件就可以节省open关键字。值得注意的是,需要打开 Intellij 的 Enable annotation processing 选项。


这样,创建 SpringKotlinApplication 就不需要使用open

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
/**
 * Created by tony on 2018/11/13.
 */
@SpringBootApplication
class SpringKotlinApplication
fun main(args: Array<String>) {
    SpringApplication.run(SpringKotlinApplication::class.java, *args)
}


另外,不要忘记配置数据库的信息,例子采用的是 MongoDB。


WebFlux



WebFlux 是 Spring 5 新增的特性,相对于传统 MVC 的同步阻塞IO模型,它采用异步非阻塞的IO模型。


WebFlux 的 Flux 取自于 Reactor 中的类 Flux。Reactor 是 Spring 5 响应式开发的基础。


Reactor 是完全基于响应式流规范设计和实现的库,Flux 和 Mono 是 Reactor 中的两个基本概念。


Flux 类似 RxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。Mono 最多只触发一个事件,它跟 RxJava 的 Single 和 Maybe 类似,所以可以把 Mono 用于在异步任务完成时发出通知。


1.1 创建 Model


首先,创建几个 Model 类。


User 表示用户对象。

import org.springframework.data.annotation.Id
/**
 * Created by tony on 2018/11/22.
 */
data class User(@Id val id: String? = null, val name: String, val age: Int, val address: Address) {
    constructor() : this(null, "", 0, Address())
    constructor(name: String, age: Int, address: Address) : this(null, name = name, age = age, address = address)
}


Address 记录用户的地址。

import org.springframework.data.annotation.Id
/**
 * Created by tony on 2018/11/22.
 */
data class Address(@Id val id: String? = null, val number: Int, val street: String, val city: String) {
    constructor() : this(null, 0, "", "")
    constructor(number: Int, street: String, city: String) : this(null, number, street, city)
}


Audit 用于记录用户操作的时间。

import org.springframework.data.annotation.Id
import java.time.LocalDateTime
/**
 * Created by tony on 2018/11/22.
 */
data class Audit(@Id val id: String? = null, val name: String, val eventDate: LocalDateTime) {
    constructor() : this(null, "",LocalDateTime.now())
    constructor(name: String, eventDate: LocalDateTime) : this(null, name, eventDate)
}


1.2 创建 Repository


创建 UserReactiveRepository 用于 User 对象的查询操作,它实现 ReactiveMongoRepository 接口。

import com.kotlin.tutorial.model.User
import org.springframework.data.mongodb.repository.ReactiveMongoRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
/**
 * Created by tony on 2018/11/22.
 */
@Repository
interface UserReactiveRepository : ReactiveMongoRepository<User, String> {
    fun findUserByAge(age: Int): Flux<User>
    fun findUserByAddressCity(city: String): Flux<User>
    fun findUserByAgeAndAddressCity(age: Int, city: String): Flux<User>
}


创建 AuditRepository 用于查询用户最近一条的操作时间。

import com.kotlin.tutorial.model.Audit
import org.springframework.data.repository.CrudRepository
import org.springframework.stereotype.Repository
/**
 * Created by tony on 2018/11/22.
 */
@Repository
interface AuditRepository: CrudRepository<Audit, String> {
    fun findFirstByNameOrderByEventDateDesc(name: String): Audit
}


1.3 创建 Service


创建 UserReactiveService,通过依赖注入了 userRepository、auditRepository。

import com.kotlin.tutorial.Utils.toLower
import com.kotlin.tutorial.model.Address
import com.kotlin.tutorial.model.Audit
import com.kotlin.tutorial.model.User
import com.kotlin.tutorial.repository.AuditRepository
import com.kotlin.tutorial.repository.UserReactiveRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import java.time.LocalDateTime
/**
 * Created by tony on 2018/11/22.
 */
@Component
class UserReactiveService {
    @Autowired
    lateinit var userRepository: UserReactiveRepository
    @Autowired
    lateinit var auditRepository: AuditRepository
    companion object {
        val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower()
        val streets = listOf("renming road", "zhongshan road").toLower()
    }
    fun find(age: Int?, rawCity: String?): Flux<User> {
        val city = rawCity?.toLowerCase()
        return when {
            age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city)
            city is String -> userRepository.findUserByAddressCity(city)
            age is Int -> userRepository.findUserByAge(age)
            else -> userRepository.findAll()
        }
    }
    fun generateData(): Flux<User> {
        val list = listOf(20, 25, 33, 28, 34).map {
            val u = generate(it)
            auditRepository.save(Audit(u.name, LocalDateTime.now()))
            u
        }
        return userRepository.deleteAll().thenMany(userRepository.saveAll(list))
    }
    private fun generate(age: Int): User {
        val address = Address(age, streets[age % streets.size], cities[age % cities.size])
        return User("Tony$age", age, address)
    }
}


1.4 创建 Controller


创建 UserController 编写两个 reactive 的接口:

@RestController
@RequestMapping("/user")
class UserController {
    @Autowired
    lateinit var userReactiveService: UserReactiveService
    @GetMapping("/reactive/find")
    fun findByReactive(@RequestParam age: Int?, @RequestParam city: String?) = userReactiveService.find(age, city)
    @GetMapping("/reactive/generate")
    fun genDataByReactive() = userReactiveService.generateData()
    ......    
}


创建用户的方式:

curl http://localhost:8080/user/reactive/generate


基于城市查询用户的方式:

curl http://localhost:8080/user/reactive/find?city=suzhou


RxJava 2



RxJava 库是 JVM 上响应式编程的先驱,也是响应式流规范(Reactive Streams)的基础。


如果对 RxJava 2 不熟悉,也可以购买我的《RxJava 2.x 实战》


2.1 创建 Repository


创建 UserRxJavaRepository 功能跟 UserReactiveRepository 一样,只是多了一个 findUserByName() 方法。

import com.kotlin.tutorial.model.User
import io.reactivex.Flowable
import org.springframework.data.repository.reactive.RxJava2CrudRepository
import org.springframework.stereotype.Repository
/**
 * Created by tony on 2018/11/22.
 */
@Repository
interface UserRxJavaRepository : RxJava2CrudRepository<User, String> {
    fun findUserByName(name: String): Flowable<User>
    fun findUserByAge(age: Int): Flowable<User>
    fun findUserByAddressCity(city: String): Flowable<User>
    fun findUserByAgeAndAddressCity(age: Int, city: String): Flowable<User>
}


2.2 创建 JavaService


创建 UserRxJavaService ,类似于 UserReactiveService。但是,多了两个方法:findByName()、login()。其中,调用 login() 会添加一条审计的记录。

import com.kotlin.tutorial.Utils.toLower
import com.kotlin.tutorial.model.Address
import com.kotlin.tutorial.model.Audit
import com.kotlin.tutorial.model.User
import com.kotlin.tutorial.repository.AuditRepository
import com.kotlin.tutorial.repository.UserRxJavaRepository
import io.reactivex.Flowable
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import java.time.LocalDateTime
/**
 * Created by tony on 2018/11/22.
 */
@Component
class UserRxJavaService {
    @Autowired
    lateinit var userRepository: UserRxJavaRepository
    @Autowired
    lateinit var auditRepository: AuditRepository
    companion object {
        val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower()
        val streets = listOf("renming road", "zhongshan road").toLower()
    }
    fun findByName(name: String): Flowable<User> = userRepository.findUserByName(name)
    fun find(age: Int?, rawCity: String?): Flowable<User> {
        val city = rawCity?.toLowerCase()
        return when {
            age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city)
            city is String -> userRepository.findUserByAddressCity(city)
            age is Int -> userRepository.findUserByAge(age)
            else -> userRepository.findAll()
        }
    }
    fun generateData(): Flowable<User> {
        val list = listOf(20, 25, 33, 28, 34).map {
            val u = generate(it)
            auditRepository.save(Audit(u.name, LocalDateTime.now()))
            u
        }
        return userRepository.deleteAll().andThen(userRepository.saveAll(list))
    }
    private fun generate(age: Int): User {
        val address = Address(age, streets[age % streets.size], cities[age % cities.size])
        return User("Tony$age", age, address)
    }
    fun login(name: String) =
            userRepository.findUserByName(name)
            .map {
                auditRepository.save(Audit(it.name, LocalDateTime.now()))
            }
}


2.3 创建 Controller


在原有的 UserController 中新增两个 rxjava 的接口:

@RestController
@RequestMapping("/user")
class UserController {
    @Autowired
    lateinit var userRxJavaService: UserRxJavaService
    @GetMapping("/rxjava/find")
    fun findByRx(@RequestParam age: Int?, @RequestParam city: String?) = userRxJavaService.find(age, city)
    @GetMapping("/rxjava/generate")
    fun genDateByRx() = userRxJavaService.generateData()
    ...... 
}


Kotlin 1.3 的 Coroutines



协程(coroutine)相比于线程更加轻量级,协程又称为微线程。线程和协程的一个显著区别是,线程的阻塞代价是昂贵的,而协程使用了更简单、代价更小的挂起(suspend)来代替阻塞。


Coroutines 是 Kotlin 1.1 增加的实验的功能,到 Kotlin 1.3 已经变成了正式的功能。


先在 UserController 创建一个模拟登陆的接口,访问该接口时会添加一条审计的记录

@GetMapping("/rxjava/login")
    fun mockLogin(@RequestParam username: String) = userRxJavaService.login(username)


然后尝试用传统的 blocking 方式来编写一个获取登陆信息的接口:

@GetMapping("/blocking/{username}")
    fun getNormalLoginMessage(@PathVariable username: String):String {
        val user = userService.findByName(username)
        val lastLoginTime = auditService.findByName(user.name).eventDate
        return "Hi ${user.name}, you have logged in since $lastLoginTime"
    }


再尝试用 RxJava 的方式来编写该接口:

@GetMapping("/rxjava/{username}")
    fun getRxLoginMessage(@PathVariable username: String)=
            userRxJavaService.findByName(username)
                    .map {
                        auditService.findByName(it.name).eventDate
                    }
                    .map {
                        "Hi ${username}, you have logged in since $it"
                    }


最后,使用 Coroutines 的方式来编写接口:

@GetMapping("/coroutine/{username}")
    fun getLoginMessage(@PathVariable username: String) = runBlocking {
        val user = userRxJavaService.findByName(username).awaitSingle()
        val lastLoginTime = GlobalScope.async {
            auditService.findByName(user.name).eventDate
        }.await()
        "Hi ${user.name}, you have logged in since $lastLoginTime"
    }


可以看到,使用协程的方式类似于传统的 blocking 的方式来编写代码。


image.png

模拟用户登陆.png


image.png

使用 Coroutines 的方式获取登陆信息.png


关于协程,更多可以参考之前写的 Coroutines 笔记:


Kotlin Coroutines 笔记 (一)Kotlin Coroutines 笔记 (二)


虽然 Kotlin 1.3 之后有些变动,但是大体是不变的。之后,也会整理更多 Kotlin Coroutines 笔记。


总结



响应式开发是未来的趋势,无论是服务端开发还是移动端开发,都会顺应这个趋势。


另外,Kotlin 1.3 之后的协程已经是正式版本,Kotlin 在语言级别上支持了协程,它是异步编程的另一个不错的选择。

相关文章
|
1月前
|
前端开发 Java API
vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程
本文是Vert.x学习系列的第五部分,讨论了回调函数的限制、Future和Promise在异步操作中的应用、响应式扩展以及Kotlin协程,并通过示例代码展示了如何在Vert.x中使用这些异步编程模式。
48 5
vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程
|
1月前
|
JSON 调度 数据库
Android面试之5个Kotlin深度面试题:协程、密封类和高阶函数
本文首发于公众号“AntDream”,欢迎微信搜索“AntDream”或扫描文章底部二维码关注,和我一起每天进步一点点。文章详细解析了Kotlin中的协程、扩展函数、高阶函数、密封类及`inline`和`reified`关键字在Android开发中的应用,帮助读者更好地理解和使用这些特性。
25 1
|
2月前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
47 3
|
2月前
|
数据处理 开发者 C++
Kotlin协程与RxJava:谁将称雄现代应用开发?揭秘背后的技术博弈与选择之道!
【9月更文挑战第13天】本文对比了现代应用开发中备受欢迎的两种并发编程方案:Kotlin协程与RxJava。Kotlin协程以轻量级线程和挂起函数简化异步编程,尤其适合I/O密集型任务;RxJava基于观察者模式,擅长处理复杂异步数据流。文中还提供了示例代码,帮助开发者根据项目需求和偏好做出合适的选择。
67 1
|
3月前
|
调度 开发者 UED
Kotlin 中的协程是什么?
【8月更文挑战第31天】
231 0
|
5月前
|
存储 Java 调度
Android面试题之Kotlin 协程的挂起、执行和恢复过程
了解Kotlin协程的挂起、执行和恢复机制。挂起时,状态和上下文(局部变量、调用栈、调度器等)被保存;挂起点通过`Continuation`对象处理,释放线程控制权。当恢复条件满足,调度器重新分配线程,调用`resumeWith`恢复执行。关注公众号“AntDream”获取更多并发知识。
128 2
|
6月前
|
移动开发 Android开发 开发者
构建高效Android应用:Kotlin与协程的完美融合
【5月更文挑战第25天】 在移动开发的世界中,性能和响应性是衡量应用质量的关键指标。随着Kotlin的流行和协程的引入,Android开发者现在有了更强大的工具来提升应用的性能和用户体验。本文深入探讨了Kotlin语言如何与协程相结合,为Android应用开发带来异步处理能力的同时,保持代码的简洁性和可读性。我们将通过实际案例分析,展示如何在Android项目中实现协程,以及它们如何帮助开发者更有效地管理后台任务和用户界面的流畅交互。
|
6月前
|
移动开发 监控 Android开发
构建高效安卓应用:Kotlin 协程的实践与优化
【5月更文挑战第16天】 在移动开发领域,性能优化一直是开发者们追求的重要目标。特别是对于安卓平台来说,由于设备多样性和系统资源的限制,如何提升应用的响应性和流畅度成为了一个关键议题。近年来,Kotlin 语言因其简洁、安全和高效的特点,在安卓开发中得到了广泛的应用。其中,Kotlin 协程作为一种轻量级的并发解决方案,为异步编程提供了强大支持,成为提升安卓应用性能的有效手段。本文将深入探讨 Kotlin 协程在安卓开发中的应用实践,以及通过合理设计和使用协程来优化应用性能的策略。
66 8
|
6月前
|
移动开发 数据库 Android开发
构建高效Android应用:探究Kotlin的协程优势
【5月更文挑战第22天】随着移动开发技术的不断进步,Android平台的性能优化已经成为开发者关注的焦点。在众多提升应用性能的手段中,Kotlin语言提供的协程概念因其轻量级线程管理和异步编程能力而受到广泛关注。本文将深入探讨Kotlin协程在Android开发中的应用,以及它如何帮助开发者构建出更高效、响应更快的应用,同时保持代码的简洁性和可读性。
|
6月前
|
移动开发 Android开发 开发者
构建高效安卓应用:Kotlin 协程的实践指南
【5月更文挑战第18天】 随着移动开发技术的不断进步,安卓平台亟需一种高效的异步编程解决方案来应对日益复杂的应用需求。Kotlin 协程作为一种新兴的轻量级线程管理机制,以其简洁的语法和强大的功能,成为解决这一问题的关键。本文将深入探讨Kotlin协程在安卓开发中的实际应用,从基本概念到高级技巧,为开发者提供一份全面的实践指南,旨在帮助读者构建更加高效、稳定的安卓应用。