-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
122 lines (105 loc) · 2.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
'use strict'
// speculum - transform concurrently
exports = module.exports = Speculum
const debug = require('util').debuglog('speculum')
const stream = require('readable-stream')
const util = require('util')
/**
 * Transform stream that distributes the chunks written to it over up to `x`
 * inner streams obtained from `create`, and pushes whatever those inner
 * streams produce.
 *
 * Can be called with or without `new`.
 *
 * @param {object} opts - Options handed to the `stream.Transform` constructor.
 * @param {Function} create - Zero-argument factory; called once per inner
 *   stream. The result is written to, read from and ended by this stream.
 * @param {number} [x=1] - Maximum number of inner streams. Values below 1
 *   are raised to 1.
 * @throws {Error} If `x` is not a number (NaN included).
 */
function Speculum (opts, create, x = 1) {
  // `typeof NaN` is 'number', and Math.max(NaN, 1) is NaN. A NaN limit makes
  // `writers.length < x` false forever, so no inner stream would ever be
  // created and the first write would fail. Reject it up front.
  if (typeof x !== 'number' || Number.isNaN(x)) {
    throw new Error('multiplier must be number')
  }
  const limit = Math.max(x, 1)
  if (!(this instanceof Speculum)) {
    return new Speculum(opts, create, limit)
  }
  stream.Transform.call(this, opts)
  this.create = create
  this.x = limit
  // Inner streams in rotation order; grows lazily up to `this.x` entries.
  this.writers = []
  // Inner streams whose last write() returned false and have not drained yet.
  this.busy = new Set()
}
util.inherits(Speculum, stream.Transform)
/**
 * Builds one inner stream via `this.create()` and wires it to this stream:
 * everything readable from the inner stream is pushed downstream, and its
 * errors are re-emitted here.
 *
 * @returns {object} The newly created inner stream.
 */
Speculum.prototype.createWriter = function () {
  const inner = this.create()
  // False once this.push() has signalled that no more data should be pushed.
  let flowing = true

  const pump = () => {
    while (flowing) {
      const chunk = inner.read()
      if (chunk === null) {
        // Inner stream is empty for now; the next 'readable' pumps again.
        return
      }
      flowing = this.push(chunk)
    }
    // Blocked: pick up again on this stream's next 'drain' event.
    this.once('drain', () => {
      flowing = true
      pump()
    })
  }

  inner.on('readable', pump)

  // TODO: Consider replacing the troubled writer with a new instance
  inner.on('error', (er) => {
    this.emit('error', er)
  })

  return inner
}
/**
 * Moves the inner stream at the front of `this.writers` to the back,
 * mutating the array in place.
 *
 * @returns {object} The inner stream that was moved.
 */
Speculum.prototype.rotate = function () {
  const [front] = this.writers.splice(0, 1)
  this.writers.push(front)
  return front
}
/**
 * Chooses the inner stream that receives the next chunk.
 *
 * While fewer than `this.x` inner streams exist, a new one is created and
 * returned. With exactly one inner stream, that one is returned. Otherwise
 * the streams are rotated until one not marked busy comes up; if every
 * stream is busy, one more rotation decides.
 *
 * @returns {object} The selected inner stream.
 */
Speculum.prototype.next = function () {
  const count = this.writers.length

  if (count < this.x) {
    const fresh = this.createWriter()
    this.writers.push(fresh)
    return fresh
  }

  if (count === 1) {
    return this.writers[0]
  }

  // At most one full lap over the rotation, looking for an idle stream.
  for (let remaining = count; remaining > 0; remaining--) {
    const candidate = this.rotate()
    if (this.busy.has(candidate)) {
      continue
    }
    return candidate
  }

  // Everyone is saturated; fall back to plain round-robin.
  return this.rotate()
}
/**
 * Transform implementation: forwards the chunk to the inner stream selected
 * by `this.next()` and signals completion right away. Output is not produced
 * here; it is pushed by the 'readable' handler installed in `createWriter`.
 *
 * @param {*} chunk - The chunk to forward.
 * @param {string} enc - Encoding reported for `chunk`; forwarded unchanged so
 *   string chunks are not reinterpreted by the inner stream.
 * @param {Function} cb - Called once the chunk has been handed over.
 */
Speculum.prototype._transform = function (chunk, enc, cb) {
  const writer = this.next()
  // Pass the encoding along: without it a string chunk written with e.g.
  // 'hex' would be decoded by the inner stream using its default encoding.
  const ok = writer.write(chunk, enc)
  if (!ok) {
    debug('saturated writer')
    // Mark the stream busy until it drains so next() prefers idle ones;
    // the membership check keeps it to one pending 'drain' listener.
    if (!this.busy.has(writer)) {
      this.busy.add(writer)
      writer.once('drain', () => {
        this.busy.delete(writer)
      })
    }
  }
  cb()
}
/**
 * Flush implementation: ends every inner stream and calls `cb` once all of
 * them have emitted 'finish' (or immediately if none were ever created).
 * Before `cb` runs, the references held in `writers`, `create` and `busy`
 * are dropped.
 *
 * @param {Function} cb - Called when all inner streams have finished.
 */
Speculum.prototype._flush = function (cb) {
  debug('_flush: %s', this.writers.length)

  // Drops all references held by this stream, then reports completion.
  const release = () => {
    debug('done')
    this.writers = null
    this.create = null
    this.busy.clear()
    this.busy = null
    cb()
  }

  if (this.writers.length === 0) {
    return release()
  }

  // Detaches a finished inner stream; the last one out triggers release().
  const retire = (finished) => {
    finished.removeAllListeners() // assuming we own these
    this.writers = this.writers.filter((w) => w !== finished)
    if (this.writers.length === 0) {
      release()
    }
  }

  for (const writer of this.writers) {
    writer.on('finish', () => {
      retire(writer)
    })
    writer.end()
  }
}
/**
 * Tells whether the given entry module's filename contains "test".
 *
 * @param {object|undefined} main - The process entry module, if any.
 * @returns {boolean} True only when `main` has a string filename that
 *   matches /test/.
 */
function isTestRun (main) {
  return Boolean(main) && typeof main.filename === 'string' && /test/.test(main.filename)
}

// Additionally expose the constructor by name when started from a test file.
// `require.main` replaces the deprecated `process.mainModule`; either can be
// undefined when the entry point is not a CommonJS module, so the check must
// not dereference it blindly.
if (isTestRun(require.main)) {
  exports.Speculum = Speculum
}