Skip to content

Commit

Permalink
worker: refactor stdio to improve performance
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <hello@matteocollina.com>
PR-URL: #56630
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
mcollina authored Jan 23, 2025
1 parent a4895e2 commit d978610
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
6 changes: 4 additions & 2 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,11 @@ class Worker extends EventEmitter {
{
const { stream, chunks } = message;
const readable = this[kParentSideStdio][stream];
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
// This is a hot path, use a for(;;) loop
for (let i = 0; i < chunks.length; i++) {
const { chunk, encoding } = chunks[i];
readable.push(chunk, encoding);
});
}
return;
}
case messageTypes.STDIO_WANTS_MORE_DATA:
Expand Down
35 changes: 22 additions & 13 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
'use strict';

const {
ArrayPrototypeForEach,
ArrayPrototypeMap,
ArrayPrototypePush,
Array,
FunctionPrototypeBind,
FunctionPrototypeCall,
ObjectAssign,
Expand Down Expand Up @@ -77,7 +75,7 @@ const kOnMessage = Symbol('kOnMessage');
const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kWaitingStreams = Symbol('kWaitingStreams');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kWritableCallback = Symbol('kWritableCallback');
const kStartedReading = Symbol('kStartedReading');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kCurrentlyReceivingPorts =
Expand Down Expand Up @@ -282,20 +280,29 @@ class WritableWorkerStdio extends Writable {
super({ decodeStrings: false });
this[kPort] = port;
this[kName] = name;
this[kWritableCallbacks] = [];
this[kWritableCallback] = null;
}

_writev(chunks, cb) {
const toSend = new Array(chunks.length);

// We avoid .map() because it's a hot path
for (let i = 0; i < chunks.length; i++) {
const { chunk, encoding } = chunks[i];
toSend[i] = { chunk, encoding };
}

this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD,
stream: this[kName],
chunks: ArrayPrototypeMap(chunks,
({ chunk, encoding }) => ({ chunk, encoding })),
chunks: toSend,
});
if (process._exiting) {
cb();
} else {
ArrayPrototypePush(this[kWritableCallbacks], cb);
// Only one writev happens at any given time,
// so we can safely overwrite the callback.
this[kWritableCallback] = cb;
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
Expand All @@ -311,11 +318,13 @@ class WritableWorkerStdio extends Writable {
}

[kStdioWantsMoreDataCallback]() {
const cbs = this[kWritableCallbacks];
this[kWritableCallbacks] = [];
ArrayPrototypeForEach(cbs, (cb) => cb());
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
this[kPort].unref();
const cb = this[kWritableCallback];
if (cb) {
this[kWritableCallback] = null;
cb();
if (--this[kPort][kWaitingStreams] === 0)
this[kPort].unref();
}
}
}

Expand Down

0 comments on commit d978610

Please sign in to comment.