diff --git a/src/transit.js b/src/transit.js index c7dd87a07..2f8e4f593 100644 --- a/src/transit.js +++ b/src/transit.js @@ -895,22 +895,23 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { - const copy = Object.assign({}, payload); - copy.seq = ++payload.seq; - copy.stream = true; - copy.params = ch; - - this.logger.debug( - `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` - ); - - this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch( - publishCatch - ); - } - stream.resume(); - return; + + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.params = ch; + + this.logger.debug( + `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` + ); + + return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => { @@ -1126,18 +1127,23 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { - const copy = Object.assign({}, payload); - copy.seq = ++payload.seq; - copy.stream = true; - copy.data = ch; - this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`); + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.data = ch; + + this.logger.debug( + `=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}` + ); - this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(publishCatch); - } - stream.resume(); - return; + return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => { diff --git a/test/unit/transit.spec.js b/test/unit/transit.spec.js index d8bfc3d8c..00eadd923 100644 --- a/test/unit/transit.spec.js +++ b/test/unit/transit.spec.js @@ -1931,6 +1931,10 @@ describe("Test Transit._sendRequest", () => { const resolve = jest.fn(); const reject = jest.fn(); + beforeEach(() => { + transit.publish = jest.fn(() => Promise.resolve().delay(40)); + }); + it("should send stream chunks", () => { transit.publish.mockClear(); @@ -2070,8 +2074,11 @@ describe("Test Transit._sendRequest", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) ); @@ -2163,8 +2170,11 @@ describe("Test Transit._sendRequest", () => { }); transit.publish.mockClear(); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) + 1 ); @@ -2715,6 +2725,10 @@ describe("Test Transit.sendResponse", () => { }); describe("with Stream", () => { + beforeEach(() => { + transit.publish = jest.fn(() => Promise.resolve().delay(40)); + }); + it("should send stream chunks", () => { transit.publish.mockClear(); @@ -2819,8 +2833,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push("first chunk"); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes(1); expect(transit.publish).toHaveBeenCalledWith({ payload: { @@ -2889,8 +2906,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) );