ENode框架单台机器在处理Command时的设计思路

简介:

设计目标

  1. 尽量快的处理命令和事件,保证吞吐量;
  2. 处理完一个命令后不需要等待命令产生的事件持久化完成就能处理下一个命令,从而保证领域内的业务逻辑处理不依赖于持久化IO,实现真正的in-memory;
  3. 保证命令、事件处理的顺序性,先来的先处理,先产生的先处理;
  4. 保证一个聚合根的事件只有一个线程在持久化,并按事件产生的顺序持久化;
  5. 持久化事件时如果遇到并发冲突时(聚合根ID+事件版本号出现重复)的处理代价要轻;
  6. 要能利用多核的优势;

总体设计思路

  1. 先将命令根据聚合根ID路由到CommandMailBox里;
  2. 单线程处理CommandMailBox中的命令,由于聚合根在in-memory本地内存,所以处理非常快;
  3. 处理成功后更新聚合根的in-memory内存;
  4. 内存更新后将聚合根产生的事件同样原理路由到EventMailBox里;
  5. 单线程批量处理EventMailBox中的事件;由于是批量,所以持久化的吞吐量也可以保证;
  6. 处理完成一批事件后,把这一批事件对应的命令从CommandMailBox中移除;

详细设计思路

  1. 设计N个存放命令的CommandMailBox,命令首先按聚合根ID的hashcode取摸路由到对应的CommandMailBox;
  2. 每个CommandMailBox都有一个maxOffset, consumeOffset,以及一个CommandProcessor(单线程)在不停的处理;maxOffset表示最后一个命令的位置;consumeOffset表示当前正在处理的命令的位置;
  3. CommandProcessor的处理逻辑;
    • 创建、修改聚合根;
    • 更新聚合根的in-memory缓存;
    • 将聚合根产生的事件按聚合根ID的hashcode取摸路由到对应的EventMailBox;EventMailBox的个数也是程序启动时配置;
  4. 每个EventMailBox都有一个maxOffset, consumeOffset,以及一个EventProcessor(单线程)在不停的处理;maxOffset表示最后一个事件的位置;consumeOffset表示当前正在处理的事件的位置;
  5. EventProcessor的处理逻辑:
    • 按次序批量获取一批要处理的事件;
    • 批量持久化事件到EventStore,采用SqlBulkCopy;
    • 如果持久化一切顺利,则publish这一批事件(publish如果遇到网络IO异常,则重试,直到成功为止),然后继续持久化下一批,并同时将当前这一批事件对应的命令从CommandMailBox中删除;.如果持久化遇到并发冲突(事件的aggregateRootId+Version重复),则对当前这一批事件一个个按顺序持久化。如果当前事件持久化成功,则同样publish该事件,当然遇到IO异常时也要不断重试,直到成功为止;成功后通知CommandMailBox移除当前事件对应的命令;如果当前事件持久化出现并发冲突,就做如下处理:
    1. 先通知当前事件对应聚合根暂停处理后续的命令;
    2. 用Event Sourcing技术将in-memory中的聚合根的状态还原到最新状态,确保下次执行command时基于的聚合根的状态是最新的;
    3. 把这一批里该聚合根的所有事件移除,把EventMailBox中的该聚合根的所有事件移除;
    4. 将CommandMailBox的处理位置重置为当前事件对应的命令的offset;从而可以确保产生并发冲突的事件对应的命令以及后续的命令能再重新被处理一遍;
    5. 通知当前事件对应聚合根继续处理后续的命令(从哪个位置开始处理,在上面第4步已经重置过了);
    6. 这一批的所有事件都一个个处理完之后,按同样的逻辑继续处理下一批事件;

其他说明

  1. 上面的设计基于一个前提,就是一个聚合根几乎不会同时在两台服务器上同时存在并处理命令,否则就会出现并发冲突,而并发冲突的处理的代价还是比较复杂的,应该尽量避免;这点可以通过EQueue保证;
  2. 当聚合根处理了命令,尝试更新in-memory内存时,可能有一种情况会失败。就是如果这个命令是创建聚合根的,而有可能并发的时候这个聚合根可能在内存中已经有了,则创建完聚合根添加到内存时,应该能检测出来并记录错误日志,然后该命令产生的事件也不必放入EventMailBox,然后认为该命令处理成功即可。
  3. 上面的设计中没有谈到当遇到命令重复执行时的设计思路,大家可以自己想想:)
目录
相关文章
|
2月前
|
存储 分布式计算 Hadoop
ChunkServer 原理与架构详解
【8月更文第30天】在分布式文件系统中,ChunkServer 是一个重要的组件,负责存储文件系统中的数据块(chunks)。ChunkServer 的设计和实现对于确保数据的高可用性、一致性和持久性至关重要。本文将深入探讨 ChunkServer 的核心原理和内部架构设计,并通过代码示例来说明其实现细节。
33 1
|
2月前
|
资源调度 Java 调度
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
|
5月前
|
NoSQL 关系型数据库 MySQL
单机模拟集群(三主两从)
单机模拟集群(三主两从)
|
存储 人工智能
分布式与集群(一):我眼中的分布式与集群
我们国家的优势:我们相比其他国家拥有天然的人口优势;人多了,需求便来,如何做到服务好这么多人,面临很大的挑战,同时也蕴藏着巨大的机会;当然也催生了以BAT为代表的一批互联网公司的诞生,我们的技术也进入世界一流水平;
157 4
|
11月前
|
Kubernetes 调度 容器
二进制 k8s 集群下线 worker 组件流程分析和实践
二进制 k8s 集群下线 worker 组件流程分析和实践
93 0
|
11月前
|
流计算
110 Storm集群的进程及日志熟悉
110 Storm集群的进程及日志熟悉
56 0
|
存储 负载均衡 算法
【集群】集群的概念(相关知识)、常用工具/算法、常见集群
文章目录 前言 一、集群架构 1.1 负载调度器 1.1.1 常用调度算法
139 0
【集群】集群的概念(相关知识)、常用工具/算法、常见集群
|
设计模式 Go uml
我是状态机, 一颗永远骚动的机器引擎
状态机是一种行为设计模式,它允许对象在其内部状态改变时改变其行为。看起来好像对象改变了它的类。
我是状态机, 一颗永远骚动的机器引擎
|
分布式计算 运维 并行计算
​分布式系统与单节点系统的本质区别是什么?
​分布式系统与单节点系统的本质区别是什么?
332 0
​分布式系统与单节点系统的本质区别是什么?
|
负载均衡 API 测试技术
Istio流控,服务发现,负载均衡,核心流程是如何实现的?
Istio架构体系中,流控(Traffic Management)虽然是数据平面的Envoy Proxy实施的,但整个架构的核心其实在于控制平面的Pilot。
922 0