看eShopOnContainers学一个EventBus

简介: 最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下看完此文你将获得什么?eShop中是如何设计事件总线的实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战发布订阅模式发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下

看完此文你将获得什么?

  1. eShop中是如何设计事件总线的
  2. 实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战

发布订阅模式

发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

eShop中的EventBus就是基于这种模式的发布/订阅
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例

在eShop中有两个EventBus的实现:

  • 基于RabbitMq的EventBusRabbitMQ
  • 基于AzureServiceBus的EventBusServiceBus

IEventBus开始

先来看一看,所有EventBus的接口IEventBus

public interface IEventBus
{
    void Publish(IntegrationEvent @event);

    void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;
}

嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:

  1. Publish 发布
  2. Subscribe 订阅
  3. Unsubscribe 取消订阅

这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:

  1. 发布的内容(消息)必须是IntegrationEvent及其子类
  2. 订阅事件必须指明要订阅事件的类型,并附带处理器类型
  3. 处理器必须是IIntegrationEventHandler的实现类

Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:

  1. 事件必须继承IntegrationEvent
  2. 处理器必须实现IIntegrationEventHandler<T>TIntegrationEvent子类

另外,看下 IntegrationEvent有什么

public class IntegrationEvent
{
    public IntegrationEvent()
    {
        Id = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }

    public Guid Id  { get; }
    public DateTime CreationDate { get; }
}

IEventBusSubscriptionsManager是什么

public interface IEventBusSubscriptionsManager
{
    bool IsEmpty { get; }
    event EventHandler<string> OnEventRemoved;
    void AddDynamicSubscription<TH>(string eventName)
       where TH : IDynamicIntegrationEventHandler;

    void AddSubscription<T, TH>()
       where T : IntegrationEvent
       where TH : IIntegrationEventHandler<T>;

    void RemoveSubscription<T, TH>()
         where TH : IIntegrationEventHandler<T>
         where T : IntegrationEvent;
    void RemoveDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
    bool HasSubscriptionsForEvent(string eventName);
    Type GetEventTypeByName(string eventName);
    void Clear();
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
    IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
    string GetEventKey<T>();
}

这个接口看起来稍显复杂些,我们来简化下看看:

public interface IEventBusSubscriptionsManager
{
    void AddSubscription<T, TH>()
    void RemoveSubscription<T, TH>()
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() 
}

最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。

SubscriptionInfo是什么?

public bool IsDynamic { get; }
public Type HandlerType{ get; }

SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。

这是你可能会有另一个疑问:

这个和IEventBus有什么关系?

  1. IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等
  2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:

     public void Subscribe<T, TH>()
         where T : IntegrationEvent
         where TH : IIntegrationEventHandler<T>
     {
         // 查询事件的全名
         var eventName = _subsManager.GetEventKey<T>();
    
         //向mq添加注册
         DoInternalSubscription(eventName);
    
         // 向manager添加订阅
         _subsManager.AddSubscription<T, TH>();
     }
    
     private void DoInternalSubscription(string eventName)
     {
         var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
         if (!containsKey)
         {
             if (!_persistentConnection.IsConnected)
             {
                 _persistentConnection.TryConnect();
             }
    
             using (var channel = _persistentConnection.CreateModel())
             {
                 channel.QueueBind(queue: _queueName,
                                     exchange: BROKER_NAME,
                                     routingKey: eventName);
             }
         }
     }

    查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。

另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:

    private async Task ProcessEvent(string eventName, string message)
    {
        // 从manager查询信息
        if (_subsManager.HasSubscriptionsForEvent(eventName))
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {

                // 从manager获取处理器
                var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                foreach (var subscription in subscriptions)
                {

                    // Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)
                    if (subscription.IsDynamic)
                    { 
                        var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var eventType = _subsManager.GetEventTypeByName(eventName);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                        var handler = scope.ResolveOptional(subscription.HandlerType);
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
        }
    }

IEventBusSubscriptionsManager的默认实现

在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager

这个类中有两个重要的字段

    private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
    private readonly List<Type> _eventTypes;

他们分别存储了事件列表和事件处理器信息词典

接下来就是实现一个

基于内存的事件总线

我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了

先贴出最终代码:

public class InMemoryEventBus : IEventBus
{
    private readonly IServiceProvider _provider;
    private readonly ILogger<InMemoryEventBus> _logger;
    private readonly ISubscriptionsManager _manager;
    private readonly IList<IntegrationEvent> _events;
    public InMemoryEventBus(
        IServiceProvider provider,
        ILogger<InMemoryEventBus> logger, 
        ISubscriptionsManager manager)
    {
        _provider = provider;
        _logger = logger;
        _manager = manager;
    }

    public void Publish(IntegrationEvent e)
    {

        var eventType = e.GetType();
        var handlers = _manager.GetHandlersForEvent(eventType.FullName);

        foreach (var handlerInfo in handlers)
        {
            var handler = _provider.GetService(handlerInfo.HandlerType);

            var method = handlerInfo.HandlerType.GetMethod("Handle");

            method.Invoke(handler, new object[] { e });
        }
    }

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {

        _manager.AddSubscription<T, TH>();

    }

    public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }

    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        _manager.RemoveSubscription<T, TH>();
    }

    public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }
}

