Skip to content

Commit

Permalink
fix: updated moleculer-timeout-middleware with streams
Browse files Browse the repository at this point in the history
  • Loading branch information
JS-AK committed Dec 15, 2024
1 parent 4014a8c commit 1a22443
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/middlewares/timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

"use strict";

const { Stream } = require("stream");
const { RequestTimeoutError } = require("../errors");
const { METRIC } = require("../metrics");

Expand Down Expand Up @@ -38,6 +39,11 @@ module.exports = function (broker) {
nodeID,
timeout: ctx.options.timeout
});

if (ctx.params instanceof Stream) {
ctx.params.emit('moleculer-timeout-middleware', ctx.options.timeout)
}

err = new RequestTimeoutError({ action: actionName, nodeID });

broker.metrics.increment(METRIC.MOLECULER_REQUEST_TIMEOUT_TOTAL, {
Expand Down
16 changes: 16 additions & 0 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,13 @@ class Transit {
pass.$pool = new Map();

this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass });

pass.on('moleculer-timeout-middleware', (timeout) => {
setTimeout(() => {
this.pendingReqStreams.delete(payload.id);
this._destroyStreamIfPossible(pass, `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`)
}, 1000);
})
}

if (payload.seq > pass.$prevSeq + 1) {
Expand Down Expand Up @@ -864,6 +871,14 @@ class Transit {
// Add to pendings
this.pendingRequests.set(ctx.id, request);

if (request.stream) {
const pass = request.ctx.params

pass.on('moleculer-timeout-middleware', (timeout) => {
this._destroyStreamIfPossible(pass, `Request stream ${ctx.id} have been closed by timeout ${timeout} ms`)
})
}

// Publish request
return this.publish(packet)
.then(() => {
Expand Down Expand Up @@ -1103,6 +1118,7 @@ class Transit {
*/
_destroyStreamIfPossible(stream, errorMessage) {
if (!stream.destroyed && stream.destroy) {
stream.on('error', (err) => this.logger.error(err.message))
stream.destroy(new Error(errorMessage));
}
}
Expand Down

0 comments on commit 1a22443

Please sign in to comment.