异步请求的节流

section1

为什么要限制异步请求数量?

如果做过爬虫的小老弟可能知道,如果对服务器依次发出大量的 HTTP 请求,即使不考虑目标服务器是否会宕机,大量的请求也可能会使自己的服务器浪费大量资源甚至宕机,不利于对其他任务的处理,

同时也要预防对方封掉自己的 IP,所以对异步请求限流是一个十分重要的需求。

ES6 以后,JS 为用户提供了链式处理异步的 Promise,可以帮助我们更好的处理这个需求,本文参考async-pool

通过讲解async-pool的源码,来论证这个处理过程。

section2

依旧测试驱动开发:

我们先整理需求:

  • 限制服务器同时发送的异步请求的数量

然后我们整理思路:

  1. 如果直接生成大量 Promise 显然无法对其中的请求进行限制
  2. 所以这里我们需要动态生成 Promise
  3. 其次需要一个队列,同时限制这个队列长度
  4. 依次生成 Promise,塞入执行中的 Promise 的队列
  5. 如果队列长度超过这个限制值,则阻塞继续生成 Promise 对象
  6. 当有任务完成时候,需要解除阻塞,继续 4,直到所有请求完成
  7. 使用返回所有获取的数据

所以我们需要调用的效果是这样的:

1
const result = await requestThrottle([url1,url2....], 5, requestFn)

section3

照例先贴代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async function requestThrottle(arr, max, cb) {
const ret = [];
const execating = [];
if (max < arr.length) {
for (let item of arr) {
const p = Promise.resolve(cb(item));
ret.push(p);
if (execating.length < max) {
const e = p.then(() => execating.splice(execating.indexOf(e), 1));
execating.push(e);
}
if (execating.length >= max) {
await Promise.race(execating);
}
}
}

return Promise.all(ret);
}

其实挺短的,但是要求对 Promise 有一定深入的了解,如果不了解 Promise 的实现,可以去看我以前的一篇博客。手把手教你写一个符合 PromiseA+规范的 Promise。

下面开始说说我第一次看想到的几个问题:

其他地方都满好理解的,但是那个 e 的处理是如何解释?

事实上,then 中的函数在上一个 Promise 没有执行结束时候,返回的是个 Pending 状态的 Promise

贴个图,应该可以看出来:

promise

所以在异步请求 resolve 之前,e 是一个 pending 状态的回调。

这个 e 中,接到的值是什么无所谓,他的作用使用控制什么时候应该初始化新的 Promise,这里用 Promise.race 监控了 p 调用 then 的时机。当 p 的 then 被调用时候,就把 e 从数组中移除,为数组空出新的空间。然后继续初始化 Promise 做出新的请求。

最后,使用 Promise.all 包裹所有的 Promise 请求(包含 pending 的 Promise)

返回 Promise.all 的值即可。

效果看图:

数组

section4

如果你对上面的理解了,我这里再贴一个我自己实现的思路,虽然没有 async-pool 活用 race 那么精巧,不过也可以当是为看官活跃一下思路吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class SyncValve {
constructor(max) {
this.cache = [];
this.ret = [];
this.count = 0;
this.blocking = null;
this.max = max;
}

addTask(task, cb) {
this.cache.push(() => cb(task));
}

async run() {
for (let i of this.cache) {
console.log(this.count);
this.count++;
this.ret.push(
i().then((d) => {
this.count--;
this.blocking && this.blocking();
this.blocking = null;
return d;
})
);

if (this.count < this.max) {
continue;
} else {
await this.getBlocking();
}
}
console.log(this.ret);
return Promise.all(this.ret);
}

getBlocking() {
return new Promise((res, rej) => {
this.blocking = res;
});
}
}

这里利用了 Promise 的特性,通过保存延迟执行 Promise 的行为来实现了对主线程的阻塞。

这个 Promise 的使用技巧应当是可以使用在很多地方,这里有待看官自己发掘了

写个测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function request(time) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(time);
}, time);
});
}

async function main() {
const sv = new SyncValve(3);
const p = new Promise((res, rej) => {
[1000, 2000, 3000, 2000, 5000].forEach((it) => {
sv.addTask(it, request);
});
});
try {
const ret = await sv.run();
console.log(ret);
} catch (e) {
console.log(e);
}
}

main();

跑一下证明证明:

section end

以上

查看评论