From 0ff0f9d562fb3fdc6977774ff24aac0adabdd4ef Mon Sep 17 00:00:00 2001 From: Anton Burdin Date: Tue, 31 May 2022 20:46:23 +0400 Subject: [PATCH 1/5] fix(service-broker): Awaiting ctx.emit behave differently locally and remotely issue #1065 --- src/constants.js | 4 ++++ src/service-broker.js | 31 +++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/constants.js b/src/constants.js index 660e62cf6..878605d60 100644 --- a/src/constants.js +++ b/src/constants.js @@ -34,6 +34,10 @@ module.exports = { FAILED_SEND_PONG_PACKET: "failedSendPongPacket", /** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/ FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket", + /** @type {String} Emitted when broker fails to handler balanced event*/ + FAILED_HANDLER_BALANCED_EVENT: "failedHandlerBalancedEvent", + /** @type {String} Emitted when broker fails to handler broadcast event*/ + FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent", /** @type {String} Emitted when broker fails to stop all services*/ FAILED_STOPPING_SERVICES: "failedServicesStop", /** @type {String} Emitted when broker fails to stop all services*/ diff --git a/src/service-broker.js b/src/service-broker.js index c7d9ed913..b77955bcc 100644 --- a/src/service-broker.js +++ b/src/service-broker.js @@ -1355,6 +1355,7 @@ class ServiceBroker { if (opts.groups && !Array.isArray(opts.groups)) opts.groups = [opts.groups]; const promises = []; + const localHandlers = []; const ctx = this.ContextFactory.create(this, null, payload, opts); ctx.eventName = eventName; @@ -1384,7 +1385,7 @@ class ServiceBroker { if (ep.id === this.nodeID) { // Local service, call handler const newCtx = ctx.copy(ep); - promises.push(this.registry.events.callEventHandler(newCtx)); + localHandlers.push(this.registry.events.callEventHandler(newCtx)); } else { // Remote service const e = groupedEP[ep.id]; @@ -1406,6 +1407,20 @@ class ServiceBroker { }); } + // invoke local handlers + setImmediate(() => + Promise.allSettled(localHandlers).then(results => { + results + .filter(r => r.status === "rejected") + .forEach(({ reason: error }) => + this.broadcastLocal("$broker.error", { + error, + module: "broker", + type: C.FAILED_HANDLER_BALANCED_EVENT + }) + ); + }) + ); return this.Promise.all(promises); } else if (this.transit) { // Disabled balancer case @@ -1488,7 +1503,19 @@ class ServiceBroker { } // Send to local services - promises.push(this.broadcastLocal(eventName, payload, opts)); + setImmediate(() => + this.Promise.resolve() + .then(() => this.broadcastLocal(eventName, payload, opts)) + .catch(error => + this.broadcastLocal("$broker.error", { + error, + module: "broker", + type: C.FAILED_HANDLER_BROADCAST_EVENT + }) + ) + // catch unresolved error + .catch(err => this.logger.error(err)) + ); return this.Promise.all(promises); } From 5e393f5b756bea47bfdbf467e61eb0b36f22ca6f Mon Sep 17 00:00:00 2001 From: Anton Burdin Date: Wed, 1 Jun 2022 19:25:43 +0400 Subject: [PATCH 2/5] Revert "fix(service-broker): Awaiting ctx.emit behave differently locally and remotely issue #1065" This reverts commit 0ff0f9d562fb3fdc6977774ff24aac0adabdd4ef. --- src/constants.js | 4 ---- src/service-broker.js | 31 ++----------------------------- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/src/constants.js b/src/constants.js index 878605d60..660e62cf6 100644 --- a/src/constants.js +++ b/src/constants.js @@ -34,10 +34,6 @@ module.exports = { FAILED_SEND_PONG_PACKET: "failedSendPongPacket", /** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/ FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket", - /** @type {String} Emitted when broker fails to handler balanced event*/ - FAILED_HANDLER_BALANCED_EVENT: "failedHandlerBalancedEvent", - /** @type {String} Emitted when broker fails to handler broadcast event*/ - FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent", /** @type {String} Emitted when broker fails to stop all services*/ FAILED_STOPPING_SERVICES: "failedServicesStop", /** @type {String} Emitted when broker fails to stop all services*/ diff --git a/src/service-broker.js b/src/service-broker.js index b77955bcc..c7d9ed913 100644 --- a/src/service-broker.js +++ b/src/service-broker.js @@ -1355,7 +1355,6 @@ class ServiceBroker { if (opts.groups && !Array.isArray(opts.groups)) opts.groups = [opts.groups]; const promises = []; - const localHandlers = []; const ctx = this.ContextFactory.create(this, null, payload, opts); ctx.eventName = eventName; @@ -1385,7 +1384,7 @@ class ServiceBroker { if (ep.id === this.nodeID) { // Local service, call handler const newCtx = ctx.copy(ep); - localHandlers.push(this.registry.events.callEventHandler(newCtx)); + promises.push(this.registry.events.callEventHandler(newCtx)); } else { // Remote service const e = groupedEP[ep.id]; @@ -1407,20 +1406,6 @@ class ServiceBroker { }); } - // invoke local handlers - setImmediate(() => - Promise.allSettled(localHandlers).then(results => { - results - .filter(r => r.status === "rejected") - .forEach(({ reason: error }) => - this.broadcastLocal("$broker.error", { - error, - module: "broker", - type: C.FAILED_HANDLER_BALANCED_EVENT - }) - ); - }) - ); return this.Promise.all(promises); } else if (this.transit) { // Disabled balancer case @@ -1503,19 +1488,7 @@ class ServiceBroker { } // Send to local services - setImmediate(() => - this.Promise.resolve() - .then(() => this.broadcastLocal(eventName, payload, opts)) - .catch(error => - this.broadcastLocal("$broker.error", { - error, - module: "broker", - type: C.FAILED_HANDLER_BROADCAST_EVENT - }) - ) - // catch unresolved error - .catch(err => this.logger.error(err)) - ); + promises.push(this.broadcastLocal(eventName, payload, opts)); return this.Promise.all(promises); } From 4a1f7d182798458c80837b5ee2855a7915c90ea2 Mon Sep 17 00:00:00 2001 From: Anton Burdin Date: Wed, 1 Jun 2022 19:33:19 +0400 Subject: [PATCH 3/5] fix(service-broker): event emitter implementation issue #1065 --- src/constants.js | 2 ++ src/registry/event-catalog.js | 19 ++++++++++++++++++- test/unit/registry/event-catalog.spec.js | 22 ++++++++++++---------- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/constants.js b/src/constants.js index 660e62cf6..f1f383c27 100644 --- a/src/constants.js +++ b/src/constants.js @@ -34,6 +34,8 @@ module.exports = { FAILED_SEND_PONG_PACKET: "failedSendPongPacket", /** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/ FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket", + /** @type {String} Emitted when broker fails to handler broadcast event*/ + FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent", /** @type {String} Emitted when broker fails to stop all services*/ FAILED_STOPPING_SERVICES: "failedServicesStop", /** @type {String} Emitted when broker fails to stop all services*/ diff --git a/src/registry/event-catalog.js b/src/registry/event-catalog.js index db9e6317e..c6dee63b6 100644 --- a/src/registry/event-catalog.js +++ b/src/registry/event-catalog.js @@ -11,6 +11,8 @@ const utils = require("../utils"); const Strategies = require("../strategies"); const EndpointList = require("./endpoint-list"); const EventEndpoint = require("./endpoint-event"); +const EventEmitter = require("events"); +const { FAILED_HANDLER_BROADCAST_EVENT } = require("../constants"); /** * Catalog for events @@ -35,6 +37,21 @@ class EventCatalog { this.events = []; this.EndpointFactory = EventEndpoint; + + this._localBus = new EventEmitter(); + this._localBus.on("broker.event", ctx => { + ctx.endpoint.event + .handler(ctx) + .catch(error => + this.broker.broadcastLocal("$broker.error", { + error, + module: "broker", + type: FAILED_HANDLER_BROADCAST_EVENT + }) + ) + // catch unresolved error + .catch(err => this.logger.error(err)); + }); } /** @@ -205,7 +222,7 @@ class EventCatalog { * @memberof EventCatalog */ callEventHandler(ctx) { - return ctx.endpoint.event.handler(ctx); + return this._localBus.emit("broker.event", ctx); } /** diff --git a/test/unit/registry/event-catalog.spec.js b/test/unit/registry/event-catalog.spec.js index 8c3ab2296..e96824427 100644 --- a/test/unit/registry/event-catalog.spec.js +++ b/test/unit/registry/event-catalog.spec.js @@ -7,7 +7,6 @@ let EventCatalog = require("../../../src/registry/event-catalog"); let EndpointList = require("../../../src/registry/endpoint-list"); let EventEndpoint = require("../../../src/registry/endpoint-event"); let ServiceBroker = require("../../../src/service-broker"); -const { protectReject } = require("../utils"); describe("Test EventCatalog constructor", () => { let broker = new ServiceBroker({ logger: false }); @@ -521,11 +520,13 @@ describe("Test EventCatalog.callEventHandler", () => { ctx.eventGroups = ["mail", "payment"]; ctx.eventType = "emit"; - it("should add catch handler to result", () => { + it("should add catch handler to result", async () => { let resolver; ctx.endpoint.event.handler = jest.fn(() => new Promise(res => (resolver = res))); - const p = catalog.callEventHandler(ctx); + catalog.callEventHandler(ctx); + + await broker.Promise.delay(10); expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1); expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx); @@ -533,16 +534,16 @@ describe("Test EventCatalog.callEventHandler", () => { expect(errorHandler).toHaveBeenCalledTimes(0); resolver(); - - return p; }); - it("should catch error", () => { + it("should catch error", async () => { let rejecter; + + const spy = jest.spyOn(broker.localBus, "emit"); ctx.endpoint.event.handler = jest.fn(() => new Promise((res, rej) => (rejecter = rej))); broker.logger.error = jest.fn(); - const p = catalog.callEventHandler(ctx); + catalog.callEventHandler(ctx); expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1); expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx); @@ -550,9 +551,10 @@ describe("Test EventCatalog.callEventHandler", () => { const err = new Error("Something went wrong"); rejecter(err); - return p.then(protectReject).catch(e => { - expect(e).toBe(err); - }); + await broker.Promise.delay(10); + expect(spy.mock.calls[0][1].error).toBe(err); + + spy.mockRestore(); }); }); From c8bdd4b7f1d927b1391aebd614098488f640175f Mon Sep 17 00:00:00 2001 From: Anton Burdin Date: Thu, 2 Jun 2022 11:47:36 +0400 Subject: [PATCH 4/5] fix(service-broker): event emitter implementation issue #1065 --- src/registry/event-catalog.js | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/registry/event-catalog.js b/src/registry/event-catalog.js index c6dee63b6..f792cbe6f 100644 --- a/src/registry/event-catalog.js +++ b/src/registry/event-catalog.js @@ -19,7 +19,7 @@ const { FAILED_HANDLER_BROADCAST_EVENT } = require("../constants"); * * @class EventCatalog */ -class EventCatalog { +class EventCatalog extends EventEmitter { /** * Creates an instance of EventCatalog. * @@ -29,6 +29,7 @@ class EventCatalog { * @memberof EventCatalog */ constructor(registry, broker, StrategyFactory) { + super(); this.registry = registry; this.broker = broker; this.logger = registry.logger; @@ -38,8 +39,7 @@ class EventCatalog { this.EndpointFactory = EventEndpoint; - this._localBus = new EventEmitter(); - this._localBus.on("broker.event", ctx => { + this.on("broker.event", ctx => { ctx.endpoint.event .handler(ctx) .catch(error => @@ -170,11 +170,7 @@ class EventCatalog { /** * Call local service handlers * - * @param {String} eventName - * @param {any} payload - * @param {Array?} groupNames - * @param {String} nodeID - * @param {boolean} broadcast + * @param {Context} ctx * @returns {Promise} * * @memberof EventCatalog @@ -188,8 +184,9 @@ class EventCatalog { this.events.forEach(list => { if (!utils.match(ctx.eventName, list.name)) return; if ( + // null or undefined ctx.eventGroups == null || - ctx.eventGroups.length == 0 || + ctx.eventGroups.length === 0 || ctx.eventGroups.indexOf(list.group) !== -1 ) { if (isBroadcast) { @@ -222,7 +219,7 @@ class EventCatalog { * @memberof EventCatalog */ callEventHandler(ctx) { - return this._localBus.emit("broker.event", ctx); + return this.emit("broker.event", ctx); } /** @@ -246,7 +243,7 @@ class EventCatalog { */ remove(eventName, nodeID) { this.events.forEach(list => { - if (list.name == eventName) list.removeByNodeID(nodeID); + if (list.name === eventName) list.removeByNodeID(nodeID); }); } From 4bb2a8702d6efb6b2419fa2a39bf28bdbc5605bc Mon Sep 17 00:00:00 2001 From: Anton Burdin Date: Mon, 6 Jun 2022 22:08:53 +0400 Subject: [PATCH 5/5] fix(service-broker): event emitter implementation issue #1065 --- test/integration/tracing.spec.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/integration/tracing.spec.js b/test/integration/tracing.spec.js index 7ea3b6f6b..2f2d13230 100644 --- a/test/integration/tracing.spec.js +++ b/test/integration/tracing.spec.js @@ -81,8 +81,7 @@ describe("Test Tracing feature with actions", () => { await Promise.all( posts.map(async post => { - const author = await ctx.call("users.get", { id: post.author }); - post.author = author; //eslint-disable-line + post.author = await ctx.call("users.get", { id: post.author }); return post; }) ); @@ -124,7 +123,7 @@ describe("Test Tracing feature with actions", () => { }, async handler(ctx) { - let user = USERS.find(user => user.id == ctx.params.id); + let user = USERS.find(user => user.id === ctx.params.id); if (user) { const span = ctx.startSpan("cloning", { tags: { @@ -223,7 +222,8 @@ describe("Test Tracing feature with actions", () => { } }); - await Promise.delay(500); + // event loop lag <10ms + await Promise.delay(510); STORE.sort((a, b) => a.startTicks - b.startTicks);