阅读node源码后,如何写出让面试官满意的发布订阅模式?

简介: 阅读node源码后,如何写出让面试官满意的发布订阅模式?

前言


什么是发布订阅模式呐? 基于一个事件(主题)通道,希望接收通知的对象 Subscriber 通过自定义事件订阅主题,被激活事件的对象 Publisher 通过发布主题事件的方式通知各个订阅该主题的 Subscriber 对象。


举个通俗的栗子——追剧。某平台上线了一部小包特别喜欢的电视剧,每时每刻都想看到最新进度,但打工人小包还是非常繁忙的,总不能每时每刻刷新平台吧。平台发现了这个问题,提供了订阅功能,小包选择订阅该电视剧,更新后,平台便会第一时间发送消息通知小包。小包便可以愉快的追剧了。


上述案例中,电视剧就是发布者 Publisher ,小包就是订阅者 Subscriber ,平台则承担了事件通道(Event Channel) 中介作用。


前几个月,小包写了一篇 观察者模式 vs 发布订阅模式,千万不要再混淆了 ,通过武侠的角度讲解了观察者模式与发布订阅模式的区别,衍化的方式有可能增加了某些方面的理解成本,文章也引起了部分争议,小包感觉当初的发布订阅模式代码实现也并不完美。


恰巧小包最近在学习 nodejsnodejs 提供了 event.EventEmitter 模块,该模块的核心就是事件触发与事件监听器功能的封装。基于 EventEmitter 模块可以比较便捷的实现发布订阅模式,因此小包决定吸收 EventEmitter 的源码精髓,完善发布订阅模式。


学习本文,你能收获:


  • 🌟 掌握发布订阅模式
  • 🌟 了解 NodeEventEmitter 的实现及使用
  • 🌟 掌握手写发布订阅模式


EventEmitter


首先小包就带大家阅读一下 EventEmitter 的源码,源码内容非常多,小包本文只讲解有关于发布订阅部分的代码。


init 方法


发布订阅模式中有三大对象,事件(主题)通道负责维护某一事件下的处理函数队列。因此我们首先需要维护一个事件通道,将其定义在构造函数中。


// 事件通道的存储格式
const EventChannel = {
  event1: [func1, func2],
  event2: [func3, func4],
};
复制代码


EventEmitter 使用 EventEmitter.init 方法初始化事件通道属性,可以发现 init 方法中并没有直接将 _events 初始化为 {},而是初始化为 ObjectCreate(null) —— Object.create


那为什么会这样实现呐?Object.create(null) 创建的空对象没有原型方法,是纯粹的对象,可以避免原型的污染。而对象字面量 {} 创造的空对象和 new Object() 方式是相同的,可以继承 Object 对象的属性。


function EventEmitter(opts) {
  EventEmitter.init.call(this, opts);
}
EventEmitter.init = function (opts) {
  if (
    this._events === undefined ||
    this._events === ObjectGetPrototypeOf(this)._events
  ) {
    this._events = ObjectCreate(null);
    this._eventsCount = 0;
  }
};
复制代码


addListener/on


addListener/on 方法是为指定事件注册一个监听器,接受一个字符串 event 和一个回调函数。


很有意思的是,EventEmitter 提供了两对实现订阅和取消订阅的方法:


addListener/onremoveListener/off 。在学习该模块时,小包还特地纠结了一下,但是阅读到源码时,一切豁然开朗:这两对方法本质都是相同的。


EventEmitter.prototype.on = EventEmitter.prototype.addListener;
EventEmitter.prototype.off = EventEmitter.prototype.removeListener;
复制代码


on 方法在内部是基于 _addListener 方法,因此小包主要解读 _addListener 方法,下面先来铺垫一下源码中出现的 newListenerprepend 属性。


知识 1:newListener 事件


newListenernodejs 中人为规定的事件,该事件在添加新监视器时被触发。使用方式与普通绑定监视器相同,只不过监视名强制设定为 newListener


var events = require("events");
var eventEmitter = new events.EventEmitter();
eventEmitter.on("newListener", () => {
  console.log("绑定了新事件");
});
eventEmitter.on("click", () => {
  console.log("click");
});
// 输出结果:绑定了新事件
复制代码


知识 2:prepend 属性


prepend 中文含义是预置或者前置,该属性控制的是同一事件不同处理函数的顺序问题。我们来举个栗子:(该属性并没有暴露给外部使用)


// prepend 为 false
event.on("click", fn1);
event.on("click", fn2);
event.on("click", fn3);
// 那么此时事件通道中 click 事件的三个处理函数应该是自上往下的
{
  click: [fn1, fn2, fn3];
}
复制代码


// prepend 为 true
// 这里只是为了举栗子
event.on("click", fn1, true);
event.on("click", fn2, true);
event.on("click", fn3, true);
// 那么此时事件通道中 click 事件的三个处理函数应该是自上往下的
{
  click: [fn3, fn2, fn1];
}
复制代码


下面来解读源码:


