Akka Persistence、Kafka、Cassandra实现CQRS和E

栏目: Scala · 发布时间: 7年前

内容简介:Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:1.读模型应基于持久保存的事件进行更新,2.必须保证没有事件可以丢失,

Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:

1.读模型应基于持久保存的事件进行更新,

2.必须保证没有事件可以丢失,

3.事件顺序必须保证。

前两点非常明显,如果您不关心事件顺序,则根本不需要事件源。选择正确的策略在很大程度上取决于你的领域(你拥有多少持久Actor,你正在制造多少事件等等)以及用于存储事件的底层数据库。在写这篇文章的时候,我选择存储事件(和快照)的武器是Apache Cassandra--一个高度可扩展的分布式数据库。现有的Cassandra插件已经多次证明它是稳定的并且可以生产。有些传闻说Scylla是更有效的存储,但它仍然处于研发阶段。

通常,你有很多不同的读模型。其中一些比其他更重要。这就是为什么我们这个解决方案可以独立扩展到每个读模型的原因。

Akka Persistent Query

第一种方法非常简单,我们可以使用Akka堆栈中的内置解决方案,即Persistence Query。这个想法如下:

1.连接到事件日志并将事件作为流提供。

2.更新读取模型。

3.保存处理后的事件序列号。

val eventJournal  = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

eventJournal
  .eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue)
  .via(updateSingleReadModel)
  .mapAsync(1)(saveSequnceNr)

保存序列号对于恢复阶段是必需的,这样您就不会从头开始处理事件。

简单而优雅的解决方案,对吗?不幸的是不是响应式Rective的。默认情况下Cassandra时间间隔为3秒。起初,这可能很好,但假设生产中有10000个Persistence Actor,这实际上是一个非常小的数量,但足以杀死你的应用程序。对于每个Persistence Actor,您将需要启动一个流,请相信我10000个流不是最好的主意。实际上,如果要独立更新读取模型,您应该将Persistence Actor的数量乘以读取模型的数量。

相比采用eventsByPersistenceId,您应该标记您的事件并使用eventsByTag查询。在大多数情况下,这工作得很好,但你可能会面临事件分发的问题。假设大部分事件是由Persistence Actor的1%产生的。这可能导致其他99% Persistence Actor的事件处理滞后,因为所有事件都将在同一个源中。对此的解决方案可能是某种形式的分片标记,就像在Lagom实现中一样。

不幸的是,没有任何技巧可以解决持久查询(即轮询数据)的主要问题。3秒滞后在其他情况下不成问题,对于某些情况即使0.5秒也不可接受。太小的间隔也会为底层数据库创建不必要的负载。该分析其他选项了。

CDC

如何直接从数据库流式传输数据?Cassandra(与大多数数据库一样)支持CDC机制。理论上,我们可以轻松连接到更改日志并使用它来更新读取模型。太好了,但这里有一个问题。Cassandra是一个分布式数据库,因此每个数据库节点都有一个单独的CDC日志文件并管理这些日志以确保事件顺序将是一场噩梦。此时,CDC应该被认为是“有害的”。

Kafka作为一个数据库

如果从Cassandra的读取效率是一个大问题,也许我们可以使用像卡夫卡这样的消息队列作为事件存储?从卡夫卡中读取事件流将会(在设计上)非常有效。每个读模型将由不同的卡夫卡消费者更新,这可能是一个独立的过程,完全自主,独立和独立可扩展。整个概念在我们之前的博客文章中有详细描述。对于某些应用,这种方法可以顺利运行。但是,在某些情况下,Kafka(或实际上任何消息队列)作为数据库可能带来很多其他问题:

1.快照管理。

2.Retention滞留管理(Retention应该可能被禁用)。

3.Kafka分区 - 为了保持顺序,来自聚合的所有事件必须放入单个分区,该分区必须适合单个节点。在一些重载情况下,这可能是一个阻碍或需要解决的挑战。

4.Akka Persistence不支持。

卡桑德拉,卡夫卡和至少一次交付

如何将两个概念结合在一起?Cassandra用于存储事件(真相的来源)和Kafka作为读取模型处理器的附加层。

Akka Persistence、Kafka、Cassandra实现CQRS和E

理论上,这是完美的。唯一的问题是如何有效地把事件发给卡夫卡。像eventsByPersistenceId或eventsByTag(如上所述)持久性查询取代更新读取模型可用于向卡夫卡发送事件,但滞后和分发问题仍将保留。

另一种方法可能是在更新状态阶段之后立即向Kafka发送事件。Persistence Actor的算法很简单:

1.接收命令

2.保存事件(S)

3.更新状态

4.发送事件给卡夫卡

我们来看看可能存在的问题。为了确保事件顺序,发送给Kafka必须阻塞整个Actor,这当然是一个坏主意,因为它会降低Persistent Actor的性能。我们可以将卡夫卡生产者委托给一名独立的child actor(我们称之为KafkaSender)。太棒了,但是我们必须确保Actor之间的消息传递。这可以通过使用AtLeastOnceDelivery特征来完成。你可能觉得没有什么是免费的。确实,更多的消息在我们的集群中循环传播,我们会失去顺序:

至少一次投递意味着原始邮件发送顺序并不总是保留。

考虑使用一些事件缓冲机制以确保事件顺序?请停止!至少一次交付可以不同的、更乐观的方式完成。您可以将事件发送到KafkaSender而无需确认发送,但您需要监视事件序列号。如果序列号有任何差异,则需要采取额外的措施:

1.如果序列号低于当前序列号 - 事件已经处理完毕,可以跳过

2.在间隔高于1的情况下,存储消息并启动eventsByPersistenceId以填补空白,并清除挂起的消息

当然,当前序列号应该在KafkaSender失败或持久Actor重启后保持并恢复。听起来很复杂?诚然,但没有人说高可扩展性是便宜的。

结论

在你跳到最后一个最真实,最有效,可扩展但最难实施的解决方案之前,如果你确实需要这种级别的可扩展性(以及不可避免的复杂性),请问问你自己。我看到使用事务性事件持久和读模型更新的事件采购实现了。只要能满足你预期的延迟和吞吐量,这种方法就非常好。但是,如果您需要将事件溯源架构压缩到最后一毫秒,我希望这篇文章能够帮助您找到最适合您的用例的解决方案。

[该贴被banq于2018-06-29 19:39修改过]


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java Message Service API Tutorial and Reference

Java Message Service API Tutorial and Reference

Hapner, Mark; Burridge, Rich; Sharma, Rahul / 2002-2 / $ 56.49

Java Message Service (JMS) represents a powerful solution for communicating between Java enterprise applications, software components, and legacy systems. In this authoritative tutorial and comprehens......一起来看看 《Java Message Service API Tutorial and Reference》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试