使用Spring Data R2DBC进行异步RDBMS访问 - Lanky Dan Dev Blog

栏目: Java · 发布时间: 6年前

内容简介:不久前,JDBC驱动程序的反应变体称为R2DBC发布了,它允许数据异步流式传输到已订阅它的任何端点,结合使用像R2DBC这样的反应式驱动程序和Spring WebFlux,可以编写一个完整的响应式应用程序来异步进行数据的接收和发送。在这篇文章中,我们将重点关注数据库端:从连接到数据库,然后最终保存和检索数据。我们使用Spring Data实现数据库端的反应式应用,与所有Spring Data模块一样,它为我们提供了开箱即用的配置,可以减少我们需要编写的样板代码量以获得我们的应用程序设置。最重要的是,它在数据

不久前,JDBC驱动程序的反应变体称为R2DBC发布了,它允许数据异步流式传输到已订阅它的任何端点,结合使用像R2DBC这样的反应式驱动程序和Spring WebFlux,可以编写一个完整的响应式应用程序来异步进行数据的接收和发送。在这篇文章中,我们将重点关注数据库端:从连接到数据库,然后最终保存和检索数据。

我们使用Spring Data实现数据库端的反应式应用,与所有Spring Data模块一样,它为我们提供了开箱即用的配置,可以减少我们需要编写的样板代码量以获得我们的应用程序设置。最重要的是,它在数据库驱动程序上提供了一个层,更容易编制任务变得更容易。

对于这篇文章的内容,我使用的是Postgres数据库,当然,H2和Microsoft SQL Server都有自己的R2DBC驱动程序实现。

依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-r2dbc</artifactId>
    <version>1.0.0.M1</version>
  </dependency>
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>1.0.0.M6</version>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>repository.spring.milestone</id>
    <name>Spring Milestone Repository</name>
    <url>http:<font><i>//repo.spring.io/milestone</url></i></font><font>
  </repository>
</repositories>
</font>

使用Spring Boot的次数越多,就越习惯于使用spring-boot-starter导入单个依赖项。我希望会有spring-boot-starter-r2dbc依赖,但不幸的是,没有这样一个依赖。在编写本文时,它没有自己的Spring Boot模块,我相信未来会让R2DBC驱动程序更容易设置。目前,我们需要手动填写一些额外的依赖项。此外,R2DBC库只有Milestone版本(更多证明它们是新的)所以我们需要确保引入Spring Milestone库。

连接到数据库

感谢Spring Data为我们做了很多工作,需要手动创建的唯一Bean ConnectionFactory包含数据库的连接细节:

@Configuration
@EnableR2dbcRepositories
<b>class</b> DatabaseConfiguration(
  @Value(<font>"\${spring.data.postgres.host}"</font><font>) <b>private</b> val host: String,
  @Value(</font><font>"\${spring.data.postgres.port}"</font><font>) <b>private</b> val port: Int,
  @Value(</font><font>"\${spring.data.postgres.database}"</font><font>) <b>private</b> val database: String,
  @Value(</font><font>"\${spring.data.postgres.username}"</font><font>) <b>private</b> val username: String,
  @Value(</font><font>"\${spring.data.postgres.password}"</font><font>) <b>private</b> val password: String
) : AbstractR2dbcConfiguration() {

  override fun connectionFactory(): ConnectionFactory {
    <b>return</b> PostgresqlConnectionFactory(
      PostgresqlConnectionConfiguration.builder()
        .host(host)
        .port(port)
        .database(database)
        .username(username)
        .password(password).build()
    )
  }
}
</font>

这里要注意的第一件事是扩展AbstractR2dbcConfiguration。该类包含一堆我们不再需要手动创建的Bean。实现connectionFactory是类的唯一要求,因为创建DatabaseClientBean 需要它。

这种结构是Spring Data模块的典型结构,因此在尝试不同的模块时会感觉非常熟悉。此外,我希望一旦自动配置可用,就可以删除这个手动配置,并且只能通过自动配置application.properties驱动。

Spring可以连接到正在运行的Postgres实例的配置:

Postgres的port属性默认值5432 ;host,database,username和password是PostgresqlConnectionFactory需要的定义,缺少一个会抛出异常。

这个例子的最后一条值得注意的信息是使用@EnableR2dbcRepositories。此注释指示Spring查找扩展Spring Repository接口的任何存储库接口。这用作检测Spring Data存储库的基础接口。我们将在下一节中进一步了解这一点。要从中获取的主要信息是您需要使用@EnableR2dbcRepositories注释来充分利用Spring Data的功能。

创建Spring Data Repository

如上所述,在本节中,我们将介绍添加Spring Data Repository。这些存储库是Spring Data的一个很好的特性,这意味着您不需要编写大量额外代码来编写查询。不幸的是,至少就目前而言,Spring R2DBC不能像其他Spring Data模块那样进行推断查询(我相信这会在某些时候添加)。这意味着您需要使用@Query注释并手动编写SQL。让我们来看看:

@Repository
<b>interface</b> PersonRepository : R2dbcRepository<Person, Int> {

  @Query(<font>"SELECT * FROM people WHERE name = $1"</font><font>)
  fun findAllByName(name: String): Flux<Person>

  @Query(</font><font>"SELECT * FROM people WHERE age = $1"</font><font>)
  fun findAllByAge(age: Int): Flux<Person>
}
</font>

