从版本4开始,MongoDB支持 事务。事务是建立在 会话之上的,因此,需要一个活跃的 ClientSession。
除非你在你的应用程序上下文中指定一个 MongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地 MongoDB 事务。
为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。
下面的例子显示了在一个 SessionCallback 中的程序化事务控制。
Example 124. 程序性事务
ClientSession session = client.startSession(options); template.withSession(session) .execute(action -> { session.startTransaction(); try { Step step = // ...; action.insert(step); process(step); action.update(Step.class).apply(Update.set("state", // ... session.commitTransaction(); } catch (RuntimeException e) { session.abortTransaction(); } }, ClientSession::close)
获得一个新的 ClientSession。
开始事务。
如果一切按预期进行,就提交修改。
出现意外,所以要回滚一切。
完成后不要忘记关闭会话。
前面的例子让你完全控制事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务流的一些噪音。
1. 事务和TransactionTemplate
Spring Data MongoDB事务支持一个 TransactionTemplate。下面的例子展示了如何创建和使用 TransactionTemplate。
Example 125. 事务和 TransactionTemplate
template.setSessionSynchronization(ALWAYS); // ... TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); txTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { Step step = // ...; template.insert(step); process(step); template.update(Step.class).apply(Update.set("state", // ... }; });
在 Template API 配置中启用事务同步。
使用提供的 PlatformTransactionManager 创建 TransactionTemplate。
在回调中,ClientSession 和事务已经被注册。
在运行期间改变 MongoTemplate 的状态(就像你可能认为在前面列表的第1项中可能发生的那样)会导致线程和可见性问题。
2. 事务和MongoTransactionManager
MongoTransactionManager 是通往众所周知的Spring事务支持的网关。它可以让应用程序使用 Spring的事务托管功能。MongoTransactionManager 将一个 ClientSession 绑定到线程上。MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与到其他正在进行的事务中。下面的例子展示了如何用 MongoTransactionManager 创建和使用事务。
Example 126. 事务和 MongoTransactionManager
@Configuration static class Config extends AbstractMongoClientConfiguration { @Bean MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { return new MongoTransactionManager(dbFactory); } // ... } @Component public class StateService { @Transactional void someBusinessFunction(Step step) { template.insert(step); process(step); template.update(Step.class).apply(Update.set("state", // ... }; });
在应用 application context 中注册 MongoTransactionManager。
将方法标记为事务性。
@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
3. 响应式事务
与支持响应式 ClientSession 一样,ReactiveMongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。
除非你在你的 application context 中指定一个 ReactiveMongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地MongoDB事务。
使用普通的MongoDB响应式驱动API,在一个事务性流程中的 delete 可能看起来像这样。
Example 127. 原生驱动的支持
Mono<DeleteResult> result = Mono .from(client.startSession()) .flatMap(session -> { session.startTransaction(); return Mono.from(collection.deleteMany(session, ...)) .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) .doFinally(signal -> session.close()); });
首先,我们显然需要启动session。
一旦我们有了 ClientSession,就开始事务。
通过向操作传递 ClientSession,在事务中进行操作。
如果操作异常完成,我们需要停止事务并保留错误。
当然,也可以在成功的情况下提交更改。仍然保留操作结果。
最后,我们需要确保关闭会话。
上述操作的罪魁祸首是在保留 main flow DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。
4. 事务和TransactionalOperator
Spring Data MongoDB事务支持一个 TransactionalOperator。下面的例子展示了如何创建和使用一个 TransactionalOperator。
Example 128. 事务和 TransactionalOperator
template.setSessionSynchronization(ALWAYS); // ... TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager, new DefaultTransactionDefinition()); Step step = // ...; template.insert(step); Mono<Void> process(step) .then(template.update(Step.class).apply(Update.set("state", …)) .as(rxtx::transactional) .then();
为事务性参与启用事务同步。
使用提供的 ReactiveTransactionManager 创建 TransactionalOperator。
TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。
5. 事务和ReactiveMongoTransactionManager
ReactiveMongoTransactionManager 是通往众所周知的 Spring事务支持 的网关。它允许应用程序利用Spring的管理事务功能。ReactiveMongoTransactionManager 将 ClientSession 绑定到 subscriber Context。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。 ReactiveMongoTemplate 也可以参与其他正在进行的事务。下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务。
Example 129. 事务和 ReactiveMongoTransactionManager
@Configuration public class Config extends AbstractReactiveMongoConfiguration { @Bean ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { return new ReactiveMongoTransactionManager(factory); } // ... } @Service public class StateService { @Transactional Mono<UpdateResult> someBusinessFunction(Step step) { return template.insert(step) .then(process(step)) .then(template.update(Step.class).apply(Update.set("state", …)); }; });
在application context中注册 ReactiveMongoTransactionManager。
将方法标记为事务性的。
@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
6. 事务内部的特殊行为
在事务内部,MongoDB服务器有一个稍微不同的行为。
Connection Settings
MongoDB驱动提供了一个专门的副本集名称配置选项,使驱动进入自动检测模式。这个选项有助于识别主要的副本集节点和事务中的命令路由。
确保在MongoDB的URI中添加 replicaSet。请参考 连接字符串选项 以了解更多细节。
Collection Operations
MongoDB不支持集合操作,例如在事务中创建集合。这也会影响到第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。
Transient Errors
MongoDB可以为在事务性操作中出现的错误添加特殊标签。这些标签可能表示暂时性的故障,这些故障可能通过重试操作而消失。我们强烈推荐 Spring Retry 用于这些目的。然而,我们可以覆写 MongoTransactionManager#doCommit(MongoTransactionObject),以实现MongoDB参考手册中所述的重试提交操作行为。
Count
MongoDB的 count 操作是基于集合统计的,可能无法反映事务中的实际情况。当在一个多文档事务中发出 count 命令时,服务器会响应 error 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留 Query 设置,如 collation。
在 aggregation count helper 中使用 geo 命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替。
- $where → $expr
- $near → $geoWithin with $center
- $nearSphere → $geoWithin with $centerSphere
使用 Criteria.near(…) 和 Criteria.nearSphere(…) 的查询必须改写为 Criteria.within(…) 各自的 Criteria.withinSphere(…)。同样适用于 repository 查询方法中的 near 查询关键字,必须改为 within。也请参见MongoDB JIRA ticket DRIVERS-518 以进一步参考。
下面的片段显示了会话绑定闭包内的 count 用法。
session.startTransaction(); template.withSession(session) .execute(action -> { action.count(query(where("state").is("active")), Step.class) ...
上面的片段具体化为以下命令。
db.collection.aggregate( [ { $match: { state: "active" } }, { $count: "totalEntityCount" } ] )
而不是。
db.collection.find( { state: "active" } ).count()
文章下方有交流学习区!一起学习进步!也可以前往官网,加入官方微信交流群 你的支持和鼓励是我创作的动力❗❗❗
Doker的成长,欢迎大家一起陪伴!!!
我发好文,兄弟们有空请把我的官方旗舰店流量撑起来!!!
官网:Doker 多克; 官方旗舰店:Doker 多克 官方旗舰店-淘宝网 全品优惠