使用Spring实现反应式事务(Reactive Transactions)

栏目: Java · 发布时间: 5年前

内容简介:本文探讨如何使用RDBC2或MongoDB来使用Spring Reactive的事务支持。在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式SQL数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的

本文探讨如何使用RDBC2或 MongoDB 来使用Spring Reactive的事务支持。

在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。

随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式 SQL 数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的工作单元。

虽然将inTransaction(…)方法用于较小的工作块很方便,但它并不反映Spring支持事务的方式。在使用命令式编程模型时,Spring Framework允许两种事务管理安排:@Transactional和TransactionTemplate(声明性的各自的程序化事务管理)。

这两种事务管理方法都建立在PlatformTransactionManager管理事务资源事务的基础之上。PlatformTransactionManager可以是Spring提供的事务管理器实现,也可以是基于JTA的Java EE实现。

两种方法的共同之处在于它们将事务状态绑定到ThreadLocal存储,这允许事务状态管理而不传递TransactionStatus对象。事务管理应该在后台以非侵入方式进行。因为我们没有让线程继续在事务中继续有作用工作的设想,因此ThreadLocal只在命令式编程中工作。

命令式编程事务管理工作机制

事务管理需要将其事务状态与执行相关联。在命令式编程中,这通常是ThreadLocal存储 - 事务状态被绑定到一个线程,假设前提是事务代码在容器调用它的同一个线程上执行。

反应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细研究反应式执行情况,你会发现代码在不同的线程上执行。使用进程间通信时,这会更加明显。我们再也不能安全地假设我们的代码在同一个线程上完全执行了。

这种变化使的依赖ThreadLocal的事务管理实现无效。

我们需要一种不同的安排来反映事务状态,而不是一直传递一个TransactionStatus对象。

关联带外数据并不是反应空间中的新要求。我们在其他领域遇到过这种要求,例如SecurityContextSpring Security for reactive方法安全性(仅举一例)。Project Reactor是Spring自身构建其响应支持的反应库,自3.1版本开始就为订阅者的上下文提供了支持。

Reactor Context是替代ThreadLocal命令式编程的反应式编程, 上下文允许将上下文数据绑定到特定的执行。对于反应式编程,这是一个Subscription。

Reactor Context允许Spring将事务状态以及所有资源和同步绑定到特定的Subscription状态 。使用Project Reactor的所有反应代码现在都可以参与响应式事务。

反应性事务管理

从Spring Framework 5.2 M2开始,Spring通过ReactiveTransactionManagerSPI 支持响应式/反应式事务管理。

ReactiveTransactionManager是使用事务资源的反应式和非阻塞集成的事务管理抽象。它是一个会返回Publisher的反应式@Transactional方法元注解,使用TransactionalOperator实现可编程的事务管理。

两个反应式事务管理器实现是:

  • R2DBC通过Spring Data R2DBC 1.0 M2
  • MongoDB通过Spring Data MongoDB 2.2 M4

让我们来看看反应式式事务的样子:

<b>class</b> TransactionalService {

  <b>final</b> DatabaseClient db

  TransactionalService(DatabaseClient db) {
    <b>this</b>.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    <b>return</b> db.execute()
      .sql(<font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>)
      .fetch().rowsUpdated()
      .then(db.execute().sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>)
      .then();
  }
}
</font>

反应事务看起来非常类似于注释驱动中的命令事务,主要的区别在于我们使用DatabaseClient,这是一个反应性资源抽象。所有事务管理都在幕后进行,利用Spring的事务拦截器和ReactiveTransactionManager。

Spring基于方法返回类型分辨要应用的事务管理类型:

  • 方法返回一个Publisher类型:响应式事务管理
  • 所有其他return类型:传统的命令式事务管理

这种区别很重要,因为您仍然可以使用命令式组件,例如JPA或JDBC查询,如果将这些查询结果包装成一个Publisher类型,Spring将应用反应而不是命令式事务管理,反应式事务管理就不会打开ThreadLocal中绑定的JPA或JDBC所需的事务。

TransactionalOperator

下一步,让我们看一下编程化事务管理TransactionalOperator:

ConnectionFactory factory = …
ReactiveTransactionManager tm = <b>new</b> R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql(<font>"INSERT INTO person (name, age) VALUES('joe', 'Joe')"</font><font>)
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe')"</font><font>)
    .then())
  .as(rxtx::transactional);
</font>

上面的代码包含一些值得注意的组件:

  • R2dbcTransactionManager:这是R2DBC的反应式事务管理器ConnectionFactory。
  • DatabaseClient:客户端使用R2DBC驱动程序提供对SQL数据库的访问。
  • TransactionalOperator:此运算符将所有上游R2DBC发布者与事务上下文相关联。您可以使用操作员样式as(…::transactional)或回拨样式execute(txStatus -> …)。

订阅后会懒惰地反应式事务,operator启动事务,设置适当的隔离级别并将数据库连接与其订户上下文相关联。所有参与(上游)Publisher实例都使用一个上下文绑定事务连接。

Reactive-functional operator 链可以是线性的(通过使用单个Publisher)或非线性的(通过合并多个流)。Publisher使用operator风格样式时,反应式事务将会影响所有上游。要将事务范围限制为特定的Publishers 集合,请应用回调样式,如下所示:

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql(<font>"INSERT INTO person (name, age) VALUES('Jack', 31)"</font><font>)
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  <b>return</b> db.execute()
    .sql(</font><font>"INSERT INTO person (name, age) VALUES('Joe', 34)"</font><font>)
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql(</font><font>"INSERT INTO contacts (name) VALUES('Joe Black')"</font><font>)
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);
</font>

在上面的示例中,事务管理仅限于在execute(…)里面订阅的Publisher实例。换句话说,事务是作用域的。execute(…)中Publisher实例参与事务,并且命名outsideTransaction的Publisher在事务之外执行其工作。

Spring Data MongoDB

R2DBC是Spring与反应式的集成之一。另一个事务集成是通过Spring Data MongoDB访问MongoDB,您可以使用反应式编程来参与多文档事务。

Spring Data MongoDB是使用ReactiveMongoTransactionManager,这是一个ReactiveTransactionManager实现。它创建会话并管理事务,以便在托管事务中执行的代码参与多文档事务。

以下示例显示了MongoDB的编程事务管理:

ReactiveTransactionManager tm 
  = <b>new</b> ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.<b>class</b>)
  .apply(Update.set(<font>"state"</font><font>, …))
  .then(template.insert(EventLog.<b>class</b>).one(<b>new</b> EventLog(…))
  .as(rxtx::transactional)
  .then();
</font>

上面的代码设置一个ReactiveTransactionManager并用于TransactionalOperator在单个事务中执行多个写操作。ReactiveMongoTemplate被配置为参与反应式交易。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Servlet与JSP核心技术

Servlet与JSP核心技术

/ 人民邮电出版社 / 2001-10 / 55.00元

一起来看看 《Servlet与JSP核心技术》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具