此接口扩展R2dbcRepository。又扩展了ReactiveCrudRepository,ReactiveCrudRepository提供标准的CRUD功能,据我所知,R2dbcRepository它不提供任何额外的功能,而是为更好的上下文命名而创建的接口。

R2dbcRepository接受两个通用参数,一个是作为输入并作为输出生成的实体类。第二个是主键的类型。因此,在这种情况下,Person类由PersonRepository(有意义)管理,内部的主键字段Person是Int。

在这个类中函数的返回类型是ReactiveCrudRepository提供的Flux和Mono,这些是Spring使用的Project Reactor类型,作为默认的Reactive Stream类型。Flux表示多个元素的流,而 Mono表示单个结果。

最后,正如我之前在示例中提到的,每个函数都使用注释@Query。语法非常简单,SQL是注释中的一个字符串。$1($2,$3等...更多输入)表示输入到函数的值。完成此操作后,Spring将处理其余内容并将输入传递到各自的输入参数中,收集结果并将其映射到存储库的指定实体类。

快速查询实体

这里不多说,只是简单地展示了Person使用的类PersonRepository。

@Table(<font>"people"</font><font>)
data <b>class</b> Person(
  @Id val id: Int? = <b>null</b>,
  val name: String,
  val age: Int
)
</font>

id已被设为可为空并提供null默认值以允许Postgres自己生成下一个合适的值。如果主键不是可空null的并且id提供了值,则Spring实际上会尝试在保存时运行更新而不是插入。

该实体将映射到people下面定义的表:

CREATE TABLE people (
  id SERIAL PRIMARY KEY, 
  name VARCHAR NOT NULL, 
  age INTEGER NOT NULL
);

看看发生什么?

现在让我们来看看它实际上在做什么。下面是一些代码,它们插入一些记录并以几种不同的方式检索它们:

@SpringBootApplication
<b>class</b> Application : CommandLineRunner {

  @Autowired
  <b>private</b> lateinit <b>var</b> personRepository: PersonRepository

  override fun run(vararg args: String?) {
    personRepository.saveAll(
      listOf(
        Person(name = <font>"Dan Newton"</font><font>, age = 25),
        Person(name = </font><font>"Laura So"</font><font>, age = 23)
      )
    ).log().subscribe()
    personRepository.findAll().subscribe { log.info(</font><font>"findAll - $it"</font><font>) }
    personRepository.findAllById(Mono.just(1)).subscribe { log.info(</font><font>"findAllById - $it"</font><font>) }
    personRepository.findAllByName(</font><font>"Laura So"</font><font>).subscribe { log.info(</font><font>"findAllByName - $it"</font><font>) }
    personRepository.findAllByAge(25).subscribe { log.info(</font><font>"findAllByAge - $it"</font><font>) }
  }
}
</font>

这段代码实际上有可能没有实际插入或读取某些记录,反应式应用程序意味着异步执行操作,因此该应用程序已开始在不同的线程中处理函数调用,不阻塞主线程,这些异步进程可能永远不会完全执行。出于这个原因,在这段代码中应该调用Thread.sleep,但我从示例中删除它们以保持一切都很整洁。

运行上面代码的输出如下所示:

2019-02-11 09:04:52.294  INFO 13226 --- [           main] reactor.Flux.ConcatMap.1                 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2019-02-11 09:04:52.295  INFO 13226 --- [           main] reactor.Flux.ConcatMap.1                 : request(unbounded)
2019-02-11 09:04:52.572  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onNext(Person(id=35, name=Dan Newton, age=25))
2019-02-11 09:04:52.591  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onNext(Person(id=36, name=Laura So, age=23))
2019-02-11 09:04:52.591  INFO 13226 --- [actor-tcp-nio-1] reactor.Flux.ConcatMap.1                 : onComplete()
2019-02-11 09:04:54.472  INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application    : findAll - Person(id=35, name=Dan Newton, age=25)
2019-02-11 09:04:54.473  INFO 13226 --- [actor-tcp-nio-2] com.lankydanblog.tutorial.Application    : findAll - Person(id=36, name=Laura So, age=23)
2019-02-11 09:04:54.512  INFO 13226 --- [actor-tcp-nio-4] com.lankydanblog.tutorial.Application    : findAllByName - Person(id=36, name=Laura So, age=23)
2019-02-11 09:04:54.524  INFO 13226 --- [actor-tcp-nio-5] com.lankydanblog.tutorial.Application    : findAllByAge - Person(id=35, name=Dan Newton, age=25)

说明:

  • onSubscribe和request发生在Flux调用它的主线程上。只有saveAll输出,因为它包含了log功能。将其添加到其他调用中也会记录主线程的相同结果。
  • subscribe函数中包含的执行和它们的内部步骤Flux在不同的线程上运行。

这并不是真实地表示如何在实际应用程序中使用Reactive Streams,而是希望演示如何使用它们并对它们的执行方式有一些了解。

结论

总而言之,Reactive Streams已经出现在一些RDBMS数据库中,这要归功于R2DBC驱动程序和Spring Data,它们在顶层构建了一层,使一切变得更加整洁。通过使用Spring Data R2DBC,我们可以创建与数据库的连接并开始查询它,而无需太多代码。尽管Spring已经为我们做了很多事情,但它可能会做得更多。目前,它没有Spring Boot自动配置支持。这有点烦人。但是,我确信有人会尽快做到这一点,并使一切都比现在更好。

这篇文章中使用的代码可以在我的 GitHub上找到 。​​​​​​​


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Out of their Minds

Out of their Minds

Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00

This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具