Step1: 获取事件通道及待注册事件的监听器


events = target._events;
// 判断事件通道是否存在
if (events === undefined) {
  events = target._events = ObjectCreate(null);
} else {
  // 如果已经注册了 newListener 事件,后续注册事件前都会触发 newListener 事件
  if (events.newListener !== undefined) {
    target.emit(
      "newListener",
      type,
      // 这里等到 once 部分做详解
      listener.listener ? listener.listener : listener
    );
    events = target._events;
  }
  // 获取该事件的监听器
  existing = events[type];
}
复制代码


Step2: 给该事件添加新的监听器


// 此前未有该事件的订阅出现
if (existing === undefined) {
  // 源码认为如果只有一个处理函数,没有必要声明数组
  events[type] = listener;
} else {
  if (typeof existing === "function") {
    // 将新处理函数压入到数组中
    // prepend 决定压入顺序
    existing = events[type] = prepend
      ? [listener, existing]
      : [existing, listener];
  } else if (prepend) {
    existing.unshift(listener);
  } else {
    existing.push(listener);
  }
}
复制代码


removeListener/off


removeListener/off 是移除指定事件的某个监听器,监听器必须是该事件已经注册过的监听器。


newListener 事件相对应,nodejs 也设置了 removeListener 事件,当删除监听器时触发该事件。


移除监听器的代码比较简单,我们直接在源码上进行注释讲解。


EventEmitter.prototype.removeListener = function removeListener(
  type,
  listener
) {
  const events = this._events;
  // 没有事件通道
  if (events === undefined) return this;
  const list = events[type];
  // 该事件未注册处理函数
  if (list === undefined) return this;
  // 当前事件只有一个监听器
  // 这里处理了两种情况,on 注册监听器的删除及 once 注册监听器的删除,once 处会详细讲到
  if (list === listener || list.listener === listener) {
    delete events[type];
    // 触发 removeListener 事件
    if (events.removeListener)
      this.emit("removeListener", type, list.listener || listener);
    // 从数组中删除监听器
  } else if (typeof list !== "function") {
    for (let i = list.length - 1; i >= 0; i--) {
      if (list[i] === listener || list[i].listener === listener) {
        position = i;
        break;
      }
    }
    if (position < 0) return this;
    if (position === 0) list.shift();
    else {
      if (spliceOne === undefined)
        spliceOne = require("internal/util").spliceOne;
      spliceOne(list, position);
    }
    // 如果只有一个监听器,无需使用数组存储
    if (list.length === 1) events[type] = list[0];
    if (events.removeListener !== undefined)
      this.emit("removeListener", type, listener);
  }
  return this;
};
复制代码


once


once 为指定事件注册一个单次监听器,即监听器最多只会触发一次,触发后立刻解除该监听器。


once 处有个坑,我们需要注意,once 执行一次后会解除监听器,但我们同样可以在 once 的事件执行前解除此监听器,因此 once 处我们要处理两种情况。


Case1:监听器执行完毕后解除


once 方法与 on 方法的区别在于 once 只执行监听器一次然后移除,因此我们设计 once 时候可以借用 on 方法,传入一个包含监听器方法及移除该监听器的包裹函数 wrapFn


eventEmitter.on(event, (...args) => {
  listener(...args);
  eventEmitter.off(event, listener);
});
复制代码


Case2:调用 removeListener/off 方法解除监听器


如果直接调用 removeListener/off 移除监听器,则与 on 方法添加的监听器移除是类似的,但 Case1 中,我们监听的是当前监听器与移除监听器的包裹函数 wrapFn ,调用 removeListener/off 移除方法时,我们传入的是 listener 监听器方法,所以无法删除成功。


因此我们为了适应这种情况,给包裹函数 wrapFn 身上挂载一个标识,标识值是监听器(wrapFn.listener = listener)。因此我们在调用移除方法时,同时判断 listenerlistener.listener 即可。


once 方法源码:


EventEmitter.prototype.once = function once(type, listener) {
  checkListener(listener);
  // 调用了 _onceWrap 方法,这里实现了上面的包裹功能
  this.on(type, _onceWrap(this, type, listener));
  return this;
};
复制代码


function _onceWrap(target, type, listener) {
  const state = { fired: false, wrapFn: undefined, target, type, listener };
  const wrapped = onceWrapper.bind(state);
  // Case2: 调用 off 方法移除监听器,在包裹函数上挂载listener
  wrapped.listener = listener;
  state.wrapFn = wrapped;
  return wrapped;
}
复制代码


function onceWrapper() {
  if (!this.fired) {
    // Case1 监听器执行与监听器移除
    this.target.removeListener(this.type, this.wrapFn);
    this.fired = true;
    if (arguments.length === 0) return this.listener.call(this.target);
    return this.listener.apply(this.target, arguments);
  }
}
复制代码


emit 方法


emit 方法按监听器的顺序执行执行每个监听器,如果事件有注册监听返回 true,否则返回 false


