ENode框架Conference案例分析系列之 - Quick Start

简介:

前言

前一篇文章介绍了Conference案例的架构设计,本篇文章开始介绍Conference案例的代码实现。由于代码比较多,一开始就全部介绍所有细节,估计很多人接受不了,也理解不了。所以,我先进行一次QuickStart的介绍,即选取某个简单典型的场景从前到后过一下每个环节。这样大家就能够快速对代码的重要关键环节有大概的理解。另外,我现在正在做ENode的官网,到时会像axon framework一样,介绍ENode框架本身、使用场景、性能数据、案例,以及论坛社区等功能;

本文打算选择Conference案例中一个不太复杂的场景(发布会议),来快速过一下需要开发者实现的代码环节。

UI

首先,我们来看一下发布一个会议的UI入口,前面的文章介绍过,当客户创建好一个会议后,他可以先编辑会议的所有座位类型,然后如果允许预订者预定了,那他就需要先发布这个会议。就像淘宝的卖家要上架商品后商品才会对买家可见一样。本质上,发布一个会议,其实就是将会议聚合根的isPublished修改为true。UI如下图所示:

Controller

当客户点击Publish按钮后,前台提交HttpPost请求到服务端,然后请求就会被ASP.NET MVC的Controller处理,Controller的Action的逻辑如下:

上面的代码比较清晰,我们先判断当前的_conference实例是否为空,如果为空,则直接返回HttpNotFound结果。那_conference实例哪里来的?这里考虑到ConferenceController中的大部分Action都要使用当前的_conference实例,所以我们为了代码的复用,在Controller的OnActionExecuting方法里,提前获取了当前的_conference实例,代码我稍后再贴。_conference实例有了之后我们就可以构建一个PublishConference的命令。该命令只需要一个当前要发布的Conference的ID即可。然后,我们调用ExecuteCommandAsync方法,去异步执行一个命令。然后,我们使用await关键字异步等待命令的处理结果。最后判断结果是否成功,做相应的处理。

ExecuteCommandAsync方法:

该方法内部使用_commandService的ExecuteAsync方法来异步执行一个命令。_commandService是ENode框架提供的分布式的命令发送或执行服务,该服务是通过ConferenceController的构造函数注入的,代码如下:

使用ENode框架开发的Controller,一般是需要两个服务,一个是commanService,用于发送命令;另一个是某个queryService,用语查询数据;Controller依赖这两个服务充分体现了CQRS架构的特点。当然,有时查询服务可能不止一个,那就可以注入多个,看我们自己需要即可。ENode使用的依赖注入框架是Autofac。大家可能在想,为何要弄一个ExecuteCommandAsync方法出来呢?因为要处理超时的情况,假如一个命令处理超时了(比如5s),那Controller的Action也需要立即返回了。TimeoutAfter的代码如下:

大家可以看到TimeoutAfter方法内部,为了实现当超过指定时间后要求的Task还未处理完的情况,我们创建了一个延后指定时间执行的Task,然后通过Task.WhenAny方法异步等待任务执行,最后判断完成的Task是哪个,从而实现超时的处理。这个做法是我在网上找到的,觉得还不错,这个做法可以让我们在实现完全异步的同时还能实现超时处理。最后,我们看一下OnActionExecuting方法:

OnActionExecuting

这个方法的代码的逻辑也比较简单,就是根据HttpRequest中包含的slug参数先获取一个Conference聚合根;如果存在,则进一步根据accessCode参数检查accessCode参数是否合法。通过合法,则认为提供的slug和accessCode有效。大家可以把slug理解为唯一定位一个Conference的,而accessCode是使用该Conference的密码。由于这个只是一个案例,所以我们通过这种简单有效的方法来为用户授权。

Command

了解了Controller的实现,我们接下来看看Command的定义,Command是一个DTO对象。代码如下:

