发布订阅模式原理及其应用(多种语言实现)

简介: 发布订阅模式原理及其应用(多种语言实现)

软件设计模式发布-订阅模式 及其应用


本文介绍的设计模式是一种思想,与编程语言无关。其中包含 python、typescript、dart、powershell等多种语言的代码,读者也可以自己尝试用其他语言再撸一次。

1. 引例:从我的一个经历说起

1.1 从 订阅 到 发布

记得一九年的时候我刚刚来到深圳工作,众所周知那时候还没有爆发 新冠疫情,身边的同事们组队去香港购物是常有的事情。但是那会儿我还没有办理港澳通行证,于是年底回老家的时候去当地政务中心办理了。

办证是需要时间的,万万没想到的是二零年春节前夕——新冠疫情爆发了。当我回到深圳后的某一天接到老家政务中心的电话,通知我由于疫情的原因,通信证的办理已经被暂停了并且什么时候恢复办理还不能确定,如果愿意等待,则需要到恢复办证的时候,再通知我们

—— 这就是一个 发布-订阅模式的典型例子。

发布-订阅 模式 模式中的多方可以分为两类,一类是消息的 发布者,另外一类是消息的 订阅者。在上面的案例中,政务中心的工作人员就是 发布者,当我表示愿意等到恢复通信证办理时,我就 向发布者订阅了 恢复办理的通知(消息),因此我时消息的 订阅者

这样有什么好处呢:

  • 对于我(订阅者)来说,不需要每隔几天就打电话到政务服务中心(发布者)去询问是否恢复办理的消息;
  • 对于政务服务中心(发布者)同样也不需要每天回答相同的问题——毕竟何时恢复办理他们也不能确定。
  • 一旦恢复办理,政务服务中心(发布者)可以一次性地通知所有和我一样地广大订阅者

看到了吗——相比于我们去轮询以获取消息,改用发布-订阅 模式 同时节省了我们双方地时间!

多么棒地思想!——运用于程序设计中岂不秒哉?

1.2 如果我不想继续订阅了

有一种情况也是非常常见的,那就是我不愿意继续等待消息了,也有可能是这个消息对我来说已经不重要了。这时我不再希望继续收到来自发布者的恢复办理通知,那就需要 退订

还记得吗——当我们订阅的时候,是将我们的订阅意愿登记在发布者那边的,这样就能实现发布者在适当的时候通过查询 所有的登记记录 然后逐一通知。

因此如果一旦有用户需要退订,其实很简单,只需要订阅者在他们所登记订阅的“订阅者登陆表”中将订阅信息删除掉即可,这样下一次广播通知的时候就不会再将消息发送给退订的用户。

2. 发布-订阅 的 实践、应用、思考

2.1 实践:用 Python 来复现上面的场景

