forked from sindresorhus/p-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
71 lines (64 loc) · 1.39 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
'use strict';
/**
 * Promise queue that runs at most `concurrency` tasks at once.
 *
 * Tasks that cannot start immediately wait in per-priority FIFO buckets
 * (`this.queue[priority]`); when a running task settles, the first waiting
 * task from the lowest-numbered non-empty bucket is started.
 */
class PQueue {
  /**
   * @param {Object} [opts]
   * @param {number} [opts.concurrency=Infinity] - Maximum number of tasks
   *   running at the same time. Must be a number >= 1.
   * @throws {TypeError} If `concurrency` is not a number or is below 1.
   */
  constructor(opts) {
    const options = Object.assign({ concurrency: Infinity }, opts);

    // `>= 1` (rather than `< 1`) also rejects NaN and undefined, which would
    // otherwise make `pending < concurrency` permanently false and stall
    // every task in the queue.
    if (!(typeof options.concurrency === 'number' && options.concurrency >= 1)) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
    }

    // Sparse array of FIFO buckets, indexed by priority.
    this.queue = [];
    this._pendingCount = 0;
    this._concurrency = options.concurrency;
    this._resolveEmpty = () => {};
  }

  /**
   * Releases the slot of a task that just settled, then either starts the
   * next waiting task or, when nothing is waiting, notifies `onEmpty()`
   * subscribers.
   */
  _next() {
    this._pendingCount--;

    // `for...of` yields `undefined` for holes in the sparse bucket array,
    // so priorities that were never used must be skipped explicitly.
    for (const bucket of this.queue) {
      if (bucket !== undefined && bucket.length > 0) {
        bucket.shift()();
        return;
      }
    }

    // Reset before calling so already-notified resolvers are not retained
    // and re-invoked on every later drain.
    const resolveEmpty = this._resolveEmpty;
    this._resolveEmpty = () => {};
    resolveEmpty();
  }

  /**
   * Schedules `fn`, running it immediately if a concurrency slot is free.
   *
   * @param {Function} fn - Task to run; may return a promise or a plain value.
   * @param {number} [priority=0] - Non-negative integer; lower values are
   *   started first, equal values run in insertion order.
   * @returns {Promise} Settles with the outcome of `fn`. Rejects with a
   *   TypeError if `priority` is not a non-negative integer, and with the
   *   thrown error if `fn` throws synchronously.
   */
  add(fn, priority = 0) {
    return new Promise((resolve, reject) => {
      // Other values would be stored where `_next` never looks, leaving the
      // task waiting forever.
      if (!Number.isInteger(priority) || priority < 0) {
        reject(new TypeError('Expected `priority` to be a non-negative integer'));
        return;
      }

      const run = () => {
        this._pendingCount++;

        let outcome;
        try {
          outcome = fn();
        } catch (err) {
          // A synchronous throw must still release the slot it occupied.
          reject(err);
          this._next();
          return;
        }

        // Promise.resolve lets tasks return plain values as well as promises.
        Promise.resolve(outcome).then(
          val => {
            resolve(val);
            this._next();
          },
          err => {
            reject(err);
            this._next();
          }
        );
      };

      if (this._pendingCount < this._concurrency) {
        run();
        return;
      }

      if (this.queue[priority] === undefined) {
        this.queue[priority] = [];
      }
      this.queue[priority].push(run);
    });
  }

  /**
   * @returns {Promise<void>} Resolves the next time a task settles while no
   *   other task is waiting in the queue.
   */
  onEmpty() {
    return new Promise(resolve => {
      const existingResolve = this._resolveEmpty;
      this._resolveEmpty = () => {
        existingResolve();
        resolve();
      };
    });
  }

  /** Number of tasks waiting to start (running tasks are not counted). */
  get size() {
    // `reduce` skips holes, so unused priorities contribute nothing.
    return this.queue.reduce((len, arr) => len + arr.length, 0);
  }

  /** Number of tasks currently running. */
  get pending() {
    return this._pendingCount;
  }
}
// CommonJS export: the PQueue class itself is the module's export value.
module.exports = PQueue;