ZEROMAKE | keep codeing and thinking!
2017-07-010 | source

promise-and-co-make

前言

  1. 上篇博客写着写着没动力,然后就拖了一个月。
  2. 现在打算在一周内完成。
  3. 这篇讲Promiseco的原理+实现。

一、Promise 的原理

Promise 的规范有很多,其中ECMAScript 6采用的是Promises/A+.
想要了解更多最好仔细读完Promises/A+,顺便说下Promise是依赖于异步实现。

JavaScript 中的异步队列

而在JavaScript中有两种异步宏任务macro-task和微任务micro-task.

在挂起任务时,JS 引擎会将所有任务按照类别分到这两个队列中,首先在 macrotask 的队列(这个队列也被叫做 task queue)中取出第一个任务,执行完毕后取出 microtask 队列中的所有任务顺序执行;之后再取 macrotask 任务,周而复始,直至两个队列的任务都取完。

常见的异步代码实现

  • macro-task: script(整体代码), setTimeout, setInterval, setImmediate, I/O, UI rendering
  • micro-task: process.nextTick, Promises(原生 Promise), Object.observe(api 已废弃), MutationObserver

以上的知识摘查于Promises/A+
顺便说下一个前段时间看到的一个js面试题

1
setTimeout(function() {
2
console.log(1);
3
}, 0);
4
new Promise(function executor(resolve) {
5
console.log(2);
6
for (var i = 0; i < 10000; i++) {
7
i == 9999 && resolve();
8
}
9
console.log(3);
10
}).then(function() {
11
console.log(4);
12
});
13
console.log(5);

在 node v8.13 使用原生的Promise是: “2 3 5 4 1”。
但是如果使用bluebird的第三方Promise就是: “2 3 5 1 4”。
这个原因是因为bluebird在这个环境下优先使用setImmediate代码
然后再看上面的代码执行顺序.

  1. 第一次整体代码进入macro-taskmicro-task为空。
  2. macro-task执行整体代码,setTimeout加入下一次的macro-taskPromise执行打出2 3then加入micro-task, 最后打出5
  3. micro-task执行then被执行所以打出4
  4. 重新执行macro-task所以打出1

但是在bluebird里的then使用setImmediate所以上面的步骤会变成:

  • 步骤 2thensetTimeout后加入macro-task
  • 步骤 3 会因为micro-task为空跳过。
  • 步骤 4 执行setTimeoutthen打出1 4

Promise 执行流程

  1. new Promise(func:(resolve, reject)=> void 0), 这里的 func 方法被同步执行。
  2. Promise 会有三种状态PENDING(执行)FULFILLED(执行成功),REJECTED(执行失败)
  3. resolvereject均未调用且未发生异常时状态为PENDING
  4. resolve调用为FULFILLED,reject调用或者发生异常为REJECTED
  5. 在给Promise实例调用then(callFulfilled, callRejected)来设置回调,状态不为PENDING时会根据状态调用callFulfilledcallRejected
  6. then需要返回一个新的Promise实例.
  7. 状态为PENDING则会把callFulfilledcallRejected放入当前Promise实例的回调队列中,队列还会存储新的Promise实例。
  8. 在状态改变为FULFILLEDREJECTED时会回调当前Promise实例的队列。

二、Promise 的简易实现

  • Promise Api

下面是Promise的所有开放 api 这里为了区分与原生的所以类名叫Appoint
顺便为了学习typescript

1
class Appoint {
2
public constructor(resolver: Function) {}
3
public then(onFulfilled, onRejected): Appoint {}
4
public catch(onRejected): Appoint {}
5
public static resolve(value): Appoint {}
6
public static reject(error): Appoint {}
7
public static all(iterable: Appoint[]): Appoint {}
8
public static race(iterable): Appoint {}
9
}

Appoint

