Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(event-catalog): local emit & broadcast handlers changed from async to event-driven #1096

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ module.exports = {
FAILED_SEND_PONG_PACKET: "failedSendPongPacket",
/** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/
FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket",
/** @type {String} Emitted when broker fails to handler broadcast event*/
FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent",
/** @type {String} Emitted when broker fails to stop all services*/
FAILED_STOPPING_SERVICES: "failedServicesStop",
/** @type {String} Emitted when broker fails to stop all services*/
Expand Down
32 changes: 23 additions & 9 deletions src/registry/event-catalog.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ const utils = require("../utils");
const Strategies = require("../strategies");
const EndpointList = require("./endpoint-list");
const EventEndpoint = require("./endpoint-event");
const EventEmitter = require("events");
const { FAILED_HANDLER_BROADCAST_EVENT } = require("../constants");

/**
* Catalog for events
*
* @class EventCatalog
*/
class EventCatalog {
class EventCatalog extends EventEmitter {
/**
* Creates an instance of EventCatalog.
*
Expand All @@ -27,6 +29,7 @@ class EventCatalog {
* @memberof EventCatalog
*/
constructor(registry, broker, StrategyFactory) {
super();
this.registry = registry;
this.broker = broker;
this.logger = registry.logger;
Expand All @@ -35,6 +38,20 @@ class EventCatalog {
this.events = [];

this.EndpointFactory = EventEndpoint;

this.on("broker.event", ctx => {
Copy link
Member

@icebob icebob Jun 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we should use an event emitter to call a non-await-ed method? I feel it's too expensive. Not enough just skip the await in the callEventHandler

ctx.endpoint.event
.handler(ctx)
.catch(error =>
this.broker.broadcastLocal("$broker.error", {
error,
module: "broker",
type: FAILED_HANDLER_BROADCAST_EVENT
})
)
// catch unresolved error
.catch(err => this.logger.error(err));
});
}

/**
Expand Down Expand Up @@ -153,11 +170,7 @@ class EventCatalog {
/**
* Call local service handlers
*
* @param {String} eventName
* @param {any} payload
* @param {Array<String>?} groupNames
* @param {String} nodeID
* @param {boolean} broadcast
* @param {Context} ctx
* @returns {Promise<any>}
*
* @memberof EventCatalog
Expand All @@ -171,8 +184,9 @@ class EventCatalog {
this.events.forEach(list => {
if (!utils.match(ctx.eventName, list.name)) return;
if (
// null or undefined
ctx.eventGroups == null ||
ctx.eventGroups.length == 0 ||
ctx.eventGroups.length === 0 ||
ctx.eventGroups.indexOf(list.group) !== -1
) {
if (isBroadcast) {
Expand Down Expand Up @@ -205,7 +219,7 @@ class EventCatalog {
* @memberof EventCatalog
*/
callEventHandler(ctx) {
return ctx.endpoint.event.handler(ctx);
return this.emit("broker.event", ctx);
}

/**
Expand All @@ -229,7 +243,7 @@ class EventCatalog {
*/
remove(eventName, nodeID) {
this.events.forEach(list => {
if (list.name == eventName) list.removeByNodeID(nodeID);
if (list.name === eventName) list.removeByNodeID(nodeID);
});
}

Expand Down
8 changes: 4 additions & 4 deletions test/integration/tracing.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ describe("Test Tracing feature with actions", () => {

await Promise.all(
posts.map(async post => {
const author = await ctx.call("users.get", { id: post.author });
post.author = author; //eslint-disable-line
post.author = await ctx.call("users.get", { id: post.author });
return post;
})
);
Expand Down Expand Up @@ -124,7 +123,7 @@ describe("Test Tracing feature with actions", () => {
},

async handler(ctx) {
let user = USERS.find(user => user.id == ctx.params.id);
let user = USERS.find(user => user.id === ctx.params.id);
if (user) {
const span = ctx.startSpan("cloning", {
tags: {
Expand Down Expand Up @@ -223,7 +222,8 @@ describe("Test Tracing feature with actions", () => {
}
});

await Promise.delay(500);
// event loop lag <10ms
await Promise.delay(510);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this issue is the next tick in node v10, and the test fails. The timeout should be 510ms (although it works with 501ms) because the action already has a 500ms timeout.
It is likely that after v10, optimization was carried out for placing events in a queue, and on later versions, the test passed successfully with the same delay of 500 ms.


STORE.sort((a, b) => a.startTicks - b.startTicks);

Expand Down
22 changes: 12 additions & 10 deletions test/unit/registry/event-catalog.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ let EventCatalog = require("../../../src/registry/event-catalog");
let EndpointList = require("../../../src/registry/endpoint-list");
let EventEndpoint = require("../../../src/registry/endpoint-event");
let ServiceBroker = require("../../../src/service-broker");
const { protectReject } = require("../utils");

describe("Test EventCatalog constructor", () => {
let broker = new ServiceBroker({ logger: false });
Expand Down Expand Up @@ -521,38 +520,41 @@ describe("Test EventCatalog.callEventHandler", () => {
ctx.eventGroups = ["mail", "payment"];
ctx.eventType = "emit";

it("should add catch handler to result", () => {
it("should add catch handler to result", async () => {
let resolver;
ctx.endpoint.event.handler = jest.fn(() => new Promise(res => (resolver = res)));

const p = catalog.callEventHandler(ctx);
catalog.callEventHandler(ctx);

await broker.Promise.delay(10);

expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1);
expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx);

expect(errorHandler).toHaveBeenCalledTimes(0);

resolver();

return p;
});

it("should catch error", () => {
it("should catch error", async () => {
let rejecter;

const spy = jest.spyOn(broker.localBus, "emit");
ctx.endpoint.event.handler = jest.fn(() => new Promise((res, rej) => (rejecter = rej)));
broker.logger.error = jest.fn();

const p = catalog.callEventHandler(ctx);
catalog.callEventHandler(ctx);

expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1);
expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx);

const err = new Error("Something went wrong");
rejecter(err);

return p.then(protectReject).catch(e => {
expect(e).toBe(err);
});
await broker.Promise.delay(10);
expect(spy.mock.calls[0][1].error).toBe(err);

spy.mockRestore();
});
});

Expand Down