内容简介:Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:1.读模型应基于持久保存的事件进行更新,2.必须保证没有事件可以丢失,
Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:
1.读模型应基于持久保存的事件进行更新,
2.必须保证没有事件可以丢失,
3.事件顺序必须保证。
前两点非常明显,如果您不关心事件顺序,则根本不需要事件源。选择正确的策略在很大程度上取决于你的领域(你拥有多少持久Actor,你正在制造多少事件等等)以及用于存储事件的底层数据库。在写这篇文章的时候,我选择存储事件(和快照)的武器是Apache Cassandra--一个高度可扩展的分布式数据库。现有的Cassandra插件已经多次证明它是稳定的并且可以生产。有些传闻说Scylla是更有效的存储,但它仍然处于研发阶段。
通常,你有很多不同的读模型。其中一些比其他更重要。这就是为什么我们这个解决方案可以独立扩展到每个读模型的原因。
Akka Persistent Query
第一种方法非常简单,我们可以使用Akka堆栈中的内置解决方案,即Persistence Query。这个想法如下:
1.连接到事件日志并将事件作为流提供。
2.更新读取模型。
3.保存处理后的事件序列号。
val eventJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) eventJournal .eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue) .via(updateSingleReadModel) .mapAsync(1)(saveSequnceNr)
保存序列号对于恢复阶段是必需的,这样您就不会从头开始处理事件。
简单而优雅的解决方案,对吗?不幸的是不是响应式Rective的。默认情况下Cassandra时间间隔为3秒。起初,这可能很好,但假设生产中有10000个Persistence Actor,这实际上是一个非常小的数量,但足以杀死你的应用程序。对于每个Persistence Actor,您将需要启动一个流,请相信我10000个流不是最好的主意。实际上,如果要独立更新读取模型,您应该将Persistence Actor的数量乘以读取模型的数量。
相比采用eventsByPersistenceId,您应该标记您的事件并使用eventsByTag查询。在大多数情况下,这工作得很好,但你可能会面临事件分发的问题。假设大部分事件是由Persistence Actor的1%产生的。这可能导致其他99% Persistence Actor的事件处理滞后,因为所有事件都将在同一个源中。对此的解决方案可能是某种形式的分片标记,就像在Lagom实现中一样。
不幸的是,没有任何技巧可以解决持久查询(即轮询数据)的主要问题。3秒滞后在其他情况下不成问题,对于某些情况即使0.5秒也不可接受。太小的间隔也会为底层数据库创建不必要的负载。该分析其他选项了。
CDC
如何直接从数据库流式传输数据?Cassandra(与大多数数据库一样)支持CDC机制。理论上,我们可以轻松连接到更改日志并使用它来更新读取模型。太好了,但这里有一个问题。Cassandra是一个分布式数据库,因此每个数据库节点都有一个单独的CDC日志文件并管理这些日志以确保事件顺序将是一场噩梦。此时,CDC应该被认为是“有害的”。
Kafka作为一个数据库
如果从Cassandra的读取效率是一个大问题,也许我们可以使用像卡夫卡这样的消息队列作为事件存储?从卡夫卡中读取事件流将会(在设计上)非常有效。每个读模型将由不同的卡夫卡消费者更新,这可能是一个独立的过程,完全自主,独立和独立可扩展。整个概念在我们之前的博客文章中有详细描述。对于某些应用,这种方法可以顺利运行。但是,在某些情况下,Kafka(或实际上任何消息队列)作为数据库可能带来很多其他问题:
1.快照管理。
2.Retention滞留管理(Retention应该可能被禁用)。
3.Kafka分区 - 为了保持顺序,来自聚合的所有事件必须放入单个分区,该分区必须适合单个节点。在一些重载情况下,这可能是一个阻碍或需要解决的挑战。
4.Akka Persistence不支持。
卡桑德拉,卡夫卡和至少一次交付
如何将两个概念结合在一起?Cassandra用于存储事件(真相的来源)和Kafka作为读取模型处理器的附加层。
理论上,这是完美的。唯一的问题是如何有效地把事件发给卡夫卡。像eventsByPersistenceId或eventsByTag(如上所述)持久性查询取代更新读取模型可用于向卡夫卡发送事件,但滞后和分发问题仍将保留。
另一种方法可能是在更新状态阶段之后立即向Kafka发送事件。Persistence Actor的算法很简单:
1.接收命令
2.保存事件(S)
3.更新状态
4.发送事件给卡夫卡
我们来看看可能存在的问题。为了确保事件顺序,发送给Kafka必须阻塞整个Actor,这当然是一个坏主意,因为它会降低Persistent Actor的性能。我们可以将卡夫卡生产者委托给一名独立的child actor(我们称之为KafkaSender)。太棒了,但是我们必须确保Actor之间的消息传递。这可以通过使用AtLeastOnceDelivery特征来完成。你可能觉得没有什么是免费的。确实,更多的消息在我们的集群中循环传播,我们会失去顺序:
至少一次投递意味着原始邮件发送顺序并不总是保留。
考虑使用一些事件缓冲机制以确保事件顺序?请停止!至少一次交付可以不同的、更乐观的方式完成。您可以将事件发送到KafkaSender而无需确认发送,但您需要监视事件序列号。如果序列号有任何差异,则需要采取额外的措施:
1.如果序列号低于当前序列号 - 事件已经处理完毕,可以跳过
2.在间隔高于1的情况下,存储消息并启动eventsByPersistenceId以填补空白,并清除挂起的消息
当然,当前序列号应该在KafkaSender失败或持久Actor重启后保持并恢复。听起来很复杂?诚然,但没有人说高可扩展性是便宜的。
结论
在你跳到最后一个最真实,最有效,可扩展但最难实施的解决方案之前,如果你确实需要这种级别的可扩展性(以及不可避免的复杂性),请问问你自己。我看到使用事务性事件持久和读模型更新的事件采购实现了。只要能满足你预期的延迟和吞吐量,这种方法就非常好。但是,如果您需要将事件溯源架构压缩到最后一毫秒,我希望这篇文章能够帮助您找到最适合您的用例的解决方案。
[该贴被banq于2018-06-29 19:39修改过]
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。