使用Promise并发限制

背景

我们在需要保证代码在多个异步处理之后执行,我们通常会使用

Promise.all(promises: []).then(fun: function);

Promise.all可以保证,promises数组中所有promise对象都达到resolve状态,才执行then回调

那么会出现的情况是,你在瞬间发出几十万http请求(tcp连接数不足可能造成等待),或者堆积了无数调用栈导致内存溢出.

这个时候需要我们对HTTP的连接数做限制。

内容

//promise并发限制
class PromisePool {
    constructor(max, fn) {
        this.max = max; //最大并发量
        this.fn = fn; //自定义的请求函数
        this.pool = []; //并发池
        this.urls = []; //剩余的请求地址
    }
    start(urls) {
        this.urls = urls; //先循环把并发池塞满
        while (this.pool.length < this.max) {
            let url = this.urls.shift();
            this.setTask(url);
        }
        //利用Promise.race方法来获得并发池中某任务完成的信号
        let race = Promise.race(this.pool);
        return this.run(race);
    }
    run(race) {
        race
            .then(res => {
                //每当并发池跑完一个任务,就再塞入一个任务
                let url = this.urls.shift();
                this.setTask(url);
                return this.run(Promise.race(this.pool));
            })
    }
    setTask(url) {
        if (!url) return
        let task = this.fn(url);
        this.pool.push(task); //将该任务推入pool并发池中
        console.log(`\x1B[43m ${url} 开始,当前并发数:${this.pool.length}`)
        task.then(res => {
            //请求结束后将该Promise任务从并发池中移除
            this.pool.splice(this.pool.indexOf(task), 1);
            console.log(`\x1B[43m ${url} 结束,当前并发数:${this.pool.length}`);
        })
    }
}
//test
const URLS = [
    'bytedance.com',
    'tencent.com',
    'alibaba.com',
    'microsoft.com',
    'apple.com',
    'hulu.com',
    'amazon.com'
]
//自定义请求函数
var requestFn = url => {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve(`任务${url}完成`)
        }, 1000)
    }).then(res => {
        console.log('外部逻辑', res);
    })
}
const pool = new PromisePool(5, requestFn); //并发数为5
pool.start(URLS)

从上面可以看出,思路如下:定义一个 PromisePool 对象,初始化一个 pool 作为并发池,然后先循环把并发池塞满,不断地调用 setTask 然后通过自己自定义的任务函数(任务函数可以是网络请求封装的 promise 对象,或者是其他的),而且每个任务是一个Promise对象包装的,执行完就 pop 出连接池, 任务push 进并发池 pool 中。

 //利用Promise.race方法来获得并发池中某任务完成的信号
let race = Promise.race(this.pool);
return this.run(race);
 run(race) {
    race
        .then(res => {
            //每当并发池跑完一个任务,就再塞入一个任务
            let url = this.urls.shift();
            this.setTask(url);
            return this.run(Promise.race(this.pool));
        })
}

这个地方就是不断通过递归的方式,每当并发池跑完一个任务,就再塞入一个任务

其他

npm中有很多实现这个功能的第三方包,比如async-pool、es6-promise-pool、p-limit,也可以直接拿来用

上一篇:实现Runnable接口(龟兔赛跑完整代码)


下一篇:P4149 [IOI2011]Race