异步编程
函数式编程
- 高阶函数
// 把函数作为参数,或是将函数作为返回值的函数
function foo(x) {
return function () {
return x;
};
}
- 偏函数用法
// 创建一个调用另外一个部分——参数或变量已经预置的函数——的函数的用法
var toString = Object.prototype.toString;\
var isString = function (obj) {
return toString.call(obj) === '[object String]';
}
var isFunction = function (obj) {
return toString.call(obj) === '[object Function]';
}
// 通过指定部分参数来产生一个新的定制函数的形式就是偏函数
var isType = function (type) {
return function (obj) {
return toString.call(obj) === '[object ' + type + ']';
};
};
var isString = isType('String');
var isFunction = isType('Function');
异步编程的优势与难点
优势
- 基于事件驱动的非阻塞 I/O 模型可以使 CPU 与 I/O 并不相互依赖等待,让资源得到更好的利用
- 对于网络应用而言,并行带来想象空间更大,延展而开是分布式和云,并行使得各个单点之间能够更有效地组织起来
难点
- 异常处理
异步 I/O 的实现主要包含两个阶段:提交请求和处理结果。两个阶段中间有事件循环的调度,两者彼此不关联。异步方法通常在第一个阶段提交请求后立即返回,因为异常不一定在这个阶段发生,所以 try/catch 的功效在此不会发挥任何作用。
// Node 在处理异常上形成了一种约定,将异常作为回调函数的第一个实参传回,如果为空值,则表明异步调用没有异常 // async(function (err, results) { // // TODO //}); var async = function (callback) { process.nextTick(function () { var results = something; if (error) { return callback(error); } callback(null, results); }); };
- 函数嵌套 (回调地狱)
- 阻塞代码
var start = new Date(); while (new Date() - start < 1000) { // TODO } // 需要阻塞的代码 // 这段代码会持续占用 CPU 进行判断,与真正的线程睡眠相去甚远,破坏了事件循环的调度 // 由于 Node 单线程的原因,CPU 资源全都会用于为这段代码服务,导致其余任何请求都会得不到响应 // 遇到这样的需求时,在统一规划业务逻辑之后,调用 setTimeout() 效果会更好
- 多线程编程
child_process
- 异步转同步
异步编程解决方案
事件发布/订阅模式
事件监听器模式是一种广泛用于异步编程的模式,是回调函数的事件化,又称发布/订阅模式
// 订阅 emitter.on('event1', function(message) { console.log(message); }); // 发布 emitter.emit('event1', 'Hello World');
Node 对事件发布/订阅做了一些额外处理
- 如果对一个事件添加了超过 10 个侦听器,将会得到一条警告,防止内存泄露和过多占用 CPU
- 如果运行期间的错误触发了 error 事件,EventEmitter 会检查是否有对 error 事件添加过侦听器,如果添加就交给侦听器处理,否则该错误作为异常抛出,若没有捕获异常会引起线程退出
- 继承 events 模块
var events = require('events'); function Stream() { events.EventEmitter.call(this); } util.inherits(Stream, events.EventEmitter);
- 利用事件队列解决雪崩问题
var proxy = new events.EventEmitter(); var status = 'ready'; var select = function (callback) { proxy.once('selected', callback); if (status === 'ready') { status = 'pending'; db.select('SQL', function(results) { proxy.emit('selected', results); status = 'ready'; }); } };
- 多异步之间的协作方案
// 案例 var count = 0; var results = {}; var done = function (key, value) { results[key] = value; count++; if (count === 3) { // 渲染页面 render(results); } }; fs.readFile(template_path, 'utf-8', function(err, template) { done('template', template); }); db.query(sql, function(err, data) { done('data', data); }); l10n.get(function(err, resources) { done('resources', resources); }); // 利用偏函数来处理哨兵变量和第三方函数的关系 多对一 var after = function (times, callback) { var count = 0, results = {}; return function (key, value) { results[key] = value; count++; if (count === times) { callback(results); } }; }; var done = after(times, render); // 利用发布/订阅完成多对多 var emitter = new events.Emitter(); var done = after(times, render); emitter.on('done', done); emitter.on('done', other); fs.readFile(template_path, 'utf-8', function(err, template) { emitter.emit('done', 'template', template); }); db.query(sql, function(err, data) { emitter.emit('done', 'data', data); }); l10n.get(function(err, resources) { emitter.emit('done', 'resources', resources); });
Promise / Deferred 模式
Promises/A
- Promise 操作只会处在 3 种状态中的一种:未完成态、完成态和失败态
- Promise 状态只会出现从未完成到完成或失败转化,不能逆反,完成态和失败态不能互相转化
- Promise 的状态一旦转化,将不能被更改
then() 方法
- 接受完成态、错误态的回调方法,在操作完成或出错时,会调用对应方法
- 可选地支持 progress 事件回调作为第三个方法
- then() 方法只接受 function 对象,其余对象将被忽略
- then() 方法继续返回 Promise 对象,以实现链式调用
// then(fulfilledHandler, errorHandler, progressHandler) var Promise = function () { EventEmitter.call(this); }; util.inherits(Promise, EventEmitter); Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) { if (typeof fulfilledHandler === 'function') { // 利用 once() 方法,保证成功回调只执行一次 this.once('success', fulfilledHandler); } if (typeof errorHandler === 'function') { this.once('error', errorHandler); } if (typeof progressHandler === 'function') { this.on('progress', progressHandler); } return this; }; // 触发执行这些回调函数的对象 var Deferred = function () { this.state = 'unfulfilled'; this.promise = new Promise(); }; Deferred.prototype.resolve = function (obj) { this.state = 'fulfilled'; this.promise.emit('success', obj); }; Deferred.prototype.reject = function (err) { this.state = 'failed'; this.promise.emit('error', err); } Deferred.prototype.process = function (data) { this.promise.emit('process', data); } // 改造 var promisify = function (result) { var deferred = new Deferred(); var result = ''; res.on('data', function (chunk) { result += chunk; deferred.progress(chunk); }); res.on('end', function () { promise.resolve(result); }); res.on('error', function (err) { promise.reject(err); }); return deferred.promise; }; promisify(res).then(function () { // done }, function (err) { // error }, function (chunk) { // progress });
- Promise 中的多异步协作
Deferred.prototype.all = function (promises) { var count = promises.length; var that = this; var results = []; promises.forEach(function (promise, i) { promise.then(function (data) { count--; results[i] = data; if (count == 0) { that.resolve(results); } }, function (err) { that.reject(err); }); }); return this.promise; };
- 支持序列执行的 Promise
var Deferred = function () { this.promise = new Promise(); }; // 完成态 Deferred.prototype.resolve = function (obj) { var promise = this.promise; var handler; while ((handler = promise.queue.shift())) { if (handler && handler.fulfilled) { var ret = handler.fulfilled(obj); if (ret && ret.isPromise) { ret.queue = promise.queue; this.promise = ret; return; } } } } // 失败态 Deferred.prototype.reject = function (err) { var promise = this.promise; var handler; while ((handler = promise.queue.shift())) { if (handler && handler.error) { var ret = handler.error(err); if (ret && ret.isPromise) { ret.queue = promise.queue; this.promise = ret; return; } } } }; // 生成回调函数 Deferred.prototype.callback = function () { var that = this; return function(err, file) { if (err) { return that.reject(err); } that.resolve(file); }; }; var Promise = function () { this.queue = []; // 队列用于存储待执行的回调函数 this.isPromise = true; }; Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) { var handler = {}; if (typeof fulfilledHandler === 'function') { handler.fulfilled = fulfilledHandler; } if (typeof errorHandler === 'function') { handler.error = errorHandler; } this.queue.push(handler); return this; };