非常简单,由于ENode框架基类提供的Command类已经提供了一个AggregateRootId的属性,所以我们的PublishConference命令无需再定义其他额外的属性了。需要提一下的是,ENode框架要求,所有Command要创建或修改的聚合根的ID必须在Command发送之前赋值,这个是框架的一个约束,我认为这个通常不是问题。如果你希望聚合根的ID是一个long,那也许你需要自己部署一个全局long生成服务了,有兴趣的朋友可以和我交流实现方案,我有实现经验。如果你的ID是一个字符串,那用ENode框架提供的ObjectId类即可,它可以帮你自动生成一个24位长度的全局唯一顺序字符串。接下来我们看看Command Handler的实现。

Command Handler

一个CommandHandler中的代码通常是一句话,ENode框架的最大好处是可以让开发者无需关注C端的技术实现,开发者只需要关心如何实现自己的业务逻辑即可。如上图所示,我们会先定义一个ConferenceCommandHandler的类,然后实现ICommandHandler<PublishConference>接口,然后进一步实现对应的Handle方法。在Handle方法内部,我们只需要从当前的上下文根据Command所关联的聚合根ID获取当前要操作的聚合根,然后调用聚合根的业务方法即可。我们不需要像经典DDD那样把聚合根从IConferenceRepository中取出来,再修改聚合根,再保存聚合根。并且经典DDD往往还会和工作单元(Unit of Work)配合;因为经典DDD,是支持一个应用层的方法同时修改多个聚合根的,而ENode框架是要求一个命令一次只能创建或修改一个聚合根,即架构设计上就是面向最终一致性的,主要目的为了实现更高的吞吐量,这点开发者需要明确与了解。CommandHandler,从代码实现的角度,我相信ENode框架提供的方式是非常简单和直接的,没有任何多余的东西。大家可以看到使用ENode框架开发,大部分情况是不需要定义Repository的。下面我们来看看Domain聚合根的实现。

Domain & Domain Event

使用ENode框架开发的领域模型,聚合根的实现通常是这样的:

  1. 继承基类AggregateRoot<TAggregateRootId>
  2. 构造函数要传给基类聚合根的ID
  3. 然后聚合根自己会提供一些可以修改自己状态的公共方法,例如上面的Publish,Unpublish方法
  4. 聚合根内部会有一些私有的Handle方法,这些Handle方法是根据对应的事件,更新自己的状态(事件驱动聚合根状态的修改)

当前我们这里被调用的方法是Publish,该方法内部,先判断当前聚合根是否已经处于发布状态,如果是,则抛出异常即可,当然,你选择忽略也没问题;如果不是,则调用ApplyEvent方法Apply当前领域事件。ApplyEvent方法的逻辑是,先找到当前事件对应的Handle方法,然后调用该Handle方法;然后调用完成后,把当前事件放入一个聚合根内部的事件队列中。

如果对ENode框架的实现有一定了解的朋友应该知道,ENode在处理一个命令时,ENode框架处理Command的核心流程是这样的:

  1. 先创建一个空的ICommandContext对象;
  2. 调用CommandHandler的Handle方法,并把ICommandContext传给Handle方法;
  3. 当Handle方法结束后,ENode框架就能知道当前ICommandContext中有哪些聚合根修改了或创建了(框架要求一个命令一次只能涉及一个聚合根的修改)然后框架如果拿到了某个修改的聚合根,它就拿出该聚合根里上面提到的内部的事件队列里的事件。然后根据这些事件生成一个EventStream的对象;
  4. 持久化EventStream对象到EventStore;
  5. 发布(Publish)EventStream到EQueue,然后外部的Event Handler就都能响应领域事件了;

上面这个是正常流程,在这里我顺便提一下,为了让大家更好的理解内部实现的机制。通过上面这些介绍,我想大家应该至少可以理解上面的Publish方法和Handle方法了吧。

另外,有些朋友可能会想,为何是先产生事件,再修改状态呢?

