事件溯源作为一种应用程序体系结构模式越来越流行。事件源涉及将应用程序进行的状态更改建模为事件的不可变序列或“日志”。事件源不是在现场修改应用程序的状态,而是将触发状态更改的事件存储在不可变的日志中,并将状态更改建模为对日志中事件的响应。我们之前曾写过有关事件源,Apache Kafka及其相关性的文章。在本文中,我将进一步探讨这些想法,并展示流处理(尤其是Kafka Streams)如何帮助将事件源和CQRS付诸实践。
让我们举个例子。考虑一个类似于Facebook的社交网络应用程序(尽管完全是假设的),当用户更新其Facebook个人资料时会更新个人资料数据库。当用户更新其个人资料时,需要通知多个应用程序-搜索应用程序,以便可以将用户的个人资料重新编制索引以便可以在更改的属性上进行搜索;新闻订阅源应用程序,以便用户的联系可以找到有关个人资料更新的信息;数据仓库ETL应用程序将最新的概要文件数据加载到支持各种分析查询等的中央数据仓库中。
基于事件源的架构
事件来源涉及更改配置文件Web应用程序,以将配置文件更新建模为事件(发生的重要事件),并将其写入中央日志(例如Kafka主题)。在这种情况下,所有需要响应配置文件更新事件的应用程序,只需订阅Kafka主题并创建各自的物化视图-可以写缓存,在Elasticsearch中为事件建立索引或简单地计算in -内存聚合。个人档案Web应用程序本身也订阅了相同的Kafka主题,并将更新内容写入个人档案数据库。
事件溯源:一些权衡
使用事件源对应用程序进行建模有许多优点-它提供了对对象进行的每个状态更改的完整日志;因此故障排除更加容易。通过将用户意图表示为不可变事件的有序日志,事件源为企业提供了审核和合规性日志,这还具有提供数据源的额外好处。它支持弹性应用程序;回滚应用程序等于倒退事件日志和重新处理数据。具有较好的性能特点;写入和读取可以独立缩放。它实现了松散耦合的应用程序体系结构。它使向基于微服务的体系结构过渡变得更容易。但最重要的是:
事件源支持构建前向兼容的应用程序体系结构,即将来可以添加更多需要处理同一事件但创建不同实例化视图的应用程序的能力。
对于上述优点,也有一些缺点。事件源具有更高的学习曲线;这是一个陌生的新编程模型。事件日志可能涉及更多的查询工作,因为它需要将事件转换为适合查询的所需物化状态。
那是对事件源和一些权衡的快速介绍。本文无意探讨事件源的细节或提倡其用途。您可以在此处阅读有关事件来源和各种折衷方法的更多信息。
Kafka作为事件溯源的支柱
事件源与Apache Kafka相关。这是如何进行的-事件来源涉及维护多个应用程序可以订阅的不可变事件序列。Kafka是一种高性能,低延迟,可扩展和持久的日志,已被全球数千家公司使用,并经过了大规模的实战测试。因此,Kafka是存储事件的自然支柱,同时向基于事件源的应用程序体系结构发展。
事件溯源和CQRS
此外,事件源和CQRS应用程序体系结构模式也相关。命令查询责任隔离(CQRS)是最常用于事件源的应用程序体系结构模式。CQRS涉及在内部将应用程序分为两部分-命令端命令系统更新状态,而查询端则在不更改状态的情况下获取信息。CQRS提供了关注点分离–命令或写端与业务有关;它不关心查询,数据上的不同实例化视图,针对性能的实例化视图的最佳存储等。另一方面,查询或读取端全部与读取访问权限有关。其主要目的是使查询快速高效。
Refactoring an application using event sourcing and CQRS
事件源与CQRS一起工作的方式是使应用程序的一部分在对事件日志或Kafka主题的写入过程中对更新进行建模。这与事件处理程序配对,该事件处理程序订阅Kafka主题,根据需要转换事件,并将实例化视图写入读取存储。最后,应用程序的读取部分针对读取存储发出查询。
CQRS具有一些优点-它使负载与写入和读取分离,从而可以分别缩放。各种读取路径本身可以独立缩放。此外,可以针对应用程序的查询模式优化读取存储;图形应用程序可以将Neo4j用作其读取存储,搜索应用程序可以使用Lucene索引,而简单的内容服务Web应用程序可以使用嵌入式缓存。除了技术优势之外,CQRS还具有组织上的优势-通过将写入和读取路径分离,您可以使负责写入和读取路径的业务逻辑的团队脱钩。
本文仅涉及CQRS细微差别的表面。如果您想了解更多信息,建议阅读Martin Fowler和Udi Dahan关于该主题的文章。
到目前为止,我已经对事件源和CQRS进行了介绍,并描述了Kafka如何自然地将这些应用程序架构模式付诸实践。但是,流处理在何处以及如何进入画面?
CQRS和Kafka的Streams API
这是流处理,尤其是Kafka Streams如何启用CQRS的方法。事件处理程序订阅事件日志(Kafka主题),使用事件,处理这些事件,并将结果更新应用于读取存储。对事件流进行低延迟转换的过程称为流处理。在Apache Kafka的0.10版本中,社区发布了Kafka Streams。一个强大的流处理引擎,用于对Kafka主题上的转换进行建模。
Kafka Streams非常适合在应用程序内部构建事件处理程序组件,该应用程序旨在使用CQRS进行事件来源。它是一个库,因此可以将其嵌入任何标准Java应用程序中,以对事件流进行转换建模。例如,这是一个使用Kafka Streams进行字数统计的代码片段;您可以在Confluent示例github存储库中访问整个程序的代码。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde,"TextLinesTopic");
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KStream<String, Long> wordCounts = textLines
.flatMapValues(value-> Arrays.asList(pattern.split(value.toLowerCase())))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
因此,可以轻松地将应用程序内的事件处理程序表示为Kafka Streams拓扑,但更进一步,有两个不同的选项可用于将事件处理程序的输出建模为对应用程序状态进行建模的数据存储的更新。
采取1:将应用程序状态建模为外部数据存储
Kafka Streams拓扑的输出可以是Kafka主题(如上例所示),也可以写入外部数据存储(如关系数据库)。从世界的角度来看,事件处理程序建模为Kafka Streams拓扑,而应用程序状态建模为用户信任和操作的外部数据存储。执行CQRS的此选项主张使用Kafka Streams仅对事件处理程序建模,而将应用程序状态保留在外部数据存储中,该外部数据存储是Kafka Streams拓扑的最终输出。
以2:在Kafka Streams中将应用程序状态建模为本地状态
作为一种替代方法,除了对事件处理程序进行建模之外,Kafka Streams还提供了一种对应用程序状态进行建模的有效方法-它支持开箱即用的本地,分区和持久状态。此本地状态可以是RocksDB存储,也可以是内存中的哈希映射。
运作方式是,将嵌入Kafka Streams库以进行有状态流处理的应用程序的每个实例都托管应用程序状态的子集,建模为状态存储的碎片或分区。状态存储区的分区方式与应用程序的密钥空间相同。结果,服务于到达特定应用程序实例的查询所需的所有数据在状态存储碎片中本地可用。Kafka Streams通过透明地将对状态存储所做的所有更新记录到高度可用且持久的Kafka主题中,来提供对该本地状态存储的容错功能。因此,如果应用程序实例死亡,并且托管的本地状态存储碎片丢失,则Kafka Streams只需读取高度可用的Kafka主题并将状态数据重新填充即可重新创建状态存储碎片。
实际上,Kafka Streams将Kafka用作其本地嵌入式数据库的提交日志。这正是在封面下设计传统数据库的方式-事务或重做日志是事实的源头,而表只是对存储在事务日志中的数据的物化视图。
Kafka Streams中的本地,分区,持久状态
将Kafka Streams用于使用CQRS构建的有状态应用程序还具有更多优势– Kafka Streams还内置了负载平衡和故障转移功能。如果一个应用程序实例失败,则Kafka Streams会自动在其余应用程序实例之间重新分配Kafka主题的分区以及内部状态存储碎片。同样,Kafka Streams允许弹性缩放。如果启动了使用Kafka Streams执行CQRS的应用程序的新实例,它将自动在新启动的应用程序实例之间平均移动状态存储的现有碎片以及Kafka主题的分区。所有这些功能都以透明的方式提供给Kafka Streams用户。
需要使用Kafka Streams转换为基于CQRS的模式的应用程序不必担心应用程序及其状态的容错性,可用性和可伸缩性。
该嵌入式,分区且持久的状态存储通过Kafka Streams独有的一流抽象-KTable向用户公开。
Kafka流中的交互式查询
在即将发布的Apache Kafka版本中,Kafka Streams将允许其嵌入式状态存储可查询。
Kafka Streams中的这一独特功能-交互式查询(以前被Kafka社区称为Queryable State)-也使其适合将CQRS设计模式应用于应用程序。事件处理程序被建模为Kafka Streams拓扑,该拓扑将数据生成到读取存储,该存储不过是Kafka Streams内部的嵌入式状态存储。应用程序的读取部分将StateStore API用于状态存储,并基于其get()API来提供读取服务。
使用Kafka和Kafka Streams的事件源和基于CQRS的应用程序
Kafka Streams中的交互式查询的情况
请注意,使用交互式查询功能在Kafka Streams中使用嵌入式状态存储纯粹是可选的,并非对所有应用程序都有意义。有时,您只想使用您知道并信任的外部数据库。或者,在使用Kafka Streams时,您也可以将数据发送到外部数据库(例如Cassandra),并让应用程序的读取部分查询该数据。
但是,何时使用像这样的本地嵌入式应用程序状态才有意义?这里有一些利弊考虑-
缺点
- 现在生成的应用程序是有状态的,需要多加注意才能进行管理。
- 它涉及远离您知道和信任的数据存储。
优点
- 移动的零件更少;只是您的应用程序和Kafka集群。您不必部署,维护和操作外部数据库即可存储应用程序所需的状态。
- 它可以更快,更有效地使用应用程序状态。数据对于您的应用程序是本地的(在内存中或可能在SSD上);您可以快速访问它。这对于需要访问大量应用程序状态的应用程序特别有用。而且,在进行聚合以进行流处理的商店和商店应答查询之间没有数据重复。
- 它提供了更好的隔离;状态在应用程序内。一个恶意应用程序无法淹没其他有状态应用程序共享的中央数据存储。
- 它具有灵活性。内部应用程序状态可以针对应用程序所需的查询模式进行优化。
使用Kafka做事件溯源和CQRS:大赢家
我上面列出的利弊体现了所涉及的各种折衷,但是,我认为,朝着此应用程序体系结构迈进的最重要的胜利就是应用程序升级变得更加简单。处理应用程序的非停机升级的传统模型(依赖于外部数据库来确定其应用程序状态)相当复杂。无需停机升级就不需要同时运行新版本和旧版本的应用程序。升级几个实例后,如果发现错误,则需要能够透明地将负载切换回同一应用程序的旧实例。鉴于新实例和旧实例将需要更新外部数据库中的相同表,因此需要格外小心,以在不破坏状态存储中数据的情况下进行此类无停机升级。
现在,对于依赖于本地嵌入式状态的有状态应用程序,考虑相同的无停机升级问题。通过此模型,您可以与旧版本一起推出新版本的应用程序(在Kafka Streams中具有不同的应用程序ID)。每个人都拥有按照其应用程序业务逻辑版本指示的方式处理的应用程序状态副本。您可以逐步将流量从旧的引导到新的。如果新版本的某个错误会在应用程序状态存储区中产生意外结果,那么您始终可以将其丢弃,修复该错误,重新部署该应用程序并让其从日志中重建其状态。
放在一起:零售库存应用
现在让我们以一个例子来说明如何将本文介绍的概念付诸实践-如何使用Kafka和Kafka Streams为应用程序启用事件源和CQRS。
样本零售应用程序体系结构
考虑一个实体零售商的应用程序,该应用程序管理所有商店的库存;当新货到达或发生新销售时,它会更新库存表,并且要知道商店库存的当前状态,它会查询库存表。
具有事件源的零售应用程序架构—由Kafka提供支持
如果我们将事件采购体系结构模式应用于此Inventory应用,则新的货件将在Shipments Kafka主题中表示为事件。同样,新销售将以Sales Kafka主题(可能由Sales应用程序编写)中的事件表示。为简单起见,我们假设“销售”和“发货”主题中的Kafka消息的关键字是{商店ID,商品ID},而值是商店中商品数量的计数。
Inventory应用程序内的事件处理程序被建模为Kafka Streams拓扑,该拓扑连接了Sales和Shipments Kafka主题。联接操作创建并更新状态存储库InventoryTable,该状态存储库表示以连续方式更新的清单的当前状态。
连接操作的内部结构以构建库存表
可以将这样的应用程序部署在不同计算机上的多个实例中(如下图所示)。而且,InventoryApp的每个实例都承载InventoryTable的分片的子集,其中包含此联接操作的结果。当用户查询InventoryApp来了解商店中某商品的当前库存数量时,
- 运行InventoryApp的随机服务器收到一个请求:GET / inventory / stores / {store id} / items / {item id} / count
- 它使用Kafka Streams实例上的metadataForKey()API来获取商店的StreamsMetadata和密钥。StreamsMetadata保存Kafka Streams拓扑中每个商店的主机和端口信息。应用程序使用StreamsMetadata检查该实例是否具有包含关键字{store id,item id}的InventoryTable分区。如果是这样,它将使用本地Kafka Streams实例上的store(“ InventoryTable”)api来获取该商店并对其进行查询。
- 如果不是,它将为当前持有包含{store id,item id}的Kafka分区的实例找到主机/端口,并转发GET请求到/ inventory / stores / {store id} / items / {item id} / count到在该主机上运行的InventoryApp实例。
- 向用户返回库存盘点
在Kafka Streams中使用交互式查询的InventoryState应用程序
要了解有关“交互式查询”功能的更多信息,请阅读其文档。除了这些资源之外,请参阅Capital One的演示文稿,该演示文稿将在实践中应用本文中介绍的一些思想,并概述使用Kafka Streams的基于REST,事件源,CQRS和响应流处理的应用程序体系结构。
如上例所示,存储和查询本地状态对于某些有状态应用程序可能没有意义。有时,您想将状态存储在您知道并信任的外部数据库中。例如,在上面的示例中,您可以使用Kafka Streams通过join操作来计算库存数量,但选择将结果写入外部数据库并查询。
但是,值得注意的是,构建具有查询本地状态的有状态应用程序有许多优点,如本文前面所述。
结论性思想
事件寻源为应用程序使用零损失协议记录其固有的不可避免的状态变化提供了一种有效的方法。这意味着恢复既简单又高效,因为它完全基于日记或像Kafka这样的有序日志。CQRS更进一步,将原始事件变成可查询的视图;精心形成的与其他业务流程相关的视图。Kafka的Streams API提供了以流方式创建这些视图所需的声明性功能,以及可扩展的查询层,因此用户可以直接与此视图进行交互。结果是在Apache Kafka上构建了适用的基于事件源和CQRS的应用程序体系结构;允许此类应用程序还利用Kafka的核心竞争力-性能,可伸缩性,安全性,可靠性和大规模采用。
最重要的是,以这种方式构建有状态的应用程序可使组织最终获得松散耦合的应用程序体系结构-一种具有弹性和可伸缩性,更易于故障排除和升级的应用程序体系结构,最重要的是,该体系结构具有前向兼容性。
对更多感兴趣?
如果您喜欢本文,则可能需要继续使用以下资源,以了解有关Apache Kafka上流处理的更多信息:
- 使用Apache Kafka的流SQL引擎KSQL入门,并遵循Stream Processing Cookbook中的各种教程和示例快速入门。
- 开始使用Kafka Streams API来构建自己的实时应用程序和微服务。
- 观看我们的分为三部分的在线讲座系列,了解KSQL如何工作的来龙去脉,并学习如何有效地使用它来执行监视,安全性和异常检测,在线数据集成,应用程序开发,流ETL等。
- 通过Docker浏览有关Kafka Streams API的Confluent教程,并使用我们的Confluent演示应用程序。