深入理解r2dbc-mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 深入理解r2dbc-mysql

目录



简介



mysql应该是我们在日常工作中使用到的一个非常普遍的数据库,虽然mysql现在是oracle公司的,但是它是开源的,市场占有率还是非常高的。


今天我们将会介绍r2dbc在mysql中的使用。


r2dbc-mysql的maven依赖



要想使用r2dbc-mysql,我们需要添加如下的maven依赖:


<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
</dependency>


当然,如果你想使用snapshot版本的话,可以这样:


<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>${r2dbc-mysql.version}.BUILD-SNAPSHOT</version>
</dependency>
<repository>
    <id>sonatype-snapshots</id>
    <name>SonaType Snapshots</name>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
        <enabled>true</enabled>
    </snapshots>
</repository>


创建connectionFactory


创建connectionFactory的代码实际上使用的r2dbc的标准接口,所以和之前讲到的h2的创建代码基本上是一样的:


// Notice: the query string must be URL encoded
ConnectionFactory connectionFactory = ConnectionFactories.get(
    "r2dbcs:mysql://root:database-password-in-here@127.0.0.1:3306/r2dbc?" +
    "zeroDate=use_round&" +
    "sslMode=verify_identity&" +
    "useServerPrepareStatement=true&" +
    "tlsVersion=TLSv1.3%2CTLSv1.2%2CTLSv1.1&" +
    "sslCa=%2Fpath%2Fto%2Fmysql%2Fca.pem&" +
    "sslKey=%2Fpath%2Fto%2Fmysql%2Fclient-key.pem&" +
    "sslCert=%2Fpath%2Fto%2Fmysql%2Fclient-cert.pem&" +
    "sslKeyPassword=key-pem-password-in-here"
)
// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


不同的是ConnectionFactories传入的参数不同。


我们也支持unix domain socket的格式:


// Minimum configuration for unix domain socket
ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:mysql://root@unix?unixSocket=%2Fpath%2Fto%2Fmysql.sock")
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


