ASP.NET Core微服务之开源事件总线CAP的初步使用

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Tip: 此篇已加入.NET Core微服务基础系列文章索引一、CAP简介下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wiki  CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

Tip: 此篇已加入.NET Core微服务基础系列文章索引

一、CAP简介

CAP

下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wiki

  CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。我们可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

  CAP 的应用场景主要有以下两个:

  • 分布式事务中的最终一致性(异步确保)的方案
  • 具有高可用性的 EventBus

  CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,我们不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的将CAP集成到项目中。

  CAP 目前支持使用 Sql Server,MySql,PostgreSql 数据库的项目;

  CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,可以根据需要选择不同的配置方式;

  CAP的作者为园友savorboard(杨晓东),成都地区的.NET社区领导者,棒棒哒!

二、案例结构

  此次试验仍然和上一篇基于MassTransit的案例一样(其实是我懒得再改,直接拿来复用),共有四个MicroService应用程序,当用户下订单时会通过CAP作为事件总线发布消息,作为订阅者的库存和配送服务会接收到消息并消费消息。此次试验会采用RabbitMQ作为消息队列,采用MSSQL作为关系型数据库(同时CAP也是支持MSSQL的)。

  准备工作:为所有服务通过NuGet安装CAP及其相关包

PM> Install-Package DotNetCore.CAP
 下面是RabbitMQ的支持包
PM> Install-Package DotNetCore.CAP.RabbitMQ
 下面是MSSQL的支持包
PM> Install-Package DotNetCore.CAP.SqlServer

三、具体实现

3.1 OrderService

  (1)启动配置:这里主要需要给CAP指定数据库(它会在这个数据库中创建本地消息表Published和Received)以及使用到的消息队列(这里是RabbitMQ)

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // Repository
        services.AddScoped<IOrderRepository, OrderRepository>();

        // EF DbContext
        services.AddDbContext<OrderDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:OrderDB"]);

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<OrderDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"]; 
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)Controller:这里会调用Repository去实现业务逻辑和发送消息

    [Route("api/Order")]
    public class OrderController : Controller
    {
        public IOrderRepository OrderRepository { get; }

        public OrderController(IOrderRepository OrderRepository)
        {
            this.OrderRepository = OrderRepository;
        }

        [HttpPost]
        public string Post([FromBody]OrderDTO orderDTO)
        {
            var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult();

            return result ? "Post Order Success" : "Post Order Failed";
        }
    }

  (3)Repository:这里实现了两种方式:EF和Dapper(基于ADO.NET),其中EF方式中不需要传transaction(当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储),而基于ADO.NET方式中需要传transaction(由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中)。

    public class OrderRepository : IOrderRepository
    {
        public OrderDbContext DbContext { get; }
        public ICapPublisher CapPublisher { get; }
        public string ConnStr { get; } // For Dapper use

        public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr)
        {
            this.DbContext = DbContext;
            this.CapPublisher = CapPublisher;
            this.ConnStr = ConnStr;
        }

        public async Task<bool> CreateOrderByEF(IOrder order)
        {
            using (var trans = DbContext.Database.BeginTransaction())
            {
                var orderEntity = new Order()
                {
                    ID = GenerateOrderID(),
                    OrderUserID = order.OrderUserID,
                    OrderTime = order.OrderTime,
                    OrderItems = null,
                    ProductID = order.ProductID // For demo use
                };

                DbContext.Orders.Add(orderEntity);
                await DbContext.SaveChangesAsync();

                // When using EF, no need to pass transaction
                var orderMessage = new OrderMessage()
                {
                    ID = orderEntity.ID,
                    OrderUserID = orderEntity.OrderUserID,
                    OrderTime = orderEntity.OrderTime,
                    OrderItems = null,
                    ProductID = orderEntity.ProductID // For demo use
                };
                
                await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage);

                trans.Commit();
            }

            return true;
        }

        public async Task<bool> CreateOrderByDapper(IOrder order)
        {
            using (var conn = new SqlConnection(ConnStr))
            {
                conn.Open();
                using (var trans = conn.BeginTransaction())
                {
                    // business code here
                    string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID)
                                                                VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)";

                    order.ID = GenerateOrderID();
                    await conn.ExecuteAsync(sqlCommand, param: new
                    {
                        OrderID = order.ID,
                        OrderTime = DateTime.Now,
                        OrderUserID = order.OrderUserID,
                        ProductID = order.ProductID
                    }, transaction: trans);

                    // For Dapper/ADO.NET, need to pass transaction
                    var orderMessage = new OrderMessage()
                    {
                        ID = order.ID,
                        OrderUserID = order.OrderUserID,
                        OrderTime = order.OrderTime,
                        OrderItems = null,
                        MessageTime = DateTime.Now,
                        ProductID = order.ProductID // For demo use
                    };

                    await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans);

                    trans.Commit();
                }
            }

            return true;
        }

        private string GenerateOrderID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }

        private string GenerateEventID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }
    }

  这里摘抄一段CAP wiki中关于事务的一段介绍:

  事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。

  这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。