主要原因是因为这个Handle方法是会在事件溯源(ES)的时候被重复利用的。当我们要从EventStore通过ES还原某个聚合根时,我们是先从EventStore获取该聚合根所产生的所有的事件,然后对每个事件调用聚合根的对应的Handle方法,从而实现聚合根状态的还原。这个过程也就是我们常说的事件溯源,即ES(Event Sourcing)。

需要强调的是,聚合根应该在产生事件之前把各种业务规则和业务逻辑实现掉,然后只有当前操作满足所有的业务规则时,才调用ApplyEvent方法。然后在聚合根里的所有Handle方法中,就是仅仅简单的等于号赋值操作,不能有任何业务逻辑,这点非常重要。为什么要这样呢?因为假如我们把一些业务规则和逻辑放在Handle方法中,比如if怎么样的时候做什么赋值,else的时候做另外的赋值。那假如哪一天我们的Handle方法里的判断逻辑变化了,那我们通过事件溯源还原出来的聚合根的状态就不对了。这点应该不难理解吧。

从更高层面(哲学)的角度来理解,EventStore中存储的事件并不是完整的历史。事件+聚合根的Handle方法才是完整的历史,两者结合才可以完整地将聚合根的状态还原到最新状态。因为是历史,历史无法改变,所以我们的事件和Handle方法也都不能修改;或者如果真的要修改,也必须确保兼容老的结构和实现,这点非常重要。下面我们来看看Event Handler的实现:

Event Handler

EventHandler的作用是根据C端聚合根产生的事件来更新CQRS的读库。需要注意的是ENode整个框架对外提供的API基本都是异步IO的(实际上内部的实现也都是异步IO的,只有整个链路都是异步得,才能发挥异步的好处)。所以我们更新读库时,需要使用ADO.NET提供的Async方法类更新DB。这里我使用ENode自带的Dapper轻量级高性能ORM来实现对读库的更新。上面的代码中就是更新Conference表的IsPublish字段。但是为了确保避免并发导致的数据覆盖,所以我们需要严谨的利用乐观控制来确保数据不会被覆盖,ENode要求我们使用Version机制来实现乐观锁。

关于并发控制的讨论,其实还有非常多的细节可以讨论。我之前写过一篇文章,大家有兴趣的可以去看一下,本文的目的是做一个QuickStart,所以不做过多展开了。TryUpdateRecordAsync方法的内部实现如下,很简单,我就不做介绍了。

还有一点需要特别提一下,就是为何要使用Dapper而不使用EF这种ORM框架。因为ENode框架实现的是CQRS+ES的架构。所以,我们在更新读库时,是根据事件更新读库。那怎么样的更新是最快的呢?就是直接通过Insert或Update语句来更新DB。而如果通过EF这种框架,因为是面向OO的ORM,所以一般是需要先从DB取出数据转换为对象,再更新对象,再保存对象这样的思路。这个过程我个人认为,对于CQRS+ES架构的应用来说,是比较繁琐和低效(2次IO,先读出来,再保存回去)的。我们在更新读库时,更好的方式应该是利用像Dapper这样的ORM框架,简单直接的更新读库(一次IO操作即可)。另外,我通过对Dapper做了一些简单的二次封装,可以做到用最直接的代码实现目的,且兼顾了代码的可读性、可维护性、灵活性,以及性能。另外,查询数据时,通过Dapper也非常简单,而且还支持返回dynamic对象。Dapper是基于约定的框架,不需要做ORM映射方面的配置。我个人认为使用在CQRS+ES架构中是非常合理的。所以,对我来说,EF可以退休了,呵呵。

总结

好了,上面介绍了发布会议的所有需要用户写的代码,是不是很简单呢?我个人认为和经典DDD的架构相比,由于有ENode框架的支持,所以开发基于CQRS+ES架构的应用,是非常简单的。下一篇要写什么还没想好,大家还想了解什么,可以及时给我反馈啊。


