在 Kotlin 中使用 WebFlux + R2DBC 开发 Web 项目

简介: 在 Kotlin 中使用 WebFlux + R2DBC 开发 Web 项目

一. R2DBC 介绍



在 R2DBC 官网(http://r2dbc.io/) 上,对 R2DBC 有一句话的介绍:


The Reactive Relational Database Connectivity (R2DBC) project brings reactive programming APIs to relational databases.


R2DBC 的含义是 Reactive Relational Database Connectivity,它是一个使用响应式驱动程序集成关系数据库的孵化器。它是在响应式编程的基础上使用关系数据访问技术。


R2DBC 最初是一项实验和概念验证,旨在将 SQL 数据库集成到使用响应式编程模型的系统中。JDBC 使用的是阻塞式 API,而 R2DBC 允许开发者使用无阻塞 API 访问关系数据库,因为 R2DBC 包含 Reactive Streams 规范。从官网上还能看到 R2DBC 支持的响应式框架包括:Reactor、RxJava、Smallrye  Mutiny。


R2DBC 目前是一个开放的规范,它为驱动程序供应商实现和客户端使用建立了一个服务提供者接口(SPI)。


另外,R2DBC 是由 Spring 官方团队提出的规范,除了驱动实现外还提供了 R2DBC 连接池和 R2DBC 代理。


目前 R2DBC 已经支持的驱动实现包括:


  • cloud-spanner-r2dbc - driver for Google Cloud Spanner.
  • jasync-sql - R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL (written in Kotlin).
  • r2dbc-h2 - native driver implemented for H2 as a test database.
  • r2dbc-mariadb - native driver implemented for MariaDB.
  • r2dbc-mssql - native driver implemented for Microsoft SQL Server.
  • r2dbc-mysql - native driver implemented for MySQL.
  • r2dbc-postgres - native driver implemented for PostgreSQL.


二. R2DBC 使用



在 Gradle 中配置 Spring Boot 以及 R2DBC 相关依赖的库:

implementation "io.r2dbc:r2dbc-h2:0.8.4.RELEASE"
    implementation "com.h2database:h2:1.4.200"
    implementation "org.springframework.data:spring-data-r2dbc:1.0.0.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-actuator:2.3.5.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-data-r2dbc:2.3.5.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-webflux:2.3.5.RELEASE"
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:2.3.5.RELEASE"
    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.0"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.9"


连接数据库


我们注册和配置 ConnectionFactoryInitializer bean,并通过 ConnectionFactory 来初始化数据库:

@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {
    ......
    @Bean
    open fun initializer(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
        val initializer = ConnectionFactoryInitializer()
        initializer.setConnectionFactory(connectionFactory)
        val populator = CompositeDatabasePopulator()
        populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("schema.sql")))
        populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("data.sql")))
        initializer.setDatabasePopulator(populator)
        return initializer
    }
}


这种初始化的支持是由 Spring Boot R2DBC 自动配置的,通过 schema.sql 以及 data.sql 配置到 ConnectionFactory。


基于 routing function 模式创建接口


WebFlux 提供了2种开发模式,一种是传统的基于注解的开发模式,使用 Controller + 注解进行开发。另一种是 routing function 模式,使用函数式的编程风格。


routing function 模式主要使用 HandlerFunction 和 RouterFunction。


  • HandlerFunction 表示一个函数,该函数为路由到它们的请求生成响应。
  • RouterFunction 可以替代 @RequestMapping 注释。 我们可以使用它将请求路由到处理程序函数。


他们就像使用带注解的 Controller 一样,只不过 http method 是通过响应式来构建的。

coRouter() 允许使用 Kotlin DSL 以及 Coroutines 轻松创建 RouterFunction。例如:

@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {
    @Bean
    open fun userRoute(userHandler: UserHandler) = coRouter {
        GET("/users", userHandler::findAll)
        GET("/users/search", userHandler::search)
        GET("/users/{id}", userHandler::findUser)
        POST("/users", userHandler::addUser)
        PUT("/users/{id}", userHandler::updateUser)
        DELETE("/users/{id}", userHandler::deleteUser)
    }
    ......
}


创建 HandlerFunctions


UserHandler 是它们的 HandlerFunction 的集合,Handler 有点类似于 Service:

@Component
class UserHandler {
    private val logger = LoggerFactory.getLogger(UserHandler::class.java)
    @Autowired
    lateinit var service: UserService
    suspend fun findAll(request: ServerRequest): ServerResponse {
        val users = service.findAll()
        return ServerResponse.ok().json().bodyAndAwait(users)
    }
    suspend fun search(request: ServerRequest): ServerResponse {
        val criterias = request.queryParams()
        return when {
            criterias.isEmpty() -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Search must have query params"))
            criterias.contains("name") -> {
                val criteriaValue = criterias.getFirst("name")
                if (criteriaValue.isNullOrBlank()) {
                    ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
                } else {
                    ServerResponse.ok().json().bodyAndAwait(service.findByName(criteriaValue))
                }
            }
            criterias.contains("email") -> {
                val criteriaValue = criterias.getFirst("email")
                if (criteriaValue.isNullOrBlank()) {
                    ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
                } else {
                    ServerResponse.ok().json().bodyAndAwait(service.findByEmail(criteriaValue))
                }
            }
            else -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria"))
        }
    }
    suspend fun findUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()
        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            val user = service.findById(id)
            if (user == null) ServerResponse.notFound().buildAndAwait()
            else ServerResponse.ok().json().bodyValueAndAwait(user)
        }
    }
    suspend fun addUser(request: ServerRequest): ServerResponse {
        val newUser = try {
            request.bodyToMono<UserDTO>().awaitFirstOrNull()
        } catch (e: Exception) {
            logger.error("Decoding body error", e)
            null
        }
        return if (newUser == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
        } else {
            val user = service.addUser(newUser)
            if (user == null) ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).json().bodyValueAndAwait(ErrorMessage("Internal error"))
            else ServerResponse.status(HttpStatus.CREATED).json().bodyValueAndAwait(user)
        }
    }
    suspend fun updateUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()
        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            val updateUser = try {
                request.bodyToMono<UserDTO>().awaitFirstOrNull()
            } catch (e: Exception) {
                logger.error("Decoding body error", e)
                null
            }
            if (updateUser == null) {
                ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
            } else {
                val user = service.updateUser(id, updateUser)
                if (user == null) ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
                else ServerResponse.status(HttpStatus.OK).json().bodyValueAndAwait(user)
            }
        }
    }
    suspend fun deleteUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()
        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            if (service.deleteUser(id)) ServerResponse.noContent().buildAndAwait()
            else ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
        }
    }
}


每个 HandlerFunction 函数返回的 ServerResponse 提供了对 Http 响应的访问,可以使用 build 方法来创建。 Builder 构建器可以设置响应代码,响应标题或正文。


创建 Service


UserHandler 通过 UserService 来实现具体的业务。

@Service
class UserService {
    @Autowired
    private lateinit var userRepository: UserRepository
    suspend fun findAll() = userRepository.findAll().asFlow()
    suspend fun findById(id: Long) = userRepository.findById(id).awaitFirstOrNull()
    suspend fun findByName(name: String) = userRepository.findByName(name).asFlow()
    suspend fun findByEmail(email: String) = userRepository.findByEmail(email).asFlow()
    suspend fun addUser(user: UserDTO) = userRepository.save(user.toModel()).awaitFirstOrNull()
    suspend fun updateUser(id: Long, userDTO: UserDTO): User? {
        val user = findById(id)
        return if (user != null)
            userRepository.save(userDTO.toModel(id = id)).awaitFirstOrNull()
        else null
    }
    suspend fun deleteUser(id: Long): Boolean {
        val user = findById(id)
        return if (user != null) {
            userRepository.delete(user).awaitFirstOrNull()
            true
        } else false
    }
}


UserService 的 findAll()、findByName()、findByEmail() 返回的是 Flow<User> 对象。

这是由于 Spring Data R2DBC 的 Coroutines 扩展了响应式的基础架构,因此可以将 UserService 的方法定义为 suspend 函数并将 Flux 结果转换成 Kotlin 的 Flow 类型。


创建 Repository


而 UserService 会调用 Repository 来跟数据库打交道。在创建 Repository 之前,我们先创建实体类 User:

@Table("users")
data class User(
    @Id
    val id: Long? = null,
    val name: String,
    val password: String,
    val email: String,
)


User 类具有唯一的标识符和一些字段。有了实体类之后,我们可以创建一个合适的 Repository,如下所示:

interface UserRepository : ReactiveCrudRepository<User, Long> {
    @Query("SELECT u.* FROM users u WHERE u.name = :name")
    fun findByName(name: String): Flux<User>
    @Query("SELECT u.* FROM users u WHERE u.email = :email")
    fun findByEmail(email: String): Flux<User>
}


需要注意的是,在使用了 R2DBC 之后,就没有 ORM 了,取而代之的是响应式的方式。


运行效果


展示用户列表


image.png

用户列表.jpeg


搜索用户


image.png

搜索用户.jpeg


三. 小结



本文介绍了 R2DBC 的背景,随后介绍了 WebFlux 的  routing function 模式,以及使用 RouterFunction和HandlerFunction 创建路由以处理请求并生成响应。


当  WebFlux 和 R2DBC 配置使用时,所创建的程序每一层都是通过异步处理的数据。