只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败___。_

换句话说,CAP会确保我们这段逻辑中业务代码和消息代码都成功了,才会真正让事务commit。

3.2 StorageService

  (1)启动配置:这里主要是指定Subscriber

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // EF DbContext
        services.AddDbContext<StorageDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:StorageDB"]);

        // Subscriber
        services.AddTransient<IOrderSubscriberService, OrderSubscriberService>();

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<StorageDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"];
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)实现Subscriber

  首先定义一个接口,建议放到公共类库中

    public interface IOrderSubscriberService
    {
        Task ConsumeOrderMessage(OrderMessage message);
    }

  然后实现这个接口,记得让其实现ICapSubscribe接口,然后我们就可以使用 CapSubscribeAttribute 来订阅 CAP 发布出来的消息。

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;
        
        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}");
            await UpdateStorageNumberAsync(message);
        }

        private async Task<bool> UpdateStorageNumberAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1
                                                                WHERE StorageID = @ProductID";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    ProductID = order.ProductID
                });

                return count > 0;
            }
        }
    }

*.CAP约定消息端在方法实现的过程中需要实现幂等性,所谓幂等性就是指用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。这里我没有考虑,实际中需要首先进行验证,避免二次更新

3.3 DeliveryService

  (1)启动配置:与StorageService高度类似,只是使用的不是同一个数据库

  (2)实现Subscriber

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;

        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}");
            await AddDeliveryRecordAsync(message);
        }

        private async Task<bool> AddDeliveryRecordAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"INSERT INTO [dbo].[Deliveries](DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime)
                                                            VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    DeliveryID = Guid.NewGuid().ToString(),
                    OrderID = order.ID,
                    OrderUserID = order.OrderUserID,
                    ProductID = order.ProductID,
                    CreatedTime = DateTime.Now
                });

                return count > 0;
            }
        }
    }

3.4 快速测试

  (1)启动3个微服务,Check 数据库表状态

  首先会看到在各个数据库中均创建了本地消息表,这两个表的含义如下:

  Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher 接口 Publish 的消息内容。

  Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[] 订阅的那些消息。

  然后看看各个表的数据,目前只有库存表有数据,因为我们要做的只是更新。

  (2)通过Postman发一个Post请求

  (3)Check控制台输出的日志信息

  (4)Check数据库中的业务表和消息表数据:可以看到发送者和接收者都执行成功了,如果其中任何一个参与者发生了异常或者连接不上,CAP会有默认的重试机制(默认是50次最大重试次数,每次重试间隔60s),当失败总次数达到默认失败总次数后,就不会进行重试了,我们可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。

  另外,由于CAP会在数据库中创建消息表,因此难免会考虑到其性能。CAP提供了一个数据清理的机制,默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt (字段名)不为空并且小于当前时间的数据。

四、小结

  本篇首先简单介绍了一下CAP这个开源项目,然后基于上一篇中的下订单的小案例来进行了基于CAP的改造,并通过一个实例的运行来看到了结果。当然,这个实例并不完美,很多点都没有考虑(比如消息端消费时的幂等性)和失败重试的场景实践等等等等。由于时间和精力的关系,目前只使用到这儿,以后有机会能够应用上会研究下CAP的源码,最后感谢杨晓东为.NET社区带来了一个优秀的开源项目!

示例代码

  Click Here => 点我点我

参考资料

  CAP - GitHub : https://github.com/dotnetcore/CAP

  CAP - Wiki : https://github.com/dotnetcore/CAP/wiki

  杨晓东,《BASE:一种ACID的替代方案

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
12天前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
35 5
|
2月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
1月前
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
40 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
20天前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
24 3
|
2月前
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
34 1
|
2月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
1月前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
128 6
|
1月前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
49 1
|
18天前
|
Java 开发者 微服务
从单体到微服务:如何借助 Spring Cloud 实现架构转型
**Spring Cloud** 是一套基于 Spring 框架的**微服务架构解决方案**,它提供了一系列的工具和组件,帮助开发者快速构建分布式系统,尤其是微服务架构。
135 68
从单体到微服务:如何借助 Spring Cloud 实现架构转型
|
17天前
|
运维 监控 持续交付
微服务架构解析:跨越传统架构的技术革命
微服务架构(Microservices Architecture)是一种软件架构风格,它将一个大型的单体应用拆分为多个小而独立的服务,每个服务都可以独立开发、部署和扩展。
147 36
微服务架构解析:跨越传统架构的技术革命