Akka Persistence、Kafka、Cassandra实现CQRS和E

栏目: Scala · 发布时间: 6年前

内容简介:Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:1.读模型应基于持久保存的事件进行更新,2.必须保证没有事件可以丢失,

Akka Persistence是一个相当不错的事件溯源EventSourcing实现。当您选择EventSourcing架构时,自然也将采用CQRS方法。因为你不希望将读查询等操作变成持久写操作的额外的工作,读查询应该作为单独的访问路径完成。当然代价就是换来最终一致性,所以最大的问题是:如何有效更新读取模型?让我们从一些问题开始:

1.读模型应基于持久保存的事件进行更新,

2.必须保证没有事件可以丢失,

3.事件顺序必须保证。

前两点非常明显,如果您不关心事件顺序,则根本不需要事件源。选择正确的策略在很大程度上取决于你的领域(你拥有多少持久Actor,你正在制造多少事件等等)以及用于存储事件的底层数据库。在写这篇文章的时候,我选择存储事件(和快照)的武器是Apache Cassandra--一个高度可扩展的分布式数据库。现有的Cassandra插件已经多次证明它是稳定的并且可以生产。有些传闻说Scylla是更有效的存储,但它仍然处于研发阶段。

通常,你有很多不同的读模型。其中一些比其他更重要。这就是为什么我们这个解决方案可以独立扩展到每个读模型的原因。

Akka Persistent Query

第一种方法非常简单,我们可以使用Akka堆栈中的内置解决方案,即Persistence Query。这个想法如下:

1.连接到事件日志并将事件作为流提供。

2.更新读取模型。

3.保存处理后的事件序列号。

val eventJournal  = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

eventJournal
  .eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue)
  .via(updateSingleReadModel)
  .mapAsync(1)(saveSequnceNr)

保存序列号对于恢复阶段是必需的,这样您就不会从头开始处理事件。

简单而优雅的解决方案,对吗?不幸的是不是响应式Rective的。默认情况下Cassandra时间间隔为3秒。起初,这可能很好,但假设生产中有10000个Persistence Actor,这实际上是一个非常小的数量,但足以杀死你的应用程序。对于每个Persistence Actor,您将需要启动一个流,请相信我10000个流不是最好的主意。实际上,如果要独立更新读取模型,您应该将Persistence Actor的数量乘以读取模型的数量。

相比采用eventsByPersistenceId,您应该标记您的事件并使用eventsByTag查询。在大多数情况下,这工作得很好,但你可能会面临事件分发的问题。假设大部分事件是由Persistence Actor的1%产生的。这可能导致其他99% Persistence Actor的事件处理滞后,因为所有事件都将在同一个源中。对此的解决方案可能是某种形式的分片标记,就像在Lagom实现中一样。

不幸的是,没有任何技巧可以解决持久查询(即轮询数据)的主要问题。3秒滞后在其他情况下不成问题,对于某些情况即使0.5秒也不可接受。太小的间隔也会为底层数据库创建不必要的负载。该分析其他选项了。

CDC

如何直接从数据库流式传输数据?Cassandra(与大多数数据库一样)支持CDC机制。理论上,我们可以轻松连接到更改日志并使用它来更新读取模型。太好了,但这里有一个问题。Cassandra是一个分布式数据库,因此每个数据库节点都有一个单独的CDC日志文件并管理这些日志以确保事件顺序将是一场噩梦。此时,CDC应该被认为是“有害的”。

Kafka作为一个数据库

如果从Cassandra的读取效率是一个大问题,也许我们可以使用像卡夫卡这样的消息队列作为事件存储?从卡夫卡中读取事件流将会(在设计上)非常有效。每个读模型将由不同的卡夫卡消费者更新,这可能是一个独立的过程,完全自主,独立和独立可扩展。整个概念在我们之前的博客文章中有详细描述。对于某些应用,这种方法可以顺利运行。但是,在某些情况下,Kafka(或实际上任何消息队列)作为数据库可能带来很多其他问题:

1.快照管理。

2.Retention滞留管理(Retention应该可能被禁用)。

3.Kafka分区 - 为了保持顺序,来自聚合的所有事件必须放入单个分区,该分区必须适合单个节点。在一些重载情况下,这可能是一个阻碍或需要解决的挑战。

4.Akka Persistence不支持。

卡桑德拉,卡夫卡和至少一次交付

如何将两个概念结合在一起?Cassandra用于存储事件(真相的来源)和Kafka作为读取模型处理器的附加层。

Akka Persistence、Kafka、Cassandra实现CQRS和E

理论上,这是完美的。唯一的问题是如何有效地把事件发给卡夫卡。像eventsByPersistenceId或eventsByTag(如上所述)持久性查询取代更新读取模型可用于向卡夫卡发送事件,但滞后和分发问题仍将保留。

另一种方法可能是在更新状态阶段之后立即向Kafka发送事件。Persistence Actor的算法很简单:

1.接收命令

2.保存事件(S)

3.更新状态

4.发送事件给卡夫卡

我们来看看可能存在的问题。为了确保事件顺序,发送给Kafka必须阻塞整个Actor,这当然是一个坏主意,因为它会降低Persistent Actor的性能。我们可以将卡夫卡生产者委托给一名独立的child actor(我们称之为KafkaSender)。太棒了,但是我们必须确保Actor之间的消息传递。这可以通过使用AtLeastOnceDelivery特征来完成。你可能觉得没有什么是免费的。确实,更多的消息在我们的集群中循环传播,我们会失去顺序:

至少一次投递意味着原始邮件发送顺序并不总是保留。

考虑使用一些事件缓冲机制以确保事件顺序?请停止!至少一次交付可以不同的、更乐观的方式完成。您可以将事件发送到KafkaSender而无需确认发送,但您需要监视事件序列号。如果序列号有任何差异,则需要采取额外的措施:

1.如果序列号低于当前序列号 - 事件已经处理完毕,可以跳过

2.在间隔高于1的情况下,存储消息并启动eventsByPersistenceId以填补空白,并清除挂起的消息

当然,当前序列号应该在KafkaSender失败或持久Actor重启后保持并恢复。听起来很复杂?诚然,但没有人说高可扩展性是便宜的。

结论

在你跳到最后一个最真实,最有效,可扩展但最难实施的解决方案之前,如果你确实需要这种级别的可扩展性(以及不可避免的复杂性),请问问你自己。我看到使用事务性事件持久和读模型更新的事件采购实现了。只要能满足你预期的延迟和吞吐量,这种方法就非常好。但是,如果您需要将事件溯源架构压缩到最后一毫秒,我希望这篇文章能够帮助您找到最适合您的用例的解决方案。

[该贴被banq于2018-06-29 19:39修改过]


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

引爆社群

引爆社群

唐兴通 / 机械工业出版社华章科技 / 2015-3 / 49

科技延伸媒介,移动互联网技术催生了微信等各种新的媒体;媒介更新人文,新媒介让互联网重回社群时代;人文重塑商业规则,社群时代的商业模式与法则究竟是怎样的?我们应该如何去适应和应对?本书提出的“新4C法则”为此给出了解决方案。关于如何运用这个法则,本书给出了详尽的阐述。但是,要利用好这个法则,需要我们对它有深入的理解。 场景(Context):移动互联网时代场景为王,所有基于移动互联网的产品和服......一起来看看 《引爆社群》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具