相关文章
|
3月前
|
开发框架 搜索推荐 数据可视化
Django框架适合开发哪种类型的Web应用程序?
Django 框架凭借其强大的功能、稳定性和可扩展性,几乎可以适应各种类型的 Web 应用程序开发需求。无论是简单的网站还是复杂的企业级系统,Django 都能提供可靠的支持,帮助开发者快速构建高质量的应用。同时,其活跃的社区和丰富的资源也为开发者在项目实施过程中提供了有力的保障。
157 62
|
2月前
|
前端开发 安全 JavaScript
2025年,Web3开发学习路线全指南
本文提供了一条针对Dapp应用开发的学习路线,涵盖了Web3领域的重要技术栈,如区块链基础、以太坊技术、Solidity编程、智能合约开发及安全、web3.js和ethers.js库的使用、Truffle框架等。文章首先分析了国内区块链企业的技术需求,随后详细介绍了每个技术点的学习资源和方法,旨在帮助初学者系统地掌握Dapp开发所需的知识和技能。
2025年,Web3开发学习路线全指南
|
3月前
|
存储 前端开发 JavaScript
如何在项目中高效地进行 Web 组件化开发
高效地进行 Web 组件化开发需要从多个方面入手,通过明确目标、合理规划、规范开发、加强测试等一系列措施,实现组件的高效管理和利用,从而提高项目的整体开发效率和质量,为用户提供更好的体验。
54 7
|
3月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
64 2
|
4月前
|
JSON 调度 数据库
Android面试之5个Kotlin深度面试题:协程、密封类和高阶函数
本文首发于公众号“AntDream”,欢迎微信搜索“AntDream”或扫描文章底部二维码关注,和我一起每天进步一点点。文章详细解析了Kotlin中的协程、扩展函数、高阶函数、密封类及`inline`和`reified`关键字在Android开发中的应用,帮助读者更好地理解和使用这些特性。
68 1
|
5月前
|
Android开发 开发者 Kotlin
告别AsyncTask:一招教你用Kotlin协程重构Android应用,流畅度飙升的秘密武器
【9月更文挑战第13天】随着Android应用复杂度的增加,有效管理异步任务成为关键。Kotlin协程提供了一种优雅的并发操作处理方式,使异步编程更简单直观。本文通过具体示例介绍如何使用Kotlin协程优化Android应用性能,包括网络数据加载和UI更新。首先需在`build.gradle`中添加coroutines依赖。接着,通过定义挂起函数执行网络请求,并在`ViewModel`中使用`viewModelScope`启动协程,结合`Dispatchers.Main`更新UI,避免内存泄漏。使用协程不仅简化代码,还提升了程序健壮性。
179 1
|
7月前
|
安全 Android开发 Kotlin
Android经典面试题之Kotlin延迟初始化的by lazy和lateinit有什么区别?
**Kotlin中的`by lazy`和`lateinit`都是延迟初始化技术。`by lazy`用于只读属性,线程安全,首次访问时初始化;`lateinit`用于可变属性,需手动初始化,非线程安全。`by lazy`支持线程安全模式选择,而`lateinit`适用于构造函数后初始化。选择依赖于属性特性和使用场景。**
205 5
Android经典面试题之Kotlin延迟初始化的by lazy和lateinit有什么区别?
|
6月前
|
调度 Android开发 开发者
【颠覆传统!】Kotlin协程魔法:解锁Android应用极速体验,带你领略多线程优化的无限魅力!
【8月更文挑战第12天】多线程对现代Android应用至关重要,能显著提升性能与体验。本文探讨Kotlin中的高效多线程实践。首先,理解主线程(UI线程)的角色,避免阻塞它。Kotlin协程作为轻量级线程,简化异步编程。示例展示了如何使用`kotlinx.coroutines`库创建协程,执行后台任务而不影响UI。此外,通过协程与Retrofit结合,实现了网络数据的异步加载,并安全地更新UI。协程不仅提高代码可读性,还能确保程序高效运行,不阻塞主线程,是构建高性能Android应用的关键。
86 4
|
7月前
|
安全 Android开发 Kotlin
Android经典面试题之Kotlin中常见作用域函数
**Kotlin作用域函数概览**: `let`, `run`, `with`, `apply`, `also`. `let`安全调用并返回结果; `run`在上下文中执行代码并返回结果; `with`执行代码块,返回结果; `apply`配置对象后返回自身; `also`附加操作后返回自身
74 8
|
7月前
|
安全 Java Android开发
探索Android应用开发中的Kotlin语言
【7月更文挑战第19天】在移动应用开发的浩瀚宇宙中,Kotlin这颗新星以其简洁、安全与现代化的特性,正迅速在Android开发者之间获得青睐。从基本的语法结构到高级的编程技巧,本文将引导读者穿梭于Kotlin的世界,揭示其如何优化Android应用的开发流程并提升代码的可读性与维护性。我们将一起探究Kotlin的核心概念,包括它的数据类型、类和接口、可见性修饰符以及高阶函数等特性,并了解这些特性是如何在实际项目中得以应用的。无论你是刚入门的新手还是寻求进阶的开发者,这篇文章都将为你提供有价值的见解和实践指导。

热门文章

最新文章