1
function INTERNAL() {}
2
enum AppointState {
3
PENDING,
4
FULFILLED,
5
REJECTED
6
}
7
class Appoint {
8
public handled: boolean;
9
public value: any;
10
public queue: QueueItem[];
11
private state: AppointState;
12
public constructor(resolver: Function) {
13
if (!isFunction(resolver)) {
14
throw new TypeError("resolver must be a function");
15
}
16
// 设置当前实例状态
17
this.state = AppointState.PENDING;
18
this.value = void 0;
19
// 初始化回调队列
20
this.queue = [];
21
// true代表没有设置then
22
this.handled = true;
23
if (resolver !== INTERNAL) {
24
// 安全执行传入的函数
25
safelyResolveThen(this, resolver);
26
}
27
}
28
// state 的getset
29
public setState(state: AppointState) {
30
if (this.state === AppointState.PENDING && this.state !== state) {
31
this.state = state;
32
}
33
}
34
public getState(): AppointState {
35
return this.state;
36
}
37
}

safelyResolveThen

1
function safelyResolveThen(self: Appoint, then: (arg: any) => any) {
2
let called: boolean = false;
3
try {
4
then(
5
function resolvePromise(value: any) {
6
if (called) {
7
return;
8
}
9
// 保证doResolve,doReject只执行一次
10
called = true;
11
// 改变当前状态以及调用回调队列
12
doResolve(self, value);
13
},
14
function rejectPromise(error: Error) {
15
if (called) {
16
return;
17
}
18
// 同上
19
called = true;
20
doReject(self, error);
21
}
22
);
23
} catch (error) {
24
// 特别捕捉错误
25
if (called) {
26
return;
27
}
28
called = true;
29
doReject(self, error);
30
}
31
}

doResolve, doReject

