使用Spring实现反应式事务(Reactive Transactions)

栏目: Java · 发布时间: 6年前

内容简介:本文探讨如何使用RDBC2或MongoDB来使用Spring Reactive的事务支持。在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式SQL数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的

本文探讨如何使用RDBC2或 MongoDB 来使用Spring Reactive的事务支持。

在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。

随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式 SQL 数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的工作单元。

虽然将inTransaction(…)方法用于较小的工作块很方便,但它并不反映Spring支持事务的方式。在使用命令式编程模型时,Spring Framework允许两种事务管理安排:@Transactional和TransactionTemplate(声明性的各自的程序化事务管理)。

这两种事务管理方法都建立在PlatformTransactionManager管理事务资源事务的基础之上。PlatformTransactionManager可以是Spring提供的事务管理器实现,也可以是基于JTA的Java EE实现。

两种方法的共同之处在于它们将事务状态绑定到ThreadLocal存储,这允许事务状态管理而不传递TransactionStatus对象。事务管理应该在后台以非侵入方式进行。因为我们没有让线程继续在事务中继续有作用工作的设想,因此ThreadLocal只在命令式编程中工作。

命令式编程事务管理工作机制

事务管理需要将其事务状态与执行相关联。在命令式编程中,这通常是ThreadLocal存储 - 事务状态被绑定到一个线程,假设前提是事务代码在容器调用它的同一个线程上执行。

反应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细研究反应式执行情况,你会发现代码在不同的线程上执行。使用进程间通信时,这会更加明显。我们再也不能安全地假设我们的代码在同一个线程上完全执行了。

这种变化使的依赖ThreadLocal的事务管理实现无效。

我们需要一种不同的安排来反映事务状态,而不是一直传递一个TransactionStatus对象。

关联带外数据并不是反应空间中的新要求。我们在其他领域遇到过这种要求,例如SecurityContextSpring Security for reactive方法安全性(仅举一例)。Project Reactor是Spring自身构建其响应支持的反应库,自3.1版本开始就为订阅者的上下文提供了支持。

Reactor Context是替代ThreadLocal命令式编程的反应式编程, 上下文允许将上下文数据绑定到特定的执行。对于反应式编程,这是一个Subscription。

Reactor Context允许Spring将事务状态以及所有资源和同步绑定到特定的Subscription状态 。使用Project Reactor的所有反应代码现在都可以参与响应式事务。

反应性事务管理

从Spring Framework 5.2 M2开始,Spring通过ReactiveTransactionManagerSPI 支持响应式/反应式事务管理。

ReactiveTransactionManager是使用事务资源的反应式和非阻塞集成的事务管理抽象。它是一个会返回Publisher的反应式@Transactional方法元注解,使用TransactionalOperator实现可编程的事务管理。

两个反应式事务管理器实现是:

  • R2DBC通过Spring Data R2DBC 1.0 M2
  • MongoDB通过Spring Data MongoDB 2.2 M4

让我们来看看反应式式事务的样子:

<b>class</b> TransactionalService {

  <b>final</b> DatabaseClient db

  TransactionalService(DatabaseClient db) {
    <b>this</b>.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    <b>return</b> db.execute()
      .sql(<font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>)
      .fetch().rowsUpdated()
      .then(db.execute().sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>)
      .then();
  }
}
</font>

反应事务看起来非常类似于注释驱动中的命令事务,主要的区别在于我们使用DatabaseClient,这是一个反应性资源抽象。所有事务管理都在幕后进行,利用Spring的事务拦截器和ReactiveTransactionManager。

Spring基于方法返回类型分辨要应用的事务管理类型:

  • 方法返回一个Publisher类型:响应式事务管理
  • 所有其他return类型:传统的命令式事务管理

这种区别很重要,因为您仍然可以使用命令式组件,例如JPA或JDBC查询,如果将这些查询结果包装成一个Publisher类型,Spring将应用反应而不是命令式事务管理,反应式事务管理就不会打开ThreadLocal中绑定的JPA或JDBC所需的事务。

TransactionalOperator

下一步,让我们看一下编程化事务管理TransactionalOperator:

ConnectionFactory factory = …
ReactiveTransactionManager tm = <b>new</b> R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql(<font>"INSERT INTO person (name, age) VALUES('joe', 'Joe')"</font><font>)
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>)
    .then())
  .as(rxtx::transactional);
</font>

上面的代码包含一些值得注意的组件:

  • R2dbcTransactionManager:这是R2DBC的反应式事务管理器ConnectionFactory。
  • DatabaseClient:客户端使用R2DBC驱动程序提供对SQL数据库的访问。
  • TransactionalOperator:此运算符将所有上游R2DBC发布者与事务上下文相关联。您可以使用操作员样式as(…::transactional)或回拨样式execute(txStatus -> …)。

订阅后会懒惰地反应式事务,operator启动事务,设置适当的隔离级别并将数据库连接与其订户上下文相关联。所有参与(上游)Publisher实例都使用一个上下文绑定事务连接。

Reactive-functional operator 链可以是线性的(通过使用单个Publisher)或非线性的(通过合并多个流)。Publisher使用operator风格样式时,反应式事务将会影响所有上游。要将事务范围限制为特定的Publishers 集合,请应用回调样式,如下所示:

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql(<font>"INSERT INTO person (name, age) VALUES('Jack', 31)"</font><font>)
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  <b>return</b> db.execute()
    .sql(</font><font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>)
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe Black')"</font><font>)
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);
</font>

在上面的示例中,事务管理仅限于在execute(…)里面订阅的Publisher实例。换句话说,事务是作用域的。execute(…)中Publisher实例参与事务,并且命名outsideTransaction的Publisher在事务之外执行其工作。

Spring Data MongoDB

R2DBC是Spring与反应式的集成之一。另一个事务集成是通过Spring Data MongoDB访问MongoDB,您可以使用反应式编程来参与多文档事务。

Spring Data MongoDB是使用ReactiveMongoTransactionManager,这是一个ReactiveTransactionManager实现。它创建会话并管理事务,以便在托管事务中执行的代码参与多文档事务。

以下示例显示了MongoDB的编程事务管理:

ReactiveTransactionManager tm 
  = <b>new</b> ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.<b>class</b>)
  .apply(Update.set(<font>"state"</font><font>, …))
  .then(template.insert(EventLog.<b>class</b>).one(<b>new</b> EventLog(…))
  .as(rxtx::transactional)
  .then();
</font>

上面的代码设置一个ReactiveTransactionManager并用于TransactionalOperator在单个事务中执行多个写操作。ReactiveMongoTemplate被配置为参与反应式交易。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Computational Geometry

Computational Geometry

Mark de Berg、Otfried Cheong、Marc van Kreveld、Mark Overmars / Springer / 2008-4-16 / USD 49.95

This well-accepted introduction to computational geometry is a textbook for high-level undergraduate and low-level graduate courses. The focus is on algorithms and hence the book is well suited for st......一起来看看 《Computational Geometry》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试