同样的,我们也支持从ConnectionFactoryOptions中创建ConnectionFactory:


ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(HOST, "127.0.0.1")
    .option(USER, "root")
    .option(PORT, 3306)  // optional, default 3306
    .option(PASSWORD, "database-password-in-here") // optional, default null, null means has no password
    .option(DATABASE, "r2dbc") // optional, default null, null means not specifying the database
    .option(CONNECT_TIMEOUT, Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .option(SSL, true) // optional, default sslMode is "preferred", it will be ignore if sslMode is set
    .option(Option.valueOf("sslMode"), "verify_identity") // optional, default "preferred"
    .option(Option.valueOf("sslCa"), "/path/to/mysql/ca.pem") // required when sslMode is verify_ca or verify_identity, default null, null means has no server CA cert
    .option(Option.valueOf("sslCert"), "/path/to/mysql/client-cert.pem") // optional, default null, null means has no client cert
    .option(Option.valueOf("sslKey"), "/path/to/mysql/client-key.pem") // optional, default null, null means has no client key
    .option(Option.valueOf("sslKeyPassword"), "key-pem-password-in-here") // optional, default null, null means has no password for client key (i.e. "sslKey")
    .option(Option.valueOf("tlsVersion"), "TLSv1.3,TLSv1.2,TLSv1.1") // optional, default is auto-selected by the server
    .option(Option.valueOf("sslHostnameVerifier"), "com.example.demo.MyVerifier") // optional, default is null, null means use standard verifier
    .option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer
    .option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null"
    .option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false
    .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
    .option(Option.valueOf("tcpNoDelay"), true) // optional, default false
    .option(Option.valueOf("autodetectExtensions"), false) // optional, default false
    .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


或者下面的unix domain socket格式:


// Minimum configuration for unix domain socket
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(Option.valueOf("unixSocket"), "/path/to/mysql.sock")
    .option(USER, "root")
    .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


使用MySqlConnectionFactory创建connection



上面的例子中,我们使用的是通用的r2dbc api来创建connection,同样的,我们也可以使用特有的MySqlConnectionFactory来创建connection:


MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .host("127.0.0.1")
    .user("root")
    .port(3306) // optional, default 3306
    .password("database-password-in-here") // optional, default null, null means has no password
    .database("r2dbc") // optional, default null, null means not specifying the database
    .serverZoneId(ZoneId.of("Continent/City")) // optional, default null, null means query server time zone when connection init
    .connectTimeout(Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .sslMode(SslMode.VERIFY_IDENTITY) // optional, default SslMode.PREFERRED
    .sslCa("/path/to/mysql/ca.pem") // required when sslMode is VERIFY_CA or VERIFY_IDENTITY, default null, null means has no server CA cert
    .sslCert("/path/to/mysql/client-cert.pem") // optional, default has no client SSL certificate
    .sslKey("/path/to/mysql/client-key.pem") // optional, default has no client SSL key
    .sslKeyPassword("key-pem-password-in-here") // optional, default has no client SSL key password
    .tlsVersion(TlsVersions.TLS1_3, TlsVersions.TLS1_2, TlsVersions.TLS1_1) // optional, default is auto-selected by the server
    .sslHostnameVerifier(MyVerifier.INSTANCE) // optional, default is null, null means use standard verifier
    .sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer
    .zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL
    .useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements
    .tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
    .tcpNoDelay(true) // optional, controls TCP No Delay, default is false
    .autodetectExtensions(false) // optional, controls extension auto-detect, default is true
    .extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
    .build();
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


或者下面的unix domain socket方式:


// Minimum configuration for unix domain socket
MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .unixSocket("/path/to/mysql.sock")
    .user("root")
    .build();
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


执行statement


首先看一个简单的不带参数的statement:


connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
    .execute(); // return a Publisher include one Result


然后看一个带参数的statement:


connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
    .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
    .bind("name", "Some one") // Not one-to-one binding, call twice of native index-bindings, or call once of name-bindings.
    .add()
    .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
    .bind(1, "My Nickname")
    .bind(2, "Naming show")
    .returnGeneratedValues("generated_id")
    .execute(); // return a Publisher include two Results.


注意,如果参数是null的话,可以使用bindNull来进行null值的绑定。


接下来我们看一个批量执行的操作:


connection.createBatch()
    .add("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
    .add("UPDATE `earth` SET `count` = `count` + 1 WHERE `id` = 'human'")
    .execute(); // return a Publisher include two Results.


执行事务


我们看一个执行事务的例子:


connection.beginTransaction()
    .then(Mono.from(connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')").execute()))
    .flatMap(Result::getRowsUpdated)
    .thenMany(connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
        .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
        .bind("name", "Some one")
        .add()
        .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
        .bind(1, "My Nickname")
        .bind(2, "Naming show")
        .returnGeneratedValues("generated_id")
        .execute())
    .flatMap(Result::getRowsUpdated)
    .then(connection.commitTransaction());


使用线程池



为了提升数据库的执行效率,减少建立连接的开销,一般数据库连接都会有连接池的概念,同样的r2dbc也有一个叫做r2dbc-pool的连接池。


r2dbc-pool的依赖:


<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-pool</artifactId>
  <version>${version}</version>
</dependency>


如果你想使用snapshot版本,也可以这样指定:


<dependency>
  <groupId>io.r2dbc</groupId>
  <artifactId>r2dbc-pool</artifactId>
  <version>${version}.BUILD-SNAPSHOT</version>
</dependency>
<repository>
  <id>spring-libs-snapshot</id>
  <name>Spring Snapshot Repository</name>
  <url>https://repo.spring.io/libs-snapshot</url>
</repository>


看一下怎么指定数据库连接池:


ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:<my-driver>://<host>:<port>/<database>[?maxIdleTime=PT60S[&…]");
Publisher<? extends Connection> connectionPublisher = connectionFactory.create();


可以看到,我们只需要在连接URL上面添加pool这个driver即可。


同样的,我们也可以通过ConnectionFactoryOptions来创建:


ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
   .option(DRIVER, "pool")
   .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
   .option(HOST, "…")
   .option(PORT, "…") 
   .option(USER, "…")
   .option(PASSWORD, "…")
   .option(DATABASE, "…")
   .build());
Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
// Alternative: Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());


最后, 你也可以直接通过创建ConnectionPoolConfiguration来使用线程池:


ConnectionFactory connectionFactory = …;
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
   .maxIdleTime(Duration.ofMillis(1000))
   .maxSize(20)
   .build();
ConnectionPool pool = new ConnectionPool(configuration);
Mono<Connection> connectionMono = pool.create();
// later
Connection connection = …;
Mono<Void> release = connection.close(); // released the connection back to the pool
// application shutdown
pool.dispose();
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
分布式计算 Java 数据库连接
了解Spring R2DBC的声明式事务实现机制
# Spring非反应式事务实现原理 Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。 # 基于反应式的Spring事务有何不同 Spring的反应式实现是基于Reactor框架,该框架
2982 0
|
缓存 NoSQL Java
Spring Boot 3 整合 Spring Cache 与 Redis 缓存实战
Spring Boot 3 整合 Spring Cache 与 Redis 缓存实战
|
11月前
|
Java API 数据库
如何使用Spring Boot构建RESTful API,以在线图书管理系统为例
【10月更文挑战第9天】本文介绍了如何使用Spring Boot构建RESTful API,以在线图书管理系统为例,从项目搭建、实体类定义、数据访问层创建、业务逻辑处理到RESTful API的实现,详细展示了每个步骤。通过Spring Boot的简洁配置和强大功能,开发者可以高效地开发出功能完备、易于维护的Web应用。
279 3
|
SQL druid Java
解决 ‘The last packet successfully received from the server was xxx milliseconds ago‘ 问题
解决 ‘The last packet successfully received from the server was xxx milliseconds ago‘ 问题
6228 0
|
Kubernetes Cloud Native Java
探索Quarkus:Java的新一代高性能轻量级框架
探索Quarkus:Java的新一代高性能轻量级框架
4894 3
|
IDE Java Maven
【Java】已解决:java.lang.NoSuchMethodError异常
【Java】已解决:java.lang.NoSuchMethodError异常
5387 0
|
消息中间件 存储 缓存
一文快速掌握高性能内存队列Disruptor
`Disruptor`是LMAX公司开源的高性能内存消息队列,单线程处理能力可达600w订单/秒。本文从使用和设计角度探讨这款Java消息队列。作者sharkChili是Java开发者,CSDN博客专家,Java Guide项目维护者。文章介绍了Disruptor的基础使用,包括前置步骤、消息模型、消息处理器配置、生产者实现,并展示了效果。同时,文章详细解析了Disruptor的工作流程和高效原因,如无锁操作、分支预测和缓存填充。最后,作者提供相关资源链接并邀请读者加入交流群。
2573 0
r2dbc分页条件查询
r2dbc分页条件查询
310 0
|
监控 Dubbo Java
超详细的Sentinel入门
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
超详细的Sentinel入门