1
/**
2
* 如果value不是一个Promise,对Promise调用回调队列。
3
* 如果是就等待这个Promise回调
4
*/
5
function doResolve(self: Appoint, value: any) {
6
try {
7
// 判断是否为Promise
8
const then = getThen(value);
9
if (then) {
10
//
11
safelyResolveThen(self, then);
12
} else {
13
// 改变状态
14
self.setState(AppointState.FULFILLED);
15
self.value = value;
16
// 调用回调队列
17
self.queue.forEach((queueItem) => {
18
queueItem.callFulfilled(value);
19
});
20
}
21
return self;
22
} catch (error) {
23
return doReject(self, error);
24
}
25
}
26
/**
27
* 调用回调队列
28
*/
29
function doReject(self: Appoint, error: Error) {
30
// 改变状态
31
self.setState(AppointState.REJECTED);
32
self.value = error;
33
if (self.handled) {
34
// 未设置then回调
35
asap(() => {
36
// 创建一个异步任务保证代码都执行了再判断
37
if (self.handled) {
38
if (typeof process !== "undefined") {
39
// node 环境下触发unhandledRejection事件
40
process.emit("unhandledRejection", error, self);
41
} else {
42
// 浏览器环境直接打印即可
43
console.error(error);
44
}
45
}
46
});
47
}
48
self.queue.forEach((queueItem) => {
49
queueItem.callRejected(error);
50
});
51
return self;
52
}
53
/**
54
* 判断是否为Object且有then属性的方法,
55
* 有返回这个方法的绑定this
56
* 这种判断方式会发生如果
57
* resolve({ then: () => {} })的话就会丢失下次的then
58
* 原生的Promise也是相同
59
*/
60
function getThen(obj: any): Function {
61
const then = obj && obj.then;
62
if (obj && (isObject(obj) || isFunction(obj)) && isFunction(then){
63
return then.bind(obj);
64
}
65
return null;
66
}

Appoint().then, Appoint().catch

1
/**
2
* 使用micro-task的异步方案来执行方法
3
*/
4
function asap(callback) {
5
if (typeof process !== "undefined") {
6
process.nextTick(callback);
7
} else {
8
const BrowserMutationObserver = window.MutationObserver || window.WebKitMutationObserver
9
let iterations = 0;
10
const observer = new BrowserMutationObserver(callback);
11
const node: any = document.createTextNode("");
12
observer.observe(node, { characterData: true });
13
node.data = (iterations = ++iterations % 2);
14
}
15
}
16
/**
17
* 异步执行then,catch
18
*/
19
function unwrap(promise: Appoint, func: Function, value: any): void {
20
asap(() => {
21
let returnValue;
22
try {
23
// 执行then,catch回调获得返回值
24
returnValue = func(value);
25
} catch (error) {
26
// 发生异常直接触发该promise的Reject
27
return doReject(promise, error);
28
}
29
if (returnValue === promise) {
30
// 执行then,catch回调返回值不能为promise自己
31
doReject(promise, new TypeError("Cannot resolve promise with itself"));
32
} else {
33
// then,catch回调成功,直接触发该promise的Resolve
34
doResolve(promise, returnValue);
35
}
36
});
37
}
38
39
public then<U>(
40
onFulfilled?: (value?: any) => U,
41
onRejected?: (error?: any) => U,
42
): Appoint {
43
// 直接无视来做到值穿透
44
if (!isFunction(onFulfilled) &&
45
this.state === AppointState.FULFILLED ||
46
!isFunction(onRejected) &&
47
this.state === AppointState.REJECTED
48
) {
49
return this;
50
}
51
// 新建一个空的
52
const promise = new Appoint(INTERNAL);
53
// 当前实例已经被设置then
54
if (this.handled) {
55
this.handled = false;
56
}
57
if (this.getState() !== AppointState.PENDING) {
58
// 当前实例已经结束运行直接根据状态获取要回调的方法
59
const resolver = this.getState() === AppointState.FULFILLED ? onFulfilled : onRejected;
60
// 异步执行resolver,如果成功会触发新实例的then,catch
61
unwrap(promise, resolver, this.value);
62
} else {
63
// 如果Promise的任务还在继续就直接把生成一个QueueItem
64
// 并设置好新的Promise实例
65
this.queue.push(new QueueItem(promise, onFulfilled, onRejected));
66
}
67
// 返回新生成的
68
return promise;
69
}
70
public catch<U>(onRejected: (error?: any) => U): Appoint {
71
return this.then(null, onRejected);
72
}

QueueItem

1
export class QueueItem {
2
// 每次then|catch生成的新实例
3
public promise: Appoint;
4
// then回调
5
public callFulfilled: Function;
6
// catch回调
7
public callRejected: Function;
8
constructor(promise: Appoint, onFulfilled?: Function, onRejected?: Function {
9
this.promise = promise;
10
if (isFunction(onFulfilled)) {
11
this.callFulfilled = function callFulfilled(value: any) {
12
// 异步执行callFulfilled,后触发新实例的then,catch
13
unwrap(this.promise, onFulfilled, value);
14
};
15
} else {
16
this.callFulfilled = function callFulfilled(value: any) {
17
// 没有设置callFulfilled的话直接触发新实例的callFulfilled
18
/*
19
例如下面这种代码一次catch但是没有then而下面代码中的then是catch返回的新实例
20
所以需要直接
21
new Promise(() => {
22
})
23
.catch(() => {})
24
.then()
25
*/
26
doResolve(this.promise, value);
27
};
28
}
29
if (isFunction(onRejected)) {
30
this.callRejected = function callRejected(error: Error) {
31
// 异步执行callRejected,后会触发新实例的then,catch
32
unwrap(this.promise, onRejected, error);
33
};
34
} else {
35
this.callRejected = function callRejected(error: Error) {
36
// 没有设置callRejected的话直接触发新实例的callRejected
37
doReject(this.promise, error);
38
};
39
}
40
}
41
}

utils

1
export function isFunction(func: any): boolean {
2
return typeof func === "function";
3
}
4
export function isObject(obj: any): boolean {
5
return typeof obj === "object";
6
}
7
export function isArray(arr: any): boolean {
8
return Object.prototype.toString.call(arr) === "[object Array]";
9
}

Appoint.resolve, Appoint.reject

1
public static resolve(value: any): Appoint {
2
if (value instanceof Appoint) {
3
return value;
4
}
5
return doResolve(new Appoint(INTERNAL), value);
6
}
7
public static reject(error: any): Appoint {
8
if (error instanceof Appoint) {
9
return error;
10
}
11
return doReject(new Appoint(INTERNAL), error);
12
}

Appoint.all, Appoint.race

1
/**
2
* 传入一个Promise数组生成新的Promise所有Promise执行完后回调
3
*/
4
public static all(iterable: Appoint[]): Appoint {
5
const self = this;
6
if (!isArray(iterable)) {
7
return this.reject(new TypeError("must be an array"));
8
}
9
const len = iterable.length;
10
let called = false;
11
if (!len) {
12
return this.resolve([]);
13
}
14
const values = new Array(len);
15
let i: number = -1;
16
const promise = new Appoint(INTERNAL);
17
while (++i < len) {
18
allResolver(iterable[i], i);
19
}
20
return promise;
21
function allResolver(value: Appoint, index: number) {
22
self.resolve(value).then(resolveFromAll, (error: Error) => {
23
if (!called) {
24
called = true;
25
doReject(promise, error);
26
}
27
});
28
function resolveFromAll(outValue: any) {
29
values[index] = outValue;
30
if (index === len - 1 && !called) {
31
called = true;
32
doResolve(promise, values);
33
}
34
}
35
}
36
}
37
/**
38
* 与all类似但是,只要一个Promise回调的就回调
39
*/
40
public static race(iterable: Appoint[]): Appoint {
41
const self = this;
42
if (!isArray(iterable)) {
43
return this.reject(new TypeError("must be an array"));
44
}
45
const len = iterable.length;
46
let called = false;
47
if (!len) {
48
return this.resolve([]);
49
}
50
const values = new Array(len);
51
let i: number = -1;
52
const promise = new self(INTERNAL);
53
while (++i < len) {
54
resolver(iterable[i]);
55
}
56
return promise;
57
function resolver(value: Appoint) {
58
self.resolve(value).then((response: any) => {
59
if (!called) {
60
called = true;
61
doResolve(promise, response);
62
}
63
}, (error: Error) => {
64
if (!called) {
65
called = true;
66
doReject(promise, error);
67
}
68
});
69
}
70
}

三、co 原理

不使用 co 的话不停的 then,和 callback 明显会很难受。

1
function callback (null, name) {
2
console.log(name)
3
}
4
new Promise(function(resolve) {
5
resolve('<h1>test</h1>')
6
}).then(html => {
7
setTimeout(function(){
8
callback('test' + html)
9
}, 100)
10
})

改用 co 异步代码感觉和写同步代码一样。

1
const co = require("co");
2
co(function* test() {
3
const html = yield new Promise(function(resolve) {
4
resolve("<h1>test</h1>");
5
});
6
console.log("--------");
7
const name = yield function(callback) {
8
setTimeout(function() {
9
callback(null, "test" + html);
10
}, 100);
11
};
12
return name;
13
}).then(console.log);

这里不得不说下 Generator 了,直接看执行效果吧:

1
function* gen() {
2
const a = yield 1;
3
console.log("a: ", a);
4
const b = yield 2;
5
console.log("b: ", b);
6
return 3;
7
}
8
const test = gen();
9
test.next(); // Object { value: 1, done: false }
10
test.next(4); // a: 4\n Object { value: 2, done: false }
11
test.next(5); // b: 5\n Object { value: 3, done: true }

很明显除了第一次next的参数都会赋值到上一次的yield的左边变量。
最后一次的next返回的valuereturn的值,其它都是yield右边的变量。
co就是通过不停的next获取到支持的异步对象回调后把值放到下次的next中从而达到效果。

四、co 的实现

1
const slice = Array.prototype.slice;
2
const co: any = function co_(gen) {
3
const ctx = this;
4
const args = slice.call(arguments, 1);
5
return new Promise(function _(resolve, reject) {
6
// 把传入的方法执行一下并存下返回值
7
if (typeof gen === "function") {
8
gen = gen.apply(ctx, args);
9
}
10
// 1. 传入的是一个方法通过上面的执行获得的返回值,
11
// 如果不是一个有next方法的对象直接resolve出去
12
// 2. 传入的不是一个方法且不是一个next方法的对象直接resolve出去
13
if (!gen || typeof gen.next !== "function") {
14
return resolve(gen);
15
}
16
// 执行,第一次next不需要值
17
onFulfilled();
18
/**
19
* @param {Mixed} res
20
* @return {null}
21
*/
22
function onFulfilled(res?: any) {
23
let ret;
24
try {
25
// 获取next方法获得的对象,并把上一次的数据传递过去
26
ret = gen.next(res);
27
} catch (e) {
28
// generator 获取下一个yield值发生异常
29
return reject(e);
30
}
31
// 处理yield的值把它转换成promise并执行
32
next(ret);
33
return null;
34
}
35
/**
36
* @param {Error} err
37
* @return {undefined}
38
*/
39
function onRejected(err) {
40
let ret;
41
try {
42
// 把错误抛到generator里,并且接收下次的yield
43
ret = gen.throw(err);
44
} catch (e) {
45
// generator 获取下一个yield值发生异常
46
return reject(e);
47
}
48
// 处理yield的值
49
next(ret);
50
}
51
function next(ret) {
52
// generator执行完并把返回值resolve出去
53
if (ret.done) {
54
return resolve(ret.value);
55
}
56
// 把value转换成Promise
57
const value = toPromise(ctx, ret.value);
58
if (value && isPromise(value)) {
59
// 等待Promise执行
60
return value.then(onFulfilled, onRejected);
61
}
62
// yield的值不支持
63
return onRejected(
64
new TypeError(
65
"You may only yield a function, promise," +
66
" generator, array, or object, " +
67
'but the following object was passed: "' +
68
String(ret.value) +
69
'"'
70
)
71
);
72
}
73
});
74
};

toPromise

1
function toPromise(ctx: any, obj: any) {
2
if (!obj) {
3
return obj;
4
}
5
if (isPromise(obj)) {
6
return obj;
7
}
8
// 判断是 Generator 对象|方法 直接通过 co 转换为Promise
9
if (isGeneratorFunction(obj) || isGenerator(obj)) {
10
return co.call(ctx, obj);
11
}
12
// 判断是个回调方法
13
if ("function" === typeof obj) {
14
return thunkToPromise(ctx, obj);
15
}
16
// 判断是个数组
17
if (Array.isArray(obj)) {
18
return arrayToPromise(ctx, obj);
19
}
20
// 根据对象属性把所有属性转为一个Promise
21
if (isObject(obj)) {
22
return objectToPromise(ctx, obj);
23
}
24
// 基础数据类 1 , true
25
return obj;
26
}

转换方法这个懒得说了

1
function thunkToPromise(ctx, fn) {
2
return new Promise(function _p(resolve, reject) {
3
fn.call(ctx, function _(err, res) {
4
if (err) {
5
return reject(err);
6
}
7
if (arguments.length > 2) {
8
res = slice.call(arguments, 1);
9
}
10
resolve(res);
11
});
12
});
13
}
14
15
function arrayToPromise(ctx, obj: any[]) {
16
return Promise.all(obj.map(item => toPromise(ctx, item)));
17
}
18
19
function objectToPromise(ctx, obj) {
20
const results = {};
21
const keys = Object.keys(obj);
22
const promises = [];
23
for (let i = 0, len = keys.length; i < len; i++) {
24
const key = keys[i];
25
const val = obj[key];
26
const promise = toPromise(ctx, val);
27
if (promise && isPromise(promise)) {
28
promises.push(
29
promise.then(function _(res) {
30
results[key] = res;
31
})
32
);
33
} else {
34
results[key] = val;
35
}
36
}
37
return Promise.all(promises).then(function _() {
38
return results;
39
});
40
}

还有一些判断工具函数

1
function isPromise(obj: { then: Function) {
2
return "function" === typeof obj.then;
3
}
4
function isGenerator(obj) {
5
return "function" === typeof obj.next &&
6
"function" === typeof obj.throw;
7
}
8
function isGeneratorFunction(obj) {
9
const constructor = obj.constructor;
10
if (!constructor) { return false; }
11
if ("GeneratorFunction" === constructor.name ||
12
"GeneratorFunction" === constructor.displayName) {
13
return true;
14
}
15
return isGenerator(constructor.prototype);
16
}
17
function isObject(val) {
18
return Object === val.constructor;
19
}

五、资料

  1. 项目源代码
  2. 深入 Promise(一)——Promise 实现详解
  3. 英文版 Promise/A+
  4. 中文版 Promise/A+

六、后记

  1. 这次逼着自己写 3 天就写完了,果然就是懒。
  2. 接下来写一个系列文章preact的源码解析与实现。
  3. 尽量一周出一篇?看看情况吧。