emit 方法实现起来比较简单,获取对应事件的监听器,传入参数执行即可。


EventEmitter.prototype.emit = function emit(type, ...args) {
  const events = this._events;
  if (events !== undefined) {
  // 获取监听器,监听器有三种情况: 1.没有(返回false) 2.只有一个(函数形式) 3.多个(数组形式)
  const handler = events[type];
  // Case1 没有值
  if (handler === undefined) return false;
  // Case2 函数形式
  if (typeof handler === "function") {
    const result = ReflectApply(handler, this, args);
  } else { // Case3 数组形式
    const len = handler.length;
    const listeners = arrayClone(handler);
    for (let i = 0; i < len; ++i) {
      const result = ReflectApply(listeners[i], this, args);
    }
  }
  return true;
};
复制代码


源码收获


上面小包带领大家阅读了 Nodejs EventEmitter 模块的部分源码,我们能从中学到那些东西来完善我们的发布订阅模式呐?


  1. 初始值使用 Object.create(null) 可以避免原型污染
  2. 事件只存在一个监听器时,无需使用数组
  3. once 方法的两种情况处理
  4. off 方法边界情况的处理及两种删除情况的处理


发布订阅实现


有了阅读源码的基础,我们就可以来实现完善的发布订阅模式。


EventEmitter 构造函数


function EventEmitter() {
  this._events = Object.create(null);
}
复制代码


on 方法


EventEmitter.prototype.on = function (type, listener) {
  // 获取事件通道
  let events = this._events;
  if (events === undefined) {
    events = this._events = Object.create(null);
  }
  // 判断是否监听了 newListener 事件,如果监听则执行 newListener 的回调函数
  if (type !== "newListener") {
    if (events.newListener) {
      ethis.emit("newListener", type);
    }
  }
  // 对于单个监听器是否使用数组小包认为影响不大,因此小包继续使用数组
  if (!events[type]) {
    events[type] = [listener];
  } else {
    events[type].push(listener);
  }
};
复制代码


off 方法


off 方法我们要处理好 on 注册监听器的移除及 once 注册监听器的移除,同时做好边界情况处理。


EventEmitter.prototype.off = function (type, listener) {
  const events = this._events;
  // 边界情况
  if (events === undefined) {
    return this;
  }
  const listenerList = events[type];
  if (listenerList === undefined) {
    return this;
  }
  // 处理两种情况
  events[type] = events[type].filter((fn) => {
    return fn !== listener && fn.listener !== listener;
  });
};
复制代码


once 方法


源码处我们讲过,once 要处理两种情况。


EventEmitter.prototype.once = function (type, listener) {
  // 监听器执行后移除
  const onceApply = (...args) => {
    listener.call(this, ...args);
    this.off(type, listener);
  };
  // 绑定标识,标识为 listener
  onceApply.listener = listener;
  // 注册监听器
  this.on(type, onceApply);
};
复制代码


emit 方法


EventEmitter.prototype.emit = function (type, ...args) {
  const events = this._events[type];
  // 边界情况处理
  if (events === undefined) {
    return false;
  }
  const handler = events[type];
  if (handler === undefined) {
    return false;
  }
  // 执行 emit 事件对应的监听器
  handler.forEach((fn) => {
    fn.call(this, ...args);
  });
  return true;
};




相关文章
|
1天前
|
JavaScript 前端开发 算法
【面试题】 用vue想要拿20k,面试题要这样回答(源码版)
【面试题】 用vue想要拿20k,面试题要这样回答(源码版)
|
1天前
|
JavaScript 前端开发 算法
用vue想要拿20k,面试题要这样回答(源码版)
用vue想要拿20k,面试题要这样回答(源码版)
|
1天前
|
存储 安全 Java
面试题:用过ThreadLocal吗?ThreadLocal是在哪个包下的?看过ThreadLocal源码吗?讲一下ThreadLocal的get和put是怎么实现的?
字节面试题:用过ThreadLocal吗?ThreadLocal是在哪个包下的?看过ThreadLocal源码吗?讲一下ThreadLocal的get和put是怎么实现的?
37 0
|
1天前
|
存储 Java 中间件
《吊打面试官系列》从源码全面解析 ThreadLocal 关键字的来龙去脉
《吊打面试官系列》从源码全面解析 ThreadLocal 关键字的来龙去脉
|
1天前
|
监控 Java 应用服务中间件
Spring Boot 源码面试知识点
【5月更文挑战第12天】Spring Boot 是一个强大且广泛使用的框架,旨在简化 Spring 应用程序的开发过程。深入了解 Spring Boot 的源码,有助于开发者更好地使用和定制这个框架。以下是一些关键的知识点:
20 6
|
1天前
|
存储 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(下)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
10 1
|
1天前
|
监控 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(上)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
25 0
|
1天前
|
存储 NoSQL 调度
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(下)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
8 0
|
1天前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
15 0
|
1天前
|
存储 NoSQL API
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(上)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
15 1