如果现在你好像明白 发布-订阅模式 的基本思想了——那么就请成热打铁,跟着我用程序来模拟一下证件办理的情景。为了便于理解,我们选用了最简单(到小学生都能一学就会)的 Python 语言来实现它。当然你也可以在阅读后尝试使用其它的语言来实现一遍。(如果需要,你可可以参考下面代码的 TypeScript 实现

class Publisher:
    def __init__(self, name):
        self.subscribers = set() # 用于登记(保存)订阅者 的容器
        self.name = name         # 发布者的名字其实没有实质作用,只是为了后面打印文章
    def add_subscriber(self, subscriber):
        """用于发布者接受订阅者的订阅,即添加订阅者
        """
        self.subscribers.add(subscriber)
    def remove_subscriber(self, subscriber):
        """取消订阅,即把一个已经登记在册的订阅记录从登记容器中进行删除
        """
        self.subscribers.remove(subscriber)
        print(f"\n=>{subscriber.name} 已取消订阅。\n")
    def notify_all(self, arg):
        """通知所有订息者
        """
        for subscriber in self.subscribers:
            subscriber.notify(self, arg)
class Subscriber:
    def __init__(self, name):
        self.name = name
    def notify(self, publisher, arg):
        """用于被发布者通知的接口
        """
        print(f"\"{self.name}\"(订阅者) 收到的通知来自 \"{publisher.name}\"(发布者)的通知: {arg}")
if __name__ == '__main__':
    publisher = Publisher("政务服务中心")
    jackLee = Subscriber("jackLee")
    jackMa = Subscriber("jackMa")
    publisher.add_subscriber(jackLee)
    publisher.add_subscriber(jackMa)
    print('------- 第一次发布消息 -------')
    publisher.notify_all("[通知] 恢复证件办理!")
    # 用户 jackMa 取消订阅
    publisher.remove_subscriber(jackMa)
    print('------- 第二次发布消息 -------')
    publisher.notify_all("[通知] 恢复证件办理!")

运行该脚本(你需要安装 Python 运行时,可以在Python 官网下载安装之:https://www.python.org/)可以看到控制台上的输出结果如下:

------- 第一次发布消息 -------
"jackMa"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!
=>jackMa 已取消订阅。
------- 第二次发布消息 -------
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!

可以看到订阅者有两个订阅者实例:jackLee(本人)、jackMa(可能是阿里出来的)共同订阅了证件办理信息。

政务服务中心(发布者)第一次发布恢复通知的时候,jackLeejackMa 这两个同学都订阅了消息,因此都受到了来自该中心的通知。后来,jackMa 可能由于已经派小弟火速前往该中心取走了他的证件不需要继续订阅了,于是该中心的工作人(发布者)员调用publisher.remove_subscriber(jackMa) 从该中心的订阅者记录表中移除了 jackMa 的订阅记录。

于是,到了该中心第二次发布消息的时候,jackMa 已经不会再收到恢复证件办理消息,而 jackLee 还可以接收到恢复证件办理的消息。

2.2 应用:一个 Web UI 中的案例

在这个例子中的 发布者 和 订阅者的实现和上一个案例中的基本相同(请参考附录中上一个案例的 TypeScript 实现)。

先看代码:

<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="utf-8">
    <meta name="keywords" content="Publish-subscribe mode">
    <title>发布订阅在一个在Web UI中的例子</title>
  </head>
  <body>
    <div class="col">
      <button class=".my-button" id="myButton">点击我</button>
      <div id="myDisplay">你还未曾点击过呢!</div>
    </div>
    <script setup="" lang="ts">
    class Publisher {
      _subscribers;
      _name;
      constructor(name) {
        this._subscribers = new Set()
        this._name = name
      }
      get name() {
        return this._name;
      }
      addSubscriber(subscriber) {
        this._subscribers.add(subscriber)
      }
      removeSubscriber(subscriber) {
        this._subscribers.delete(subscriber)
        console.log(`\n=>${subscriber.name} 已取消订阅。\n`)
      }
      notifyAll(data) {
        this._subscribers.forEach((subscriber) => {
          subscriber.update(this, data)
        })
      }
    }
    class Subscriber {
      _name
      constructor(name) {
        this._name = name
      }
      get name() {
        return this._name
      }
      update(publisher, data) {
        const displayElement = document.getElementById('myDisplay');
        displayElement.innerHTML = data.msg + data.ct.toString();
      }
    }
    let counter = 0;
    const publisher = new Publisher('publisher');
    const subscriber = new Subscriber('subscriber');
    publisher.addSubscriber(subscriber);
    const notify = () => {
      counter += 1;
      const data = { msg: '你已经点击了:', ct: counter + ' 次!' };
      publisher.notifyAll(data);
    }
    document.getElementById('myButton').addEventListener('click', notify);
    </script>
    <style>
      html {
        width: 100%;
        height: 100%;
      }
      body {
        width: 100%;
        height: 100%;
        display: grid;
        place-items: center;
      }
      .col {
        display: flex;
        flex-direction: column;
      }
    </style>
  </body>
</html>

在浏览器打开该 Html 效果看起来是这样的:

在这个例子中我们用一个 按钮来触发另一个显示区的UI变化,这个触发是通过调用发布者的 notifyAll 方法实现的。实际上,我们可以多个按钮共用一个 发布者,使用相同的方式进行消息的发布。

对于订阅者来说,其实也可以有多个,比如这里的显示区:

<div id="myDisplay">你还未曾点击过呢!</div>

我们只要稍作更改:

<div id="myDisplay1">显示区1</div>
<div id="myDisplay2">显示区2</div>
<div id="myDisplay3">显示区3</div>

那这有什么用呢?

举一个实际生活中的例子,有三个人去售楼中心询问房价,售楼中心需要等待上级的具体通知才知道单价,而三个人虽然都订阅了房价消息,但是由于他们各自想买的面积大小不一样。这时作为售楼中心来说,一旦上级公司确定好价格,虽然是统一发布Email给三个购房者的,但是三个购房者接受到的房屋参考价格却是不一样的。怎么样,我想你一定可以自己把这样的场景用编程实现出来。

3. 通用型发布者对象的改进

3.1 从 Subscriber 的服务员 到 事件的发布者

在阅读本小节前请读者先自己尝试回答这个问题:Subscriber 类真的有必要实现吗?

在我们上面的代码中,Subscriber 类 实现了几乎唯一一个有用的方法:update,它的作用却是给 Publisher 类的 notifyAll 方法进行调用。

从现实生活中给一个解释:

notifyAll 是消息的发布这发布消息的工具,update 是订阅用户接受到的新的定制化消息,比如同样是订阅了售房信息,但是由于不同类别的购房者订阅时所选定的楼层、大小等参数不一样,则这些不同订阅者接收到的发布结果不一样——也就表面原始的消息需要为不同的订阅者做一些 定制化 处理。在之前的代码实现中,这个消息的定制化工作就是使用 Subscriber 类的 update 方法实现的。

很显然,上表面的代码要真正实现定制化,往往不仅是参数值的不同,可能对参数的处理也不一样。因此仅仅依赖参数data是不合理的。因此我们大概是需要写多个仅仅 update 方法的实现不同的 Subscriber 类——这不太好。略好一点的办法是,让 update 方法接受的不是单纯的数据 data,而是一个 回调函数 传入 update 方法中。

先不着急修改我们的代码。对于发布者来说,似乎可以提供 更加周到 的服务——直接登记好订阅着的定制化需求处理方式,使用订阅者要求的处理方式处理好定制的消息后,直接告诉订阅者。——因此 update 这个接受表示用户定制化需求处理方式的方法可以直接合并到 发布者那边。

于是 Subscriber 类 就不需要了,现在我们只需要更新一下我们的 Publisher。更新的思路是这样的:

  • 添加订阅者时(Publisher.addSubscriber)不仅需要记录订阅者名字,还要记录一个对应的响应函数用以消息发布后给订阅者提供定制化服务。

Publisher 看,需要登记的内容又多了一些。不过好在 订阅者名称(认为是唯一标识符)和 与之对于的服务(回调函数),是对应的关系,既可以一对一,也可以一对多(表示这个订阅者需要多个定制化服务)。因此我们将 Publisher 的 “记录本”改成下面的类型:

Map<string, Function[]>

这个映射(Map)的 key 就表示 订阅者名称,而value 部分是一组函数,表示该订阅者需要的各种服务。

另外,到了这里,对于 Publisher来说,添加订阅者就转化为了 为订阅者订阅各种定制化服务 。同时反过来看,对于某个具体的订阅者 Subscriber,一旦它的服务定制数组 (Function[])为空数组,表明他已经没有任何订阅,也不再需要接收发布者的任何消息了。

因此先前我们使用的方法名addSubscriber 不适用了,从含以上换成 addListener 似乎更加合适

为什么呢? 我们接下来对此做进一步说明。

一直以来,我们聚焦点都在于 发布者订阅者,而忽略了 引起发布者发布的事件。 这个方法接受两个参数,一个是用户名,一个是为用户新增的回调。同时必须指出的是,这个 回调 往往是需要再其调用时接受一些数据的,比如由发布者发布的某些原始数据,他们就像是时时刻刻地 监听着守候着 发布者 发布一个事件一旦这个 事件/消息 被发布,就 完成消息发布后为 订阅者 所提供地服务

换一下思路,我们接着把聚焦点转移到 事件 上来。

其实从现实中看,同一个事件发生,可能意味着可能需 要干很多事,既可以 服务更多的订阅者,也可以干其它任何的工作——我们一味地想着在发布者处登记订阅者的id然后完成订阅者的需求,那么 没有区别为何事件而需要去发布这些消息

更好的做法是 不再记录订阅者,而是记录为什么要发布消息给订阅者——也就是记录 事件这样我们就可以在同一个事件发生的时候,通过一系列的属于该事件函数(可能一个或多个回调函数服务于同一个订阅者),完成该事件的响应,也就是回调函数们。

从这个意义上看,我们所关注所谓的 订阅者 可以看作 一个事件发布后,发布者需要调用的一组函数。而所谓 发布,实际上就是调用这组函数以 完成事件(的回调)。因此我们接下来该用 listener 表示监听事件以待执行的回调函数, event 表示事件名, emit 表示这个事件发生后需要由发布者调用函数的过程。

至此,我们的 Publisher 从一个 Subscriber 的服务员 转型成为了职业 事件的管理者,不妨给它改个名——EventEmitter

完成代码:

class EventEmitter {
  private _events: Map<string | symbol, Function[]>;
  constructor() {
    this._events = new Map()
  }
  /**
   * 添加事件的回调函数,若事件不存在则先创建该事件
   * @param event 
   * @param callback 
   */
  public addListener(event: string, callback: Function) {
    // 如果订阅者还不存在,先添加订阅者,再为订阅者添加对应的回调到其回调数组
    if (!this._events.has(event)) {
      this._events.set(event, []);
    }
    // 如果订阅者已经存在,则直接为订阅者增加新的回调到其回调数组
    this._events.get(event)?.push(callback);
  }
  /**
   * 移除事件的回调函数,如果事件不存在则什么不干而直接返回
   * @param event 
   * @param listener 
   * @returns 
   */
  public removeListener(event: string, listener: Function) {
    if (!this._events.has(event)) {
      return ;
    }
    const listeners = this._events.get(event);
    const index = listeners?.indexOf(listener);
    if (index !== undefined && index !== -1) {
        listeners?.splice(index, 1);
    }
  }
  /**
   * 触发一个事件
   * @param name 要触发的事件的名字
   * @param args 给事件回调的参数
   * @returns 
   */
  emit(event: string, ...args: any[]): boolean {
    // 索引对应于给定事件的监听器数组
    const listeners = this._events.get(event);
    // 若事件不存在直接返回 false
    if (!listeners) {
      return false;
    }
    // 执行给定事件的所有监听器函数(回调)
    listeners.forEach((listener) => {
      listener(...args);
    });
    return true;
  }
}

3.2 完善事件的添加移除发出

在上一小节,我们的 EventEmitter 有三个方法,分别用于添加事件及其回调(addListener)、移除事件回调(removeListener)、发出事件。其中前两个方法,为了能链式添加或删除事件(回调),需要返回自身的引用。

另外,还应该可以一次性移除所有的事件回调、获取指定事件的当前所有已经注册的回调数量、。

代码修改如下:

export class EventEmitter {
    private _events: Map<string|symbol, Function[]>;
    constructor() {
        this._events = new Map();
    }
    /**
     * 添加 listener
     * @param event 
     * @param listener 
     * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
     */
    addListener(event: string|symbol, listener: Function): this {
        if (!this._events.has(event)) {
            this._events.set(event, []);
        }
        this._events.get(event)?.push(listener);
        if (this._events.get(event)?.length === this._maxListeners) {
            console.warn(`Max listeners (${this._maxListeners}) reached for event ${String(event)}`);
        }
        return this
    }
    /**
     * 移除指定的 listener
     * @param event 
     * @param listener 
     * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
     */
    removeListener(event: string|symbol, listener: Function): this {
        if (!this._events.has(event)) {
            return this;
        }
        const listeners = this._events.get(event);
        const index = listeners?.indexOf(listener);
        if (index !== undefined && index !== -1) {
            listeners?.splice(index, 1);
        }
        return this
    }
    /**
     * 移除所有的 Listener
     * @param event 事件名
     * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
     */
    removeAllListeners(event?: string|symbol): this {
        if (event) {
            this._events.delete(event);
        } else {
            this._events.clear();
        }
        return this
    }
    /**
     * 获取指定事件的监听器数组
     * @param event 一个表示事件(key)的字符串
     * @returns 事件回调
     */
    listeners(event: string|symbol): Function[] {
        return this._events.get(event) || [];
    }
    /**
     * 事件发生器
     * 
     * - 若指定事件不存在,则返回 flase
     * - 反之,则执行指定事件的所有回调,并返回true
     * 
     * @param event 事件名字符串(key)
     * @param args 事件回调的参数
     * @returns 事件是否存在
     */
    emit(event: string|symbol, ...args: any[]): boolean {
        // 索引对应于给定事件的监听器数组
        const listeners = this._events.get(event);
        // 若事件不存在直接返回 false
        if (!listeners) {
            return false;
        }
        // 执行给定事件的所有监听器函数(回调)
        listeners.forEach((listener) => {
            listener(...args);
        });
        return true;
    }
    /**
     * 返回指定事件的监听器(回调)数量
     * 
     * 当指定的事件不存在时返回值为0
     * @param event 事件名字符串(key)
     * @returns 指定事件的现有监听器的数量
     */
    listenerCount(event: string|symbol): number {
        const listeners: Function[] | undefined = this._events.get(event);
        return listeners?.length || 0;
    }
}

3.3 限制侦听器的数量

本节我们在之前的基础上添加一个限制监听器数量的函数。这个功能比较简单,需要添加一个属性用于记录设置的最大监听器数量。

export class EventEmitter {
private _events: Map<string|symbol, Function[]>;
  private _maxListeners: number;
  constructor() {
      this._events = new Map();
      this._maxListeners = 10;
  }
  // ...
  // 修改或添加以下方法
  /**
   * 添加 listener
   * @param event 
   * @param listener 
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  addListener(event: string|symbol, listener: Function): this {
      if (!this._events.has(event)) {
          this._events.set(event, []);
      }
      this._events.get(event)?.push(listener);
      if (this._events.get(event)?.length === this._maxListeners) {
          // console.warn(`Max listeners (${this._maxListeners}) reached for event ${String(event)}`);
          throw Error('Max listeners exceeded')
      }
      return this
  }
  /**
   * 设定最大 Listener 数量
   * @param n 
   */
  set maxListeners(n: number) {
      this._maxListeners = n;
  }
  /**
   * 获取最大 Listener 数量
   */
  get maxListeners():number {
      return this._maxListeners;
  }
  /**
   * 设定最大 Listener 数量
   * @param n 要设置的最大监听器数量
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  setmaxListeners(n: number) {
    this._maxListeners = n;
    return this
  }
}

3.4 单次有效的侦听器

export class EventEmitter {
  // ...省略其它内容
  /**
   * 为名为 event 的事件添加 一次性 的 listener 函数。
   * 下次 event 被触发时,该 listener 被移除,然后被调用。
   * @param event 事件名字符串
   * @param listener 事件回调函数
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  once(event: string | symbol, listener: Function) {
    // 创建一个新函数,在被调用后移除自身
    const _onceListener = (...args: any[]) => {
      this.removeListener(event, _onceListener);
      listener(...args);
    };
    this.addListener(event, _onceListener);
    return this;
  }
  /**
   * 将名为 `event` 的事件的 **一次性** 的 `listener` 函数添加到 listeners 数组的`_beginning_ ` 中。
   * 下次 `event` 事件被触发时,该 侦听器 被移除,然后被调用。
   * @param event 
   * @param listener 监听器回调函数
   */
  prependOnceListener(event: string | symbol, listener: Function) {
    // 创建一个新函数,在被调用后移除自身
    const _onceListener = (...args: any[]) => {
      this.removeListener(event, _onceListener);
      listener(...args);
    };
    this.prependListener(event, _onceListener);
    return this
  }
}

4. 发布-订阅模式 的 高级实践

4.1 全局状态管理初步

一些有前端编程基础的读者可能使用过很多知名的全局状态管理模块,比如 专用于 VueVuex(废弃)、Pinia(思想较新,用起来爽),以及由 React 团队推出但是也可以用于其它任何需要状态管理地方的 Redux(就是不怎么好用, 被站在肩上的前巨人)。

这些工具的所谓 状态管理,无非是多个不同模块需要操作同一个数据对象,对其进行增删改查,说成 数据同步 模块其实更容易让外人理解。

不过起初它真的就是从实现 状态管理(State Manager)来的。我们这一节的目标就是通过 发布-订阅模式 实现这样一个状态管理工具 StateManager

说明:

这里我们使用的 EventEmitter 就是之前小节带大家写好的通用发布器 EventEmitter 。其目录位置为 src/utils/event.ts,本小节直接导入过来使用。

下面,我们来实现状态管理器 StateManager 对象:

// src/utils/state.ts
import { EventEmitter } from './event'
/**
 * 状态管理器
 */
export class StateManager<T> {
    private _state: T;
    private _eventEmitter: EventEmitter;
    constructor(initialState: T) {
      this._state = initialState;
      this._eventEmitter = new EventEmitter();
    }
    /**
     * 获取当前的状态值
     */
    public get state():T {
      return this._state;
    }
    /**
     * 更新的状态
     */
    public set state(newState: T) {
      this._state = newState;
      this._eventEmitter.emit('stateChange', this._state);
    }
    /**
     * 订阅状态改变时的事件(回调)
     * - 一个状态发生改变时,可以订阅多个回调消息
     * - 所谓消息即以一定方式封装的回调函数
     * - 所有回调消息将在状态变化时依次发出(执行回调)
     * - 依次执行的顺序也就是订阅消息的顺序
     * @param callback 状态改变后订阅的回调函数
     */
    public subscribe(callback: (state: T) => void): this {
      this._eventEmitter.addListener('stateChange', callback);
      return this;
    }
    /**
     * 取消订阅的回调事件
     * @param callback 要退订的回调函数
     */
    public unsubscribe(callback: (state: T) => void): this {
      this._eventEmitter.removeListener('stateChange', callback);
      return this
    }
  }

导出我们的工具:

// src/utils/index/ts
export * from './event'
export * from './state'

现在我们新建 src/index.ts 文件,调用 StateManager 来简单使用它:

// src/index.ts
import { StateManager } from './utils'
// 创建状态管理实例
const stateManager = new StateManager<{
    value:number
}>({ value: 0, });
// 副作用函数1
const sideEffec1 = (state: any) => {
    console.log('我是 sideEffec1 ,状态被更新,当前状态值为:', state.value);
}
// 副作用函数2
const sideEffec2 = (state: any) => {
  console.log('我是 sideEffec2 ,执行顺序为执行 “subscribe” 方法的顺序。')
}
// 设置状态更新后的回调,可以采用链式调用
stateManager
  .subscribe(sideEffec1)
  .subscribe(sideEffec2)
// 更新你的状态
console.log('------- 第一次改变状态 -------');
stateManager.state = { value: 1 };
console.log('\n------- 第二次改变状态 -------');
stateManager
  .unsubscribe(sideEffec2)  // 取消状态发生改变时订阅的 sideEffec2。
  .state = { value: 2 };    // 指挥执行没有被取消的订阅

4.2 在有限状态机(FSM)中的应用

FSM 是离散数学中介绍的一个思想,在 数字电路设计 及其相关的 EDA 技术(如硬件描述语言)中应用最为广泛。随着计算机技术的发展,FSM 作为一种独立的设计模式,也是编程思想,被大量应用于软件工程中。关于 finite-state machine (FSM) 相关知识 我在另外一篇博文《有限状态机(FSM)理论及其实践》 中有更加详细的介绍。这一小结仅仅是将 发布-订阅 模式应用于状态机的编写,提供一种状态机编写方案。


         

5. (可忽略)NodeJS 中的 EventEmitter

5.1 概述

NodeJS 的 events 模块提供了一个名为 EventEmitter 的对象,其除了比我们前面源码实现的 EventEmitter 多了两个别名方法(分别是onoff)外,似乎每找到任何其它的区别。这两个别名方法的情况如下:

  • on 方法为 addListener 方法的别名方法,其类型签名如下(当然与 addListener 方法完全一样):
on(eventName: string | symbol, listener: (...args: any[]) => void): this;
  • off 方法为 removeListener 方法的别名方法,其类型签名如下(当然与 addListener 方法完全一样):
off(eventName: string | symbol, listener: (...args: any[]) => void): this;

NodeJS 环境中你可直接将该对象导入使用:

import {EventEmitter} from 'event';
// ... your codes

如果你能读懂我之前关于 EventEmitter 的实现,下一小节 5.2 EventEmitter 接口解析 关于 NodeJS 中提供的接口不看也罢,只当直到在该运行时下可以使用即可。

5.2 EventEmitter 接口解析

5.1.1 on()、addListener() 方法

其中 addListeneron(eventName, listener) 的别名,它们的方法类型签名如下:

addListener(eventName: string | symbol, listener: (...args: any[]) => void): this;
on(eventName: string | symbol, listener: (...args: any[]) => void): this;

参数:

  • eventName 事件的名称
  • listener 回调函数

返回:

  • EventEmitter的引用,以便可以链式调用

listener函数添加到名为eventName的事件的 listeners 数组的末尾。不检查是否已经添加了listener。多次调用传递 eventNamelistener的相同组合将导致多次添加和调用listener。例如:

server.on('connection', (stream) => {
  console.log('someone connected!');
});

默认情况下,event listeners 按照添加的顺序被调用。emitter.prependListener() 方法可用作将 event listeners 添加到 listener 数组开头的替代方法。

const myEE = new EventEmitter();
myEE.on('foo', () => console.log('a'));
myEE.prependListener('foo', () => console.log('b'));
myEE.emit('foo');

Output[]:

b
a

5.1.2 once() 方法

为名为 eventName 的事件添加 一次性listener 函数。下次eventName被触发时,该 listener 被移除,然后被调用。其类型签名为:

once(eventName: string | symbol, listener: (...args: any[]) => void): this;

参数:

  • eventName 事件的名称。
  • listener 回调函数

返回对 EventEmitter 的引用,以便可以链式调用

例如:

server.once('connection', (stream) => {
  console.log('Ah, we have our first user!');
});

默认情况下,event listeners 按照添加的顺序被调用。emitter.prependOnceListener() 方法可用作将 event listeners 添加到 listener 数组开头的替代方法。

const myEE = new EventEmitter();
myEE.once('foo', () => console.log('a'));
myEE.prependOnceListener('foo', () => console.log('b'));
myEE.emit('foo');

Output[]:

b
a

5.1.3 removeListener()、off() 方法

其中off()emitter.removeListener() 的别名

removeListener(eventName: string | symbol, listener: (...args: any[]) => void): this;
off(eventName: string | symbol, listener: (...args: any[]) => void): this;

返回对 EventEmitter 的引用,以便可以链式调用

从名为 eventName 的事件的 listener 数组中删除指定的 listener

const callback = (stream) => {
  console.log('someone connected!');
};
server.on('connection', callback);
// ...
server.removeListener('connection', callback);

removeListener() 最多从侦听器数组中删除一个 listener 实例。如果任何单个 listener 已多次添加到指定 eventName 的 listener 数组中,则必须多次调用 removeListener() 来删除每个实例。

一旦一个事件被发出,所有在发出时附加到它的 listener 都被按顺序调用。这意味着任何 removeListener()removeAllListeners() 调用 _after_ 发出和 _before_ 最后一个 listener 完成执行不会将它们从正在进行的 emit() 中移除。后续事件的行为与预期一致。

const myEmitter = new MyEmitter();
const callbackA = () => {
  console.log('A');
  myEmitter.removeListener('event', callbackB);
};
const callbackB = () => {
  console.log('B');
};
myEmitter.on('event', callbackA);
myEmitter.on('event', callbackB);
// callbackA 删除监听程序回调,但仍会被调用。
// 发出时的 内部 listener 数组 [callbackA,callbackB]
myEmitter.emit('event');
// Prints:
//   A
//   B
// callbackB 现已删除。
// 内部 listener 数组[callbackA]
myEmitter.emit('event');
// Prints:
//   A

因为 listener 是使用内部数组进行管理的,所以调用此函数将会更改任何侦听器的位置索引,这些 listener 注册为 _after_ 被删除的 listener 。这不会影响 listener 的调用顺序,但这意味着需要重新创建由 emitter.listeners() 方法返回的 listener 数组的任何副本。

当单个函数作为单个事件的处理程序被多次添加时(如下例所示), removeListener() 将删除最近添加的实例。在该示例中,删除了once('ping')listener :

const ee = new EventEmitter();
function pong() {
  console.log('pong');
}
ee.on('ping', pong);
ee.once('ping', pong);
ee.removeListener('ping', pong);
ee.emit('ping');
ee.emit('ping');

5.1.4 removeAllListeners() 方法

removeAllListeners(event?: string | symbol): this;

删除所有 listener 或指定 eventName 的侦听器。

删除代码中其他地方添加的 listener 是一种不好的做法,特别是当 EventEmitter 实例是由其他组件或模块(例如 sockets 或 文件流)创建的时候。

返回对 EventEmitter 的引用,以便可以链式调用

5.1.5 setMaxListeners() 方法

setMaxListeners(n: number): this;

默认情况下,如果为某个特定事件添加了10个以上的 listener, EventEmitter 将打印一个警告。这是一个有用的缺省值,有助于发现内存泄漏。 emitter.setMaxListeners() 方法允许为此特定的 EventEmitter 实例修改限制。该值可以设置为Infinity(或0),以表示无限数量的侦听器。

返回对 EventEmitter 的引用,以便可以链式调用

5.1.6 getMaxListeners() 方法

getMaxListeners(): number;

返回 EventEmitter 的当前最大 listener 值,该值由 emitter.setMaxListeners(n) 设置或默认为 {@link defaultMaxListeners}。

5.1.7 listeners() 方法

listeners(eventName: string | symbol): Function[];

返回名为 eventName 的 event listener 数组的副本。

server.on('connection', (stream) => {
  console.log('someone connected!');
});
console.log(util.inspect(server.listeners('connection')));

Output[]:

[ [Function] ]

5.1.8 rawListeners() 方法

rawListeners(eventName: string | symbol): Function[];

返回名为 eventName 的事件的侦听器数组的副本,包括任何wrappers (比如由 .once() 创建的 wrappers)。

const emitter = new EventEmitter();
emitter.once('log', () => console.log('log once'));
// 使用函数 `onceWrapper` 返回一个新数组,该函数的属性 `listener` 包含上面绑定的 原始listener
const listeners = emitter.rawListeners('log');
const logFnWrapper = listeners[0];
// 将 "log once" 记录到控制台,并且不解除 `once` 事件 logFnWrapper.listener(); 的绑定;
// 将 "log once" 记录到控制台,并删除侦听器 logFnWrapper();
emitter.on('log', () => console.log('log persistently'));
// 将返回一个新数组,该数组包含一个由 `const newListeners = emitter.rawListeners('log');` 上方的  `.on()` 绑定的函数
// 将 "log persistently" 记录到控制台两次
newListeners[0]();
emitter.emit('log');

5.1.9 emit() 方法

按照注册顺序同步调用为名为 eventName 的事件注册的每个listener,并将提供的参数传递给每个 listener。

如果事件有侦听器,则返回 true ,否则返回 false

const EventEmitter = require('events');
const myEmitter = new EventEmitter();
// 第一个 listener
myEmitter.on('event', function firstListener() {
  console.log('Helloooo! first listener');
});
// 第二个 listener
myEmitter.on('event', function secondListener(arg1, arg2) {
  console.log(`event with parameters ${arg1}, ${arg2} in second listener`);
});
// 第三个 listener
myEmitter.on('event', function thirdListener(...args) {
  const parameters = args.join(', ');
  console.log(`event with parameters ${parameters} in third listener`);
});
console.log(myEmitter.listeners('event'));
myEmitter.emit('event', 1, 2, 3, 4, 5);

Output[]:

[
  [Function: firstListener],
  [Function: secondListener],
  [Function: thirdListener]
]
Helloooo! first listener
event with parameters 1, 2 in second listener
event with parameters 1, 2, 3, 4, 5 in third listener
emit(eventName: string | symbol, ...args: any[]): boolean;

5.1.10 listenerCount() 方法

返回侦听名为 eventName的事件的 listeners 数量。

  • eventName 正在侦听的事件的名称
listenerCount(eventName: string | symbol): number;

5.1.11 prependListener() 方法

listener 函数添加到名为 eventName 的事件的listeners数组的_beginning_ of中。不检查是否已经添加了 listener。多次调用传递“事件名称”和 listener 的相同组合将导致多次添加和调用 listener

prependListener(eventName: string | symbol, listener: (...args: any[]) => void): this;

参数:

  • eventName 事件的名称。
  • listener 回调函数

返回对 EventEmitter 的引用,以便可以链式调用

server.prependListener('connection', (stream) => {
  console.log('someone connected!');
});

5.1.12 prependOnceListener() 方法

将名为 eventName 的事件的一次性listener 函数添加到 listeners 数组的_beginning_ 中。下次 eventName 被触发时,该 listener 被移除,然后被调用。

server.prependOnceListener('connection', (stream) => {
  console.log('Ah, we have our first user!');
});

返回对 EventEmitter 的引用,以便可以链式调用

  • eventName 事件的名称。
  • listener 回调函数
prependOnceListener(eventName: string | symbol, listener: (...args: any[]) => void): this;

5.1.13 prependOnceListener() 方法

返回一个数组,其中列出 emitter 已注册 listeners 的事件。数组中的值是字符串 或 Symbol的。

const EventEmitter = require('events');
const myEE = new EventEmitter();
myEE.on('foo', () => {});
myEE.on('bar', () => {});
const sym = Symbol('symbol');
myEE.on(sym, () => {});
console.log(myEE.eventNames());
// Prints: [ 'foo', 'bar', Symbol(symbol) ]
eventNames(): Array<string | symbol>;
写文章需要花很多时间,有些内容买一堆书都找不到解决办法,发布出来免费共享知识已经很不错了。少数盗用者不但出处不写,有些无聊到甚至把写好的出处扣掉的再拿去搞到自己网站上搞个付费阅读,还有什么加微信读原文的,有没有一点底线。人出来混讲点武德好不好。

附录


F1. 第2小结代码的 其它语言 版本

Typescript

class Publisher {
    private _subscribers: Set<Subscriber>;
    private _name: string;
    constructor(name: string) {
        this._subscribers = new Set() // 用于登记(保存)订阅者 的容器
        this._name = name         // 发布者的名字其实没有实质作用,只是为了后面打印文章
    }
    public get name(): string {
        return this._name;
    }
    /**用于发布者接受订阅者的订阅,即添加订阅者 */
    public add_subscriber(subscriber: Subscriber) {
        this._subscribers.add(subscriber)
    }
    /**取消订阅,即把一个已经登记在册的订阅记录从登记容器中进行删除 */
    public remove_subscriber(subscriber: Subscriber) {
        this._subscribers.delete(subscriber)
        console.log(`\n=>${subscriber.name} 已取消订阅。\n`)
    }
    /**通知所有订息者 */
    public notify_all(arg: string) {
        this._subscribers.forEach((subscriber) => {
            subscriber.notify(this, arg)
        })
    }
}
class Subscriber {
    private _name: string
    constructor(name: string) {
        this._name = name
    }
    public get name(): string {
        return this._name
    }
    /**用于被发布者通知的接口 */
    public notify(publisher: Publisher, arg: string) {
        console.log(`${this._name}(订阅者) 收到的通知来自 ${publisher.name}(发布者)的通知: ${arg}`)
    }
}
const publisher = new Publisher("政务服务中心")
const jackLee = new Subscriber("jackLee")
const jackMa = new Subscriber("jackMa")
publisher.add_subscriber(jackLee)
publisher.add_subscriber(jackMa)
console.log('------- 第一次发布消息 -------')
publisher.notify_all("[通知] 恢复证件办理!")
// 用户 jackMa 取消订阅
publisher.remove_subscriber(jackMa)
console.log('------- 第二次发布消息 -------')
publisher.notify_all("[通知] 恢复证件办理!")

F2. 通用发布者对象实现的完整实现

typescript 版本

export class EventEmitter {
  private _events: Map<string | symbol, Function[]>;
  private _maxListeners: number;
  constructor() {
    this._events = new Map();
    this._maxListeners = 10;
  }
  /**
   * 添加 listener
   * @param event 
   * @param listener 
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  addListener(event: string | symbol, listener: Function): this {
    if (!this._events.has(event)) {
      this._events.set(event, []);
    }
    this._events.get(event)?.push(listener);
    if (this._events.get(event)?.length === this._maxListeners) {
      // console.warn(`Max listeners (${this._maxListeners}) reached for event ${String(event)}`);
      throw Error('Max listeners exceeded')
    }
    return this
  }
  /**
   * 移除指定的 listener
   * @param event 
   * @param listener 
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  removeListener(event: string | symbol, listener: Function): this {
    if (!this._events.has(event)) {
      return this;
    }
    const listeners = this._events.get(event);
    const index = listeners?.indexOf(listener);
    if (index !== undefined && index !== -1) {
      listeners?.splice(index, 1);
    }
    return this
  }
  /**
   * 移除所有的 Listener
   * @param event 事件名
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  removeAllListeners(event?: string | symbol): this {
    if (event) {
      this._events.delete(event);
    } else {
      this._events.clear();
    }
    return this
  }
  /**
   * 设定最大 Listener 数量
   * @param n 要设置的最大监听器数量
   */
  set maxListeners(n: number) {
    this._maxListeners = n;
  }
  /**
   * 获取最大 Listener 数量
   */
  get maxListeners(): number {
    return this._maxListeners;
  }
  /**
   * 设定最大 Listener 数量
   * @param n 要设置的最大监听器数量
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  setmaxListeners(n: number) {
    this._maxListeners = n;
    return this
  }
  /**
   * 获取指定事件的监听器数组
   * @param event 一个表示事件(key)的字符串
   * @returns 事件回调数组
   */
  listeners(event: string | symbol): Function[] {
    let _: Function[] | undefined = this._events.get(event);
    if (_) {
      return [..._]
    }
    return [];
  }
  /**
   * 返回名为 event 的事件的 侦听器数组
   * @param event  事件名
   * @returns 事件回调数组
   */
  rawListeners(event: string | symbol) {
    return this._events.get(event) || [];
  }
  /**
   * 发出具有指定参数的事件
   * 
   * - 若指定事件不存在,则返回 flase
   * - 反之,则执行指定事件的所有回调,并返回true
   * 
   * @param event 事件名字符串(key)
   * @param args 事件回调的参数
   * @returns 事件是否存在
   */
  emit(event: string | symbol, ...args: any[]): boolean {
    // 索引对应于给定事件的监听器数组
    const listeners = this._events.get(event);
    // 若事件不存在直接返回 false
    if (!listeners) {
      return false;
    }
    // 执行给定事件的所有监听器函数(回调)
    listeners.forEach((listener) => {
      listener(...args);
    });
    return true;
  }
  /**
   * 返回指定事件的监听器(回调)数量
   * 
   * - 当指定的事件不存在时返回值为 0
   * 
   * @param event 事件名字符串(key)
   * @returns 指定事件的现有监听器的数量
   */
  listenerCount(event: string | symbol): number {
    const listeners: Function[] | undefined = this._events.get(event);
    return listeners?.length || 0;
  }
  /**
   * 将 `listener` 函数添加到名为 `event` 的事件的 listeners 数组的开头。
   * - 不检查是否已经添加了 `listener`。
   * - 多次调用传递“事件名称”和 `listener` 的相同组合将导致多次添加和调用 `listener`。
   * 
   * @param event 事件名
   * @param listener 事件回调函数
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  prependListener(event: string | symbol, listener: Function): this {
    if (!this._events.has(event)) {
      this._events.set(event, []);
    }
    this._events.get(event)?.unshift(listener);
    if (this._events.get(event)?.length === this._maxListeners) {
      console.warn(`Max listeners (${this._maxListeners}) reached for event ${String(event)}`);
    }
    return this;
  }
  /**
   * 将名为 `event` 的事件的 **一次性** 的 `listener` 函数添加到 listeners 数组的`_beginning_ ` 中。
   * 下次 `event` 事件被触发时,该 侦听器 被移除,然后被调用。
   * @param event 
   * @param listener 监听器回调函数
   */
  prependOnceListener(event: string | symbol, listener: Function) {
    // 创建一个新函数,在被调用后移除自身
    const _onceListener = (...args: any[]) => {
      this.removeListener(event, _onceListener);
      listener(...args);
    };
    this.prependListener(event, _onceListener);
    return this
  }
  /**
   * 为名为 event 的事件添加 一次性 的 listener 函数。
   * 下次 event 被触发时,该 listener 被移除,然后被调用。
   * @param event 事件名字符串
   * @param listener 事件回调函数
   * @returns 返回对 `EventEmitter` 的引用,以便可以**链式调用**。
   */
  once(event: string | symbol, listener: Function) {
    // 创建一个新函数,在被调用后移除自身
    const _onceListener = (...args: any[]) => {
      this.removeListener(event, _onceListener);
      listener(...args);
    };
    this.addListener(event, _onceListener);
    return this;
  }
}

例子:

const myEmitter = new EventEmitter();
// --------------- addListener 例子 ---------------
console.log('--------------- addListener 例子 ---------------');
// 添加一个事件监听器
myEmitter.addListener('eventName', () => {
  console.log('触发了一个名为 eventName 事件!');
});
// 触发事件
myEmitter.emit('eventName');
// --------------- once 例子 ---------------
console.log('--------------- once 例子 ---------------')
// 添加一个一次性事件监听器
myEmitter.once('onceEvent', () => {
  console.log('这个事件只会触发一次!');
});
// 触发一次性事件
myEmitter.emit('onceEvent');
// 再次触发一次性事件,这个监听器不会再次被调用
myEmitter.emit('onceEvent');
console.log('--------------- removeListener 例子 ---------------')
// 移除事件监听器
const listener = () => {
  console.log('这个监听器将被移除');
};
myEmitter.addListener('removeListenerEvent', listener);
myEmitter.removeListener('removeListenerEvent', listener);
// 移除所有事件监听器
myEmitter.removeAllListeners();

输出结果如下:

--------------- addListener 例子 ---------------
触发了一个名为 eventName 事件!
--------------- once 例子 ---------------
这个事件只会触发一次!
--------------- removeListener 例子 ---------------

python 语言版本

class EventEmitter:
    _event={}
    _max_listeners = 10
    def __init__(self):
        self._events = {}
    def add_listener(self, event, listener):
        """添加 listener
        """
        if event not in self._events:
            self._events[event] = []
        self._events[event].append(listener)
    def remove_listener(self, event, listener):
        """移除指定的 listener
        """
        if event in self._events:
            self._events[event].remove(listener)
    def remove_all_listeners(self, event):
        """移除所有的 Listener
        """
        if event in self._events:
            self._events[event] = []
    def once(self, event, listener):
        """为名为 event 的事件添加 一次性 的 listener 函数。
        """
        def _once_listener(*args, **kwargs):
            self.remove_listener(event, _once_listener)
            return listener(*args, **kwargs)
        self.add_listener(event, _once_listener)
    def listener_count(self, event):
        """返回指定事件的监听器数量
        """
        if event in self._events:
            return len(self._events[event])
        return 0
    def get_event_listeners(self, event):
        """获取指定事件的监听器数组
        """
        if event in self._events:
            return self._events[event]
        return []
    def listeners(self, event):
        """获取指定事件的监听器数组
        get_event_listeners 的同名方法
        """
        return self.get_event_listeners(event)
    def set_max_listeners(self, n):
        """设定最大 Listener 数量
        """
        self._max_listeners = n
    def get_max_listeners(self):
        """获取最大 Listener 数量
        """
        return self._max_listeners
    def emit(self, event, *args, **kwargs):
        """触发指定的事件
        """
        if event in self._events:
            for listener in self._events[event]:
                listener(*args, **kwargs)
    def prepend_listener(self, event, listener):
        """往前插入事件监听器
        """
        if event not in self._events:
            self._events[event] = []
        self._events[event].insert(0, listener)
    def prepend_once_listener(self, event, listener):
        """往前插入一次性事件监听器
        """
        def _once_listener(*args, **kwargs):
            self.remove_listener(event, _once_listener)
            return listener(*args, **kwargs)
        self.prepend_listener(event, _once_listener)
    def event_names(self):
        """获取所有事件名
        """
        return list(self._events.keys())

用法举例:

event_emitter = EventEmitter()
# 定义一些监听器函数
def listener_1():print("执行 listener_1")
def listener_2():print("执行 listener_2")
def listener_3():print("执行 listener_3")
def listener_4():print("执行 listener_4")
print('-------------- addListener、emit、removeListener --------------')
event_emitter.add_listener("my_event_1", listener_1)          # 将侦听器添加到 "my_event_1" 事件中
event_emitter.add_listener("my_event_1", listener_2)
event_emitter.emit("my_event_1")                              # 触发 "my_event_1" 事件
event_emitter.remove_listener("my_event_1", listener_1)       # 从 "my_event_1" 事件中删除监听器 listener_1
event_emitter.emit("my_event_1")                              # 再次触发 "my_event_1" 事件
print(event_emitter._events)
print('-------------- once、listenerCount、getEventListeners --------------')
event_emitter.once("my_event_2", listener_3)                  # 为 "my_event_1" 事件添加一次性监听器
num_listeners = event_emitter.listener_count("my_event_2")    # 获取 "my_event" 事件的侦听器数量
listeners = event_emitter.get_event_listeners("my_event_2")   # 获取 "my_event" 事件的侦听器
event_emitter.set_max_listeners(3)                            # 设置 EventEmitter 实例的最大侦听器数量
max_listeners = event_emitter.get_max_listeners()             # 获取 EventEmitter 实例的最大侦听器数量
print(event_emitter._events)
event_emitter.remove_all_listeners("my_event_1")              # 移除所有事件的监听器
event_emitter.remove_all_listeners("my_event_2")              # 移除所有事件的监听器
print(event_emitter._events)
print('-------------- once、listenerCount、getEventListeners --------------')
event_listeners = event_emitter.listeners("my_event_3")       # 获取 "my_event" 事件的侦听器
event_emitter.prepend_listener("my_event_3", listener_3)      # 在 "my_event" 事件前添加一个侦听器
event_emitter.prepend_once_listener("my_event_3", listener_3) # 在 "my_event" 事件前添加一个一次性侦听器
event_names = event_emitter.event_names()                     # 获取所有具有侦听器的事件的名称
print('event_names:',event_names)
print(event_emitter._events)

输出结果如下:

-------------- addListener、emit、removeListener --------------
执行 listener_1
执行 listener_2
执行 listener_2
{'my_event_1': [<function listener_2 at 0x0000023CAC3D6480>]}
-------------- once、listenerCount、getEventListeners --------------
{'my_event_1': [<function listener_2 at 0x0000023CAC3D6480>], 'my_event_2': [<function EventEmitter.once.<locals>._once_listener at 0x0000023CAC3D6660>]}
{'my_event_1': [], 'my_event_2': []}
-------------- once、listenerCount、getEventListeners --------------
event_names: ['my_event_1', 'my_event_2', 'my_event_3']
{'my_event_1': [], 'my_event_2': [], 'my_event_3': [<function EventEmitter.prepend_once_listener.<locals>._once_listener at 0x0000023CAC3D6700>, <function listener_3 at 0x0000023CAC3D6520>]}

dart 语言版本

class EventEmitter {
  Map<String, List<Function>> _events = {};
  int _maxListeners = 10;
  /**
   * 向事件添加侦听器
   */
  EventEmitter addListener(String event, Function listener) {
    // 如果事件尚不存在,为其创建一个空列表
    if (!this._events.containsKey(event)) {
      this._events[event] = [];
    }
    // 将监听器添加到事件列表中
    this._events[event]?.add(listener);
    if (this._events[event]?.length == this._maxListeners) {
      throw 'Max listeners exceeded';
    }
    return this;
  }
  /**
   * 从事件中删除侦听器
   */
  EventEmitter removeListener(String event, Function listener) {
    // 如果事件存在,请从列表中删除侦听器
    if (_events.containsKey(event)) {
      this._events[event]?.remove(listener);
    }
    return this;
  }
  /**
   * 从事件中删除所有侦听器
   */
  EventEmitter removeAllListeners(String event) {
    // If the event exists, remove its list of listeners
    if (this._events.containsKey(event)) {
      this._events.remove(event);
    }
    return this;
  }
  _onceListener(String event, Function listener) {
    this.removeListener(event, _onceListener);
    return listener();
  }
  /**
   * 为名为 event 的事件添加 一次性 的 listener 函数
   */
  EventEmitter once(String event, Function listener) {
    // 创建一个新的侦听器,该侦听器在被调用后移除自身
    // 将包装侦听器添加到事件列表的末尾
    this.addListener(event, this._onceListener);
    return this;
  }
  /**
   * 获取指定事件的监听器数量
   */
  int listenerCount(String event) {
    // 如果事件存在,则返回其侦听器列表的长度
    if (this._events.containsKey(event)) {
      return this._events[event]!.length;
    }
    return 0;
  }
  /**
   * 获取指定事件的监听器数组
   */
  List<Function>? getEventListeners(String event) {
    // 如果事件存在,则返回其侦听器列表
    if (this._events.containsKey(event)) {
      return this._events[event];
    }
    // 否则,返回一个空列表
    return [];
  }
  /**
   * 获取事件侦听器数组
   */
  List<Function>? listeners(String event) {
    return this.getEventListeners(event);
  }
  /**
   * 获取事件侦听器数组的副本
   */
  List<Function> rawListeners(String event) {
    // 如果事件存在,则返回其侦听器列表的副本
    if (_events.containsKey(event)) {
      return List<Function>.from(this._events[event]!);
    }
    // 否则,返回一个空列表
    return [];
  }
  /**
   * 设置事件的最大侦听器数量
   */
  EventEmitter setMaxListeners(int maxListeners) {
    this._maxListeners = maxListeners;
    return this;
  }
  /**
   * 获取事件的最大侦听器数量
   */
  int getMaxListeners() {
    return this._maxListeners;
  }
  /**
   * 用给定的参数发出事件
   */
  EventEmitter emit(String event, [List<dynamic> args = const []]) {
    // 如果事件存在,用给定的参数调用它的所有侦听器
    if (this._events.containsKey(event)) {
      for (Function listener in this._events[event]!) {
        listener();
      }
    }
    return this;
  }
  /**
   * 将侦听器添加到事件侦听器列表的开头
   */
  EventEmitter prependListener(String event, Function listener) {
    // 如果事件尚不存在,为其创建一个空列表
    if (!_events.containsKey(event)) {
      this._events[event] = [];
    }
    if (this._events[event]?.length == this._maxListeners) {
      print('Max listeners exceeded');
    }
    // 将侦听器添加到事件列表的开头
    this._events[event]?.insert(0, listener);
    return this;
  }
  /**
   * 将一次性侦听器添加到事件侦听器列表的开头
   */
  EventEmitter prependOnceListener(String event, Function listener) {
    // 创建一个新的侦听器,该侦听器在被调用后移除自身
    // 将包装侦听器添加到事件列表的开头
    this.prependListener(event, this._onceListener);
    return this;
  }
  /**
   * 获取所有具有侦听器的事件的名称
   */
  List<String> eventNames() {
    // 返回 _events 中键的副本
    return List<String>.from(_events.keys);
  }
}

用法举例:

void main(List<String> args) {
  EventEmitter emitter = EventEmitter();
  void listener_1() => print("执行 listener_1");
  void listener_2() => print("执行 listener_2");
  void listener_3() => print("执行 listener_3");
  print('-------------- addListener、emit、removeListener --------------');
  emitter.addListener("my_event_1", listener_1);
  emitter.addListener("my_event_1", listener_2);
  emitter.emit("my_event_1");
  emitter.removeListener("my_event_1", listener_1);
  emitter.emit("my_event_1");
  print(emitter._events);
  print('-------------- once、listenerCount、getEventListeners --------------');
  emitter.once("my_event_2", listener_3); // 为 "my_event_1" 事件添加一次性监听器
  int num_listeners =
      emitter.listenerCount("my_event_2"); // 获取 "my_event" 事件的侦听器数量
  List<Function>? listeners =
      emitter.getEventListeners("my_event_2"); // 获取 "my_event" 事件的侦听器
  emitter.setMaxListeners(3); // 设置 EventEmitter 实例的最大侦听器数量
  int max_listeners = emitter.getMaxListeners(); // 获取 EventEmitter 实例的最大侦听器数量
  print(emitter._events);
  emitter.removeAllListeners("my_event_1"); // 移除所有事件的监听器
  emitter.removeAllListeners("my_event_2"); // 移除所有事件的监听器
  print(emitter._events);
  print('-------------- once、listenerCount、getEventListeners --------------');
  List<Function>? eventListeners =
      emitter.listeners("my_event_3"); // 获取 "my_event" 事件的侦听器
  emitter.prependListener("my_event_3", listener_3); // 在 "my_event" 事件前添加一个侦听器
  emitter.prependOnceListener(
      "my_event_3", listener_3); // 在 "my_event" 事件前添加一个一次性侦听器
  List<String> eventNames = emitter.eventNames(); // 获取所有具有侦听器的事件的名称
  print('event_names: ${eventNames}');
  print('emitter._events = ${emitter._events}');
}

输出结果如下:

-------------- addListener、emit、removeListener --------------
执行 listener_1
执行 listener_2
执行 listener_2
{my_event_1: [Closure: () => void]}
-------------- once、listenerCount、getEventListeners --------------
{my_event_1: [Closure: () => void], my_event_2: [Closure: (String, Function) => dynamic from Function '_onceListener@18138388':.]}
{}
-------------- once、listenerCount、getEventListeners --------------
event_names: [my_event_3]
emitter._events = {my_event_3: [Closure: (String, Function) => dynamic from Function '_onceListener@18138388':., Closure: () => void]}

powershell 语言版本

class EventEmitter {
    $listeners = @{}
    $maxListeners = 10
    # 添加 listener
    addListener([string]$event, $listener) {
        if (!$this.listeners.ContainsKey([string]$event)) {
            $this.listeners[$event] = @()
        }
        $this.listeners[$event] += $listener
    }
    # 移除指定事件的 listener
    removeListener([string]$event, $listener) {
        if ($this.listeners.ContainsKey([string]$event)) {
            $this.listeners[$event] = $this.listeners[$event] | Where-Object { $_ -ne $listener }
        }
    }
    # 移除所有的 Listener
    removeAllListeners([string]$event) {
        if ($this.listeners.ContainsKey([string]$event)) {
            $this.listeners.Remove([string]$event)
        }
    }
    # 设定最大 Listener 数量
    setMaxListeners([string]$event, $maxListeners) {
        $this.$maxListeners = $maxListeners
    }
    # 获取最大 Listener 数量
    getMaxListeners([string]$event) {
        return $this.$maxListeners
    }
    # 获取指定事件的监听器数组
    listeners([string]$event) {
        if ($this.listeners.ContainsKey([string]$event)) {
            return $this.listeners[$event]
        }
    }
    # 触发指定的事件
    emit([string]$event, $args) {
        if ($this.listeners.ContainsKey([string]$event)) {
            foreach ($listener in $this.listeners[$event]) {
                & $listener $args
            }
        }
    }
    # 返回侦听名为 event的事件的 listeners 数量。
    listenerCount([string]$event) {
        if ($this.listeners.ContainsKey([string]$event)) {
            return $this.listeners[$event].Count
        }
    }
    # 将 listener 函数从前端添加到名为 event 的事件的listeners数组。不检查是否已经添加了 listener。
    prependListener([string]$event, $listener) {
        # If the event doesn't exist in the dictionary, add it with an empty array as its value
        if (!$this.listeners.ContainsKey([string]$event)) {
            $this.listeners[$event] = @()
        }
        # Prepend the listener to the event's array
        $this.listeners[$event] = $listener + $this.listeners[$event]
    }
    # 将名为 event 的事件的一次性 的 listener 函数添加到 listeners 数组的前方。
    prependOnceListener([string]$event, $listener) {
        $onceListener = {
            & $listener $args
            $this.removeListener([string]$event, $onceListener)
        }
        $this.addListener([string]$event, $onceListener)
    }
    # 为名为 event 的事件添加 一次性 的 listener 函数。
    once([string]$event, $listener) {
        $onceListener = {
            & $listener $args
            $this.removeListener([string]$event, $onceListener)
        }
        $this.addListener([string]$event, $onceListener)
    }
}
目录
相关文章
|
4月前
|
消息中间件 存储 Cloud Native
揭秘发布订阅模式:让消息传递更高效
揭秘发布订阅模式:让消息传递更高效
揭秘发布订阅模式:让消息传递更高效
|
2月前
|
消息中间件 负载均衡 监控
【ZMQ PUB模式指南】深入探究ZeroMQ的PUB-SUB模式:C++编程实践、底层原理与最佳实践
【ZMQ PUB模式指南】深入探究ZeroMQ的PUB-SUB模式:C++编程实践、底层原理与最佳实践
194 1
|
23天前
|
消息中间件 Java BI
RabbitMQ的四种消息传递模式与演示代码
RabbitMQ的四种消息传递模式与演示代码
34 0
|
2月前
|
消息中间件 存储 监控
【ZeroMQ的SUB视角】深入探讨订阅者模式、C++编程实践与底层机制
【ZeroMQ的SUB视角】深入探讨订阅者模式、C++编程实践与底层机制
123 1
|
6月前
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
36 0
|
7月前
|
设计模式 Go
Go语言事件系统设计解析:发布-订阅模式实战
Go语言事件系统设计解析:发布-订阅模式实战
175 0
|
12月前
|
设计模式 资源调度 Dart
发布订阅模式原理及其应用
本文介绍发布订阅模式原理及其应用
127 0
|
12月前
|
消息中间件 存储 缓存
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
|
设计模式 前端开发 调度
简化理解:发布订阅
简化理解:发布订阅
|
前端开发
前端学习案例2-发布订阅者模式2
前端学习案例2-发布订阅者模式2
55 0
前端学习案例2-发布订阅者模式2