From c3a7b7cdf908f4113844d2230df2bb2a84a9a382 Mon Sep 17 00:00:00 2001 From: Danil Kincharov Date: Mon, 21 Aug 2023 12:03:23 +0300 Subject: [PATCH 1/2] add an example for sse service with moleculer-web --- examples/sse/assets/index.html | 37 +++++++++++ examples/sse/assets/index.js | 56 +++++++++++++++++ examples/sse/assets/style.css | 65 +++++++++++++++++++ examples/sse/chat.service.js | 15 +++++ examples/sse/index.js | 110 +++++++++++++++++++++++++++++++++ 5 files changed, 283 insertions(+) create mode 100644 examples/sse/assets/index.html create mode 100644 examples/sse/assets/index.js create mode 100644 examples/sse/assets/style.css create mode 100644 examples/sse/chat.service.js create mode 100644 examples/sse/index.js diff --git a/examples/sse/assets/index.html b/examples/sse/assets/index.html new file mode 100644 index 00000000..4e2dc599 --- /dev/null +++ b/examples/sse/assets/index.html @@ -0,0 +1,37 @@ + + + + + + + + moleculer-web sse + + + +
+
Simple Chat
+ +
+ + +
+
+
Simple Chat
+ +
+ + +
+ + + + diff --git a/examples/sse/assets/index.js b/examples/sse/assets/index.js new file mode 100644 index 00000000..b41b2d49 --- /dev/null +++ b/examples/sse/assets/index.js @@ -0,0 +1,56 @@ +/* eslint-disable no-undef */ +document.addEventListener("DOMContentLoaded", () => { + const messages1 = document.getElementById("messages1"); + const messages2 = document.getElementById("messages2"); + + const sendMessage = async (userName, message) => { + return fetch("/api/chat/message", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + user: userName, + message: message, + }), + }); + }; + + const addMessage = (element, message) => { + const messageElement = document.createElement("div"); + messageElement.textContent = message; + element.appendChild(messageElement); + element.scrollTop = element.scrollHeight; + }; + + const setupSSE = (element) => { + const eventSource = new EventSource("/api/chat/message"); + + eventSource.addEventListener("chat.message", (event) => { + const data = JSON.parse(event.data); + addMessage(element, `${data.user}: ${data.message}`); + }); + + eventSource.addEventListener("error", (error) => { + console.error(error); + }); + }; + + document.querySelectorAll(".send-button").forEach((button) => { + button.addEventListener("click", () => { + const chatBox = button.closest(".chat-box"); + const userNameInput = chatBox.querySelector(".user-name"); + const messageInput = chatBox.querySelector(".message"); + const userName = userNameInput.value.trim(); + const message = messageInput.value.trim(); + + if (userName && message) { + sendMessage(userName, message); + messageInput.value = ""; + } + }); + }); + + setupSSE(messages1); + setupSSE(messages2); +}); diff --git a/examples/sse/assets/style.css b/examples/sse/assets/style.css new file mode 100644 index 00000000..12b4fba8 --- /dev/null +++ b/examples/sse/assets/style.css @@ -0,0 +1,65 @@ +body { + font-family: "Roboto", sans-serif; + margin: 0; + display: flex; + justify-content: center; + align-items: center; + min-height: 100vh; + background-color: #f0f0f0; +} + +.chat-box { + display: flex; + flex-direction: column; + background-color: #ffffff; + box-shadow: 0px 3px 6px rgba(0, 0, 0, 0.1); + border-radius: 4px; + width: 300px; + padding: 16px; + margin: 16px; + gap: 20px; +} + +.messages { + background-color: #f0f0f0; + border-radius: 4px; + height: 300px; + overflow-y: scroll; + padding: 8px; + margin-top: 8px; + box-shadow: inset 0px -3px 6px rgba(0, 0, 0, 0.1); +} + +.message { + padding: 4px; + margin: 4px 0; + border-radius: 4px; +} + +.user-input { + display: flex; + margin-top: 8px; +} + +.user-name, +.message { + flex-grow: 1; + border: none; + padding: 8px; + border-radius: 4px; + box-shadow: 0px 2px 3px rgba(0, 0, 0, 0.1); +} + +.send-button { + background-color: #1976d2; + color: #ffffff; + border: none; + border-radius: 4px; + padding: 8px 16px; + cursor: pointer; + transition: background-color 0.3s; +} + +.send-button:hover { + background-color: #1565c0; +} diff --git a/examples/sse/chat.service.js b/examples/sse/chat.service.js new file mode 100644 index 00000000..66a29e82 --- /dev/null +++ b/examples/sse/chat.service.js @@ -0,0 +1,15 @@ +module.exports = { + name: "chat", + actions: { + postMessage: { + params: { + message: "string", + user: "string", + }, + handler(context) { + const { params } = context; + context.emit("chat.message", params); + }, + }, + }, +}; diff --git a/examples/sse/index.js b/examples/sse/index.js new file mode 100644 index 00000000..61039908 --- /dev/null +++ b/examples/sse/index.js @@ -0,0 +1,110 @@ +"use strict"; + +const path = require("node:path"); +const { ServiceBroker, Errors } = require("moleculer"); +const ApiGatewayService = require("../../index"); +const ChatService = require("./chat.service"); + +const { MoleculerError } = Errors; + +const SSE_RETRY_TIMEOUT = 15000; // 15 seconds +const PORT = 3000; +const HOST = "0.0.0.0"; +const SSE_HEADERS = { + Connection: "keep-alive", + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", +}; + +// Create broker +const broker = new ServiceBroker({ + logger: console, + metrics: true, + validation: true, +}); + +broker.createService(ChatService); + +// Load API Gateway +broker.createService({ + name: "sse.gateway", + mixins: [ApiGatewayService], + settings: { + port: PORT, + + ip: HOST, + + assets: { + folder: path.join(__dirname, "assets"), + }, + + routes: [ + { + path: "/api/chat", + aliases: { + "POST message": "chat.postMessage", + "GET message"(request, response) { + response.writeHead(200, SSE_HEADERS); + response.$service.addSSEListener( + response, + "chat.message" + ); + }, + }, + }, + ], + }, + + events: { + "**"(context) { + const { eventName, params } = context; + if (!this.sseListeners.has(eventName)) return; + const listeners = this.sseListeners.get(eventName); + for (const listener of listeners.values()) { + const id = this.sseIds.get(listener) || 0; + const message = this.createSSEMessage(params, eventName, id); + listener.write(message); + this.sseIds.set(listener, id + 1); + } + }, + }, + + methods: { + addSSEListener(stream, event) { + if (!stream.write) + throw new MoleculerError("Only writable can listen to SSE."); + const listeners = this.sseListeners.get(event) || new Set(); + listeners.add(stream); + this.sseListeners.set(event, listeners); + this.sseIds.set(stream, 0); + stream.on("close", () => { + this.sseIds.delete(stream); + listeners.delete(stream); + }); + }, + + createSSEMessage(data, event, id) { + return `event: ${event}\ndata: ${JSON.stringify( + data + )}\nid: ${id}\nretry: ${this.sseRetry}\n\n`; + }, + }, + + started() { + this.sseListeners = new Map(); + this.sseIds = new WeakMap(); + this.sseRetry = SSE_RETRY_TIMEOUT; + }, + + stopped() { + for (const listeners of this.sseListeners.values()) { + for (const listener of listeners.values()) { + if (listener.end) listener.end(); + listeners.delete(listener); + } + } + }, +}); + +// Start server +broker.start(); From a898e72d1ab928af3242b3491728cf15cd707964 Mon Sep 17 00:00:00 2001 From: Danil Kincharov Date: Sat, 9 Sep 2023 10:42:18 +0300 Subject: [PATCH 2/2] remove node: from require, add more exact event name --- examples/sse/chat.service.js | 2 +- examples/sse/index.js | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/examples/sse/chat.service.js b/examples/sse/chat.service.js index 66a29e82..4465a5af 100644 --- a/examples/sse/chat.service.js +++ b/examples/sse/chat.service.js @@ -8,7 +8,7 @@ module.exports = { }, handler(context) { const { params } = context; - context.emit("chat.message", params); + context.emit("chat.sse.message", params); }, }, }, diff --git a/examples/sse/index.js b/examples/sse/index.js index 61039908..f3987182 100644 --- a/examples/sse/index.js +++ b/examples/sse/index.js @@ -1,6 +1,6 @@ "use strict"; -const path = require("node:path"); +const path = require("path"); const { ServiceBroker, Errors } = require("moleculer"); const ApiGatewayService = require("../../index"); const ChatService = require("./chat.service"); @@ -56,20 +56,25 @@ broker.createService({ }, events: { - "**"(context) { + "chat.sse*"(context) { + this.handleSSE(context); + }, + }, + + methods: { + handleSSE(context) { const { eventName, params } = context; - if (!this.sseListeners.has(eventName)) return; - const listeners = this.sseListeners.get(eventName); + const event = eventName.replace("sse.", ""); + if (!this.sseListeners.has(event)) return; + const listeners = this.sseListeners.get(event); for (const listener of listeners.values()) { const id = this.sseIds.get(listener) || 0; - const message = this.createSSEMessage(params, eventName, id); + const message = this.createSSEMessage(params, event, id); listener.write(message); this.sseIds.set(listener, id + 1); } }, - }, - methods: { addSSEListener(stream, event) { if (!stream.write) throw new MoleculerError("Only writable can listen to SSE.");