目录
相关文章
|
1月前
|
人工智能 数据可视化 API
AI Agents Loop异步执行可视化Tutorial 借助AgentBoard工具可视化工作流
本文介绍了AI Agent的异步执行循环(Agent Loop),并展示了如何利用开源框架agentboard可视化这一过程。通过分析不同框架(如AutoGen、LangGraph、AutoAgent)对Agent Loop的抽象,文章详细说明了从简单的功能调用到复杂的多阶段执行流程的设计。此外,还提供了使用agentboard进行日志记录与流程可视化的具体示例,包括安装步骤、代码实现及运行方法,帮助开发者更高效地调试和优化AI Agent的应用。
AI Agents Loop异步执行可视化Tutorial 借助AgentBoard工具可视化工作流
XR Interaction Toolkit教程⭐三、实现抓取和交互功能
XR Interaction Toolkit教程⭐三、实现抓取和交互功能
|
6月前
|
监控 Linux Python
python自研流星监控系统meteor_monitor(第二篇)
该文介绍了替代流星监控软件UFOcaptureHD2的新方案,强调了原软件的性能消耗大和收费问题。文中提供了一个GitHub链接以获取最新代码。推荐使用配备Windows 10/11、2.4GHz四核CPU的主机,搭配索尼MX291摄像头进行监控。程序基于ffmpeg,支持不同编码器,如mjpeg、h264_qsv等,具体编码器选择取决于硬件环境。安装涉及创建虚拟环境、安装Python 3.10+及依赖项,并提供了Windows和Linux的详细步骤。此外,程序通过帧差法进行运动检测,然后过滤掉非流星目标,最后使用ffmpeg对原始视频切片并存储。
python自研流星监控系统meteor_monitor(第二篇)
|
6月前
|
存储 监控 Java
python自研流星监控系统meteor_monitor(第一篇)
本文介绍了作者开发的一个Python流星监控系统,替代了性能不佳且收费的ufocapturehd2软件。系统采用Win10相机应用低耗录制视频,通过SikuliX进行自动化控制,分段录制并存储到本地,然后通过脚本同步到NAS。视频分析使用帧差法检测流星,支持分布式分析。代码已更新,旧文章不再适用,最新内容可见:[用python自行开发的流星监控系统meteor_monitor(第二篇)-CSDN博客](https://github.com/xingxinghuo1000/meteor_monitor_scripts.git)。
|
JavaScript 前端开发 数据挖掘
Echarts数据分析系统Data Analysis Platform使用说明文档
Echarts数据分析系统Data Analysis Platform使用说明文档
128 0
|
缓存 JavaScript 前端开发
GIAC-2022sh 学习笔记 | 开放原子开源基金会-贺师俊-ES2022 — JS is dead, long live ecosystem
GIAC-2022sh 学习笔记 | 开放原子开源基金会-贺师俊-ES2022 — JS is dead, long live ecosystem
192 0
GIAC-2022sh 学习笔记 | 开放原子开源基金会-贺师俊-ES2022 — JS is dead, long live ecosystem
|
Serverless
emprical 模块学习与分析 note6
emprical 模块学习与分析 note6
220 0
|
机器学习/深度学习 算法 搜索推荐
cs224w(图机器学习)2021冬季课程学习笔记4 Link Analysis: PageRank (Graph as Matrix)
cs224w(图机器学习)2021冬季课程学习笔记4 Link Analysis: PageRank (Graph as Matrix)
cs224w(图机器学习)2021冬季课程学习笔记4 Link Analysis: PageRank (Graph as Matrix)
|
缓存 监控 NoSQL
Quick-Task 动态脚本支持框架之结构设计篇
前面两篇博文,主要是整体介绍和如何使用;接下来开始进入正题,逐步剖析,这个项目是怎么一步一步搭建起来的;本篇博文则主要介绍基本骨架的设计,围绕项目的核心点,实现一个基础的原型系统
288 0
Quick-Task 动态脚本支持框架之结构设计篇
|
数据可视化
hands-on-data-analysis 第二单元 第四节数据可视化
hands-on-data-analysis 第二单元 第四节数据可视化
152 0
hands-on-data-analysis 第二单元 第四节数据可视化