首先构造函数中声明我们要使用的东西:

public InMemoryEventBus(
    IServiceProvider provider,
    ILogger<InMemoryEventBus> logger, 
    ISubscriptionsManager manager)
{
    _provider = provider;
    _logger = logger;
    _manager = manager;
}

这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的

public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{

    _manager.AddSubscription<T, TH>();

}

public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    _manager.RemoveSubscription<T, TH>();
}

订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。

接下来就是最重要的Publish方法,实现Publish有两种方式:

  1. 使用额外的线程和Queue让发布和处理异步
  2. 为了简单起见,我们先写个简单易懂的同步的

     public void Publish(IntegrationEvent e)
     {
         // 首先要拿到集成事件的Type信息
         var eventType = e.GetType();
    
         // 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfo
         var handlers = _manager.GetHandlersForEvent(eventType.FullName);
    
         // 不解释循环
         foreach (var handlerInfo in handlers)
         {
             // 从DI中获取类型的实例
             var handler = _provider.GetService(handlerInfo.HandlerType);
    
             // 拿到Handle方法
             var method = handlerInfo.HandlerType.GetMethod("Handle");
    
             // 调用方法
             method.Invoke(handler, new object[] { e });
         }
     }

OK,我们的InMemoryEventBus就写好了!

要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent的子类,和一个IIntegrationEventHandler<T>的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。

思路是这样的:

  1. 写一个 AddUserEvent:IntegrationEvent,里面有一个UserId和一个UserName
  2. 写一个AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中输出UserId和Name到日志。
  3. 注册DI,你要注册下面这些服务:

     IEventBus=>InMemoryEventBus
     ISubscriptionsManager=>InMemorySubscriptionsManager
     AddUserEventHandler=>AddUserEventHandler
  4. 在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)
  5. 写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent作为参数传进去。

OK!到这里一个切实可用的InMemoryEventBus就可以使用了。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
资源调度 JavaScript 数据处理
vue3 element组件上传图片
vue3 element组件上传图片
865 0
|
Ubuntu Linux Windows
Linux系统盘制作(Rufus)
Linux系统盘制作(Rufus)
5483 0
|
10月前
|
存储 Windows
SD 卡格式化还可以恢复数据吗?4个方法帮你恢复SD卡数据
SD卡作为存储设备,被广泛应用于手机、相机和其他数码设备中。有时我们可能会不小心格式化SD卡,或者系统提示需要格式化,导致重要数据似乎“一瞬间消失”。但别急,实际上,大多数情况下数据是可以恢复的。接下来,我们将用简单易懂的语言,带你了解格式化后数据是否还在,以及如何一步步找回那些丢失的文件。
|
运维 监控 Kubernetes
读《SRE:Google运维解密》一点思考
## 0、为什么诞生SRE? + 原因一:企业成本的增长通用户的增长不成线性变化。但是随着系统的复杂度提升,组建越来越多,用户的流量压力也越来越大,相关的变更也会越来越多,各模块之间的变更顺序也会越来越复杂。
4842 0
读《SRE:Google运维解密》一点思考
|
Java Linux Maven
IDEA Maven项目上传jar包到nexus仓库图文教程
IDEA Maven项目上传jar包到nexus仓库图文教程
1478 0
|
数据可视化 JavaScript 定位技术
vue中使用vue-echarts引入百度地图实现数据可视化
本文主要讲解vue-echarts引入百度地图实现数据可视化,实现在项目中使用百度地图的效果
841 0
vue中使用vue-echarts引入百度地图实现数据可视化
|
机器学习/深度学习 Kubernetes 算法
浅析自动机器学习(AutoML)工具NNI(上)
NNI 简介 NNI (Neural Network Intelligence) 是一个轻量级但功能强大的自动机器学习(AutoML)工具包,可帮助用户自动化特征工程、神经架构搜索、超参数调优和模型压缩,并支持单机、本地多机、云等不同的运行环境。
|
消息中间件 SQL 监控
接口自动化测试实践指导(下):接口自动化测试断言设置思路
接口自动化测试实践指导(下):接口自动化测试断言设置思路
3663 0
接口自动化测试实践指导(下):接口自动化测试断言设置思路
|
JavaScript 网络协议 PHP
使用阿里云npm镜像加速
npm(node package manager)是随同NodeJS一起安装的包管理工具,能解决NodeJS代码部署上的很多问题,常见的使用场景有以下几种: 允许用户从NPM服务器下载别人编写的第三方包到本地使用。 允许用户从NPM服务器下载并安装别人编写的命令行程序到本地使用。 允许用户将自己编写的包或命令行程序上传到NPM服务器供别人使用。 由于新版的nodejs已经集成了npm,所以之前npm也一并安装好了。同样可以通过输入 &quot;npm -v&quot; 来测试是否成功安装。命令如下,出现版本提示表示安装成功
53823 4
|
运维 Linux
CentOS 9 开局配置
CentOS 9 发布有几年了,一直没有尝试使用,CentOS 9 有一些变动。
889 0