diff --git a/src/Indomitable.ts b/src/Indomitable.ts index 5829a94..8313a95 100644 --- a/src/Indomitable.ts +++ b/src/Indomitable.ts @@ -9,24 +9,24 @@ import { ShardClient } from './client/ShardClient'; import { ClusterManager } from './manager/ClusterManager'; import { IpcServer } from './ipc/IpcServer'; import { + AbortableData, Chunk, FetchSessions, - MakeAbortableRequest, - AbortableData, InternalOps, InternalOpsData, LibraryEvents, + MakeAbortableRequest, + Sendable, SessionObject, - Transportable, - Sendable + Transportable } from './Util'; /** * Options to control Indomitable behavior */ export interface IndomitableOptions { - clusterCount?: number|'auto'; - shardCount?: number|'auto'; + clusterCount?: number | 'auto'; + shardCount?: number | 'auto'; clientOptions?: DiscordJsClientOptions; clusterSettings?: ClusterSettings; ipcTimeout?: number; @@ -43,6 +43,7 @@ export interface ReconfigureOptions { clusters?: number; shards?: number; } + export interface ShardEventData { clusterId: number, shardId?: number, @@ -56,84 +57,119 @@ export declare interface Indomitable { * @eventProperty */ on(event: 'debug', listener: (message: string) => void): this; + /** * Emitted when an IPC message is received * @eventProperty */ - on(event: 'message', listener: (message: Message|unknown) => void): this; + on(event: 'message', listener: (message: Message | unknown) => void): this; + /** * Emitted when an error occurs * @eventProperty */ on(event: 'error', listener: (error: unknown) => void): this; + /** * Emitted when a new worker process is forked * @eventProperty */ on(event: 'workerFork', listener: (cluster: ClusterManager) => void): this; + /** * Emitted when a worker process is ready * @eventProperty */ on(event: 'workerReady', listener: (cluster: ClusterManager) => void): this; + /** * Emitted when a worker process exits * @eventProperty */ - on(event: 'workerExit', listener: (code: number|null, signal: string|null, cluster: ClusterManager) => void): this; + on(event: 'workerExit', listener: (code: number | null, signal: string | null, cluster: ClusterManager) => void): this; + /** * Emitted when a Discord.js shard is ready * @eventProperty */ on(event: 'shardReady', listener: (event: ShardEventData) => void): this; + /** * Emitted when a Discord.js shard is reconnecting * @eventProperty */ on(event: 'shardReconnect', listener: (event: ShardEventData) => void): this; + /** * Emitted when a Discord.js shard resumes * @eventProperty */ on(event: 'shardResume', listener: (event: ShardEventData) => void): this; + /** * Emitted when a Discord.js shard disconnects * @eventProperty */ on(event: 'shardDisconnect', listener: (event: ShardEventData) => void): this; + /** * Emitted when a Discord.js client is ready * @eventProperty */ on(event: 'clientReady', listener: (event: ShardEventData) => void): this; + /** * Emitted on every ipc message the handler receives * @eventProperty */ on(event: 'raw', listener: (event: unknown) => void): this; + once(event: 'debug', listener: (message: string) => void): this; - once(event: 'message', listener: (message: Message|unknown) => void): this; + + once(event: 'message', listener: (message: Message | unknown) => void): this; + once(event: 'error', listener: (error: unknown) => void): this; + once(event: 'workerFork', listener: (cluster: ClusterManager) => void): this; + once(event: 'workerReady', listener: (cluster: ClusterManager) => void): this; - once(event: 'workerExit', listener: (code: number|null, signal: string|null, cluster: ClusterManager) => void): this; + + once(event: 'workerExit', listener: (code: number | null, signal: string | null, cluster: ClusterManager) => void): this; + once(event: 'shardReady', listener: (event: ShardEventData) => void): this; + once(event: 'shardReconnect', listener: (event: ShardEventData) => void): this; + once(event: 'shardResume', listener: (event: ShardEventData) => void): this; + once(event: 'shardDisconnect', listener: (event: ShardEventData) => void): this; + once(event: 'clientReady', listener: (event: ShardEventData) => void): this; + once(event: 'raw', listener: (event: unknown) => void): this; + off(event: 'debug', listener: (message: string) => void): this; - off(event: 'message', listener: (message: Message|unknown) => void): this; + + off(event: 'message', listener: (message: Message | unknown) => void): this; + off(event: 'error', listener: (error: unknown) => void): this; + off(event: 'workerFork', listener: (cluster: ClusterManager) => void): this; + off(event: 'workerReady', listener: (cluster: ClusterManager) => void): this; - off(event: 'workerExit', listener: (code: number|null, signal: string|null, cluster: ClusterManager) => void): this; + + off(event: 'workerExit', listener: (code: number | null, signal: string | null, cluster: ClusterManager) => void): this; + off(event: 'shardReady', listener: (event: ShardEventData) => void): this; + off(event: 'shardReconnect', listener: (event: ShardEventData) => void): this; + off(event: 'shardResume', listener: (event: ShardEventData) => void): this; + off(event: 'shardDisconnect', listener: (event: ShardEventData) => void): this; + off(event: 'clientReady', listener: (event: ShardEventData) => void): this; + off(event: 'raw', listener: (event: unknown) => void): this; } @@ -141,8 +177,8 @@ export declare interface Indomitable { * The main Indomitable class, exposing all functionality. */ export class Indomitable extends EventEmitter { - public clusterCount: number|'auto'; - public shardCount: number|'auto'; + public clusterCount: number | 'auto'; + public shardCount: number | 'auto'; public cachedSession?: SessionObject; public concurrencyServer?: ConcurrencyServer; public ipcServer?: IpcServer; @@ -159,6 +195,7 @@ export class Indomitable extends EventEmitter { private readonly spawnQueue: ClusterManager[]; private readonly token: string; private busy: boolean; + /** * @param [options.clusterCount=auto] The amount of clusters to spawn. Expects a number or 'auto' * @param [options.shardCount=auto] The number of shards to create. Expects a number or 'auto' @@ -177,7 +214,7 @@ export class Indomitable extends EventEmitter { super(); this.clusterCount = options.clusterCount || 'auto'; this.shardCount = options.shardCount || 'auto'; - this.clientOptions = options.clientOptions || { intents: [ 1 << 0 ] }; + this.clientOptions = options.clientOptions || {intents: [1 << 0]}; this.clusterSettings = options.clusterSettings || {}; this.ipcTimeout = options.ipcTimeout ?? 30000; this.spawnTimeout = options.spawnTimeout ?? 60000; @@ -254,14 +291,14 @@ export class Indomitable extends EventEmitter { this.clusterCount = this.shardCount; this.emit(LibraryEvents.DEBUG, `Starting ${this.shardCount} websocket shards across ${this.clusterCount} clusters`); - const shards = [ ...Array(this.shardCount).keys() ]; + const shards = [...Array(this.shardCount).keys()]; const chunks = Chunk(shards, Math.round(this.shardCount / this.clusterCount)); - Cluster.setupPrimary({ ...{ serialization: 'json' }, ...this.clusterSettings }); + Cluster.setupPrimary({...{serialization: 'json'}, ...this.clusterSettings}); for (let id = 0; id < this.clusterCount; id++) { const chunk = chunks.shift()!; - const cluster = new ClusterManager({ id, shards: chunk, manager: this }); + const cluster = new ClusterManager({id, shards: chunk, manager: this}); this.clusters.set(id, cluster); } @@ -284,7 +321,7 @@ export class Indomitable extends EventEmitter { * Restart all clusters if this instance is the primary process * @returns A promise that resolves to void */ - public async restartAll(): Promise { + public async restartAll(): Promise { if (!Cluster.isPrimary) return; await this.addToSpawnQueue(...this.clusters.values()); } @@ -293,11 +330,11 @@ export class Indomitable extends EventEmitter { * Sends a message to a specific cluster * @returns A promise that resolves to undefined or an unknown value depending on how you reply to it */ - public async send(id: number, sendable: Sendable): Promise { + public async send(id: number, sendable: Sendable): Promise { const cluster = this.clusters.get(id); if (!cluster) throw new Error('Invalid cluster id provided'); - let abortableData: AbortableData|undefined; + let abortableData: AbortableData | undefined; if (this.ipcTimeout !== Infinity && sendable.reply) { abortableData = MakeAbortableRequest(this.ipcTimeout); } @@ -325,7 +362,7 @@ export class Indomitable extends EventEmitter { * @returns An array of promise that resolves to undefined or an unknown value depending on how you reply to it */ public broadcast(sendable: Sendable): Promise { - const clusters = [ ...this.clusters.keys() ]; + const clusters = [...this.clusters.keys()]; return Promise.all(clusters.map(id => this.send(id, sendable))); } @@ -343,17 +380,17 @@ export class Indomitable extends EventEmitter { this.emit(LibraryEvents.DEBUG, `Reconfigured Indomitable to use ${this.shardCount} shard(s)`); const oldClusterCount = Number(this.clusters.size); this.clusterCount = options.clusters || this.clusters.size; - const shards = [ ...Array(this.shardCount).keys() ]; + const shards = [...Array(this.shardCount).keys()]; const chunks = Chunk(shards, Math.round(this.shardCount as number / this.clusterCount)); if (oldClusterCount < this.clusterCount) { const count = this.clusterCount - oldClusterCount; for (let id = this.clusterCount - 1; id < count; id++) { - const cluster = new ClusterManager({ id, shards: [], manager: this }); + const cluster = new ClusterManager({id, shards: [], manager: this}); this.clusters.set(id, cluster); } } if (oldClusterCount > this.clusterCount) { - const keys = [ ...this.clusters.keys() ].reverse(); + const keys = [...this.clusters.keys()].reverse(); const range = keys.slice(0, oldClusterCount - this.clusterCount); for (const key of range) { const cluster = this.clusters.get(key); @@ -392,7 +429,7 @@ export class Indomitable extends EventEmitter { data: {}, internal: true }; - await this.send(id, { content, reply: true }); + await this.send(id, {content, reply: true}); } /** diff --git a/src/Util.ts b/src/Util.ts index 72c985c..ef0030c 100644 --- a/src/Util.ts +++ b/src/Util.ts @@ -201,14 +201,14 @@ export interface RawIpcMessage extends InternalData { * Data structure representing a Discord session */ export interface SessionObject { - url: string; - shards: number; - session_start_limit: { - total: number; - remaining: number; - reset_after: number; + url: string; + shards: number; + session_start_limit: { + total: number; + remaining: number; + reset_after: number; max_concurrency: number; - }; + }; } export interface FetchResponse { @@ -243,7 +243,7 @@ export function Fetch(url: string, options: RequestOptions): Promise { const code = response.statusCode ?? 500; const body = chunks.join(''); - resolve({ code, body, message: response.statusMessage ?? '' }); + resolve({code, body, message: response.statusMessage ?? ''}); }); }); @@ -261,7 +261,7 @@ export async function FetchSessions(token: string): Promise { const url = new URL('https://discord.com/api/v10/gateway/bot'); const response = await Fetch(url.toString(), { method: 'GET', - headers: { authorization: `Bot ${token}` } + headers: {authorization: `Bot ${token}`} }); if (response.code >= 200 && response.code <= 299) return JSON.parse(response.body); @@ -278,7 +278,7 @@ export async function FetchSessions(token: string): Promise { export function Chunk(original: any[], chunks: number): any[] { const array = []; for (let i = 0; i < original.length; i += chunks) - array.push(original.slice(i , i + chunks)); + array.push(original.slice(i, i + chunks)); return array; } @@ -303,5 +303,5 @@ export function MakeAbortableRequest(delay: number): AbortableData { () => controller.abort(new Error(`The request has been aborted in ${seconds} second(s)`)), delay ); - return { controller, timeout }; + return {controller, timeout}; } diff --git a/src/client/ShardClient.ts b/src/client/ShardClient.ts index a8054bf..1f918ef 100644 --- a/src/client/ShardClient.ts +++ b/src/client/ShardClient.ts @@ -1,6 +1,6 @@ import type { Client, ClientOptions as DiscordJsClientOptions } from 'discord.js'; import { Indomitable } from '../Indomitable'; -import { EnvProcessData, ClientEvents, ClientEventData, Delay } from '../Util'; +import { ClientEventData, ClientEvents, EnvProcessData } from '../Util'; import { ShardClientUtil } from './ShardClientUtil'; import { ConcurrencyClient } from '../concurrency/ConcurrencyClient'; @@ -43,7 +43,7 @@ export class ShardClient { this.concurrency = new ConcurrencyClient(); if (!clientOptions.ws) clientOptions.ws = {}; clientOptions.ws.buildIdentifyThrottler = () => Promise.resolve(this.concurrency!); - } + } this.client = new manager.client(clientOptions); // @ts-expect-error: Override shard client util with indomitable shard client util this.client.shard = new ShardClientUtil(this.client, manager); @@ -62,12 +62,24 @@ export class ShardClient { this.client.emit('debug', `[Indomitable]: Identify server responded and is working, Trip Latency: ${Math.round(Date.now() - date)}ms`); } // attach listeners - this.client.once('ready', () => this.send({ op: ClientEvents.READY, data: { clusterId: this.clusterId }})); - this.client.on('shardReady', (shardId: number) => this.send({ op: ClientEvents.SHARD_READY, data: { clusterId: this.clusterId, shardId }})); - this.client.on('shardReconnecting', (shardId: number) => this.send({ op: ClientEvents.SHARD_RECONNECT, data: { clusterId: this.clusterId, shardId }})); - this.client.on('shardResume', (shardId: number, replayed: number) => this.send({ op: ClientEvents.SHARD_RESUME, data: { clusterId: this.clusterId, shardId, replayed }})); + this.client.once('ready', () => this.send({op: ClientEvents.READY, data: {clusterId: this.clusterId}})); + this.client.on('shardReady', (shardId: number) => this.send({ + op: ClientEvents.SHARD_READY, + data: {clusterId: this.clusterId, shardId} + })); + this.client.on('shardReconnecting', (shardId: number) => this.send({ + op: ClientEvents.SHARD_RECONNECT, + data: {clusterId: this.clusterId, shardId} + })); + this.client.on('shardResume', (shardId: number, replayed: number) => this.send({ + op: ClientEvents.SHARD_RESUME, + data: {clusterId: this.clusterId, shardId, replayed} + })); // @ts-ignore -- Discord.JS faulty typings? - this.client.on('shardDisconnect', (event: CloseEvent, shardId: number) => this.send({ op: ClientEvents.SHARD_DISCONNECT, data: { clusterId: this.clusterId, shardId, event }})); + this.client.on('shardDisconnect', (event: CloseEvent, shardId: number) => this.send({ + op: ClientEvents.SHARD_DISCONNECT, + data: {clusterId: this.clusterId, shardId, event} + })); await this.client.login(token); } @@ -79,9 +91,9 @@ export class ShardClient { private send(partial: PartialInternalEvents): void { // @ts-ignore -- our own class const shardClientUtil = this.client.shard as ShardClientUtil; - const content: ClientEventData = { ...partial, internal: true }; + const content: ClientEventData = {...partial, internal: true}; shardClientUtil - .send({ content, reply: false }) + .send({content, reply: false}) .catch((error: unknown) => this.client.emit(ClientEvents.ERROR, error as Error)); } } diff --git a/src/client/ShardClientUtil.ts b/src/client/ShardClientUtil.ts index 6f5ac89..028323e 100644 --- a/src/client/ShardClientUtil.ts +++ b/src/client/ShardClientUtil.ts @@ -5,11 +5,11 @@ import EventEmitter from 'node:events'; import { clearTimeout } from 'timers'; import { ClientSocket } from '../ipc/ClientSocket'; import { - EnvProcessData, - MakeAbortableRequest, AbortableData, + EnvProcessData, InternalOps, InternalOpsData, + MakeAbortableRequest, SessionObject, Transportable } from '../Util'; @@ -19,9 +19,11 @@ export declare interface ShardClientUtil { * Emitted when an IPC message from parent process is received * @eventProperty */ - on(event: 'message', listener: (message: Message|unknown) => void): this; - once(event: 'message', listener: (message: Message|unknown) => void): this; - off(event: 'message', listener: (message: Message|unknown) => void): this; + on(event: 'message', listener: (message: Message | unknown) => void): this; + + once(event: 'message', listener: (message: Message | unknown) => void): this; + + off(event: 'message', listener: (message: Message | unknown) => void): this; } /** @@ -29,12 +31,12 @@ export declare interface ShardClientUtil { */ export class ShardClientUtil extends EventEmitter { public client: Client; - private readonly manager: Indomitable; public readonly clusterId: number; public readonly clusterCount: number; public readonly shardIds: number[]; public readonly shardCount: number; public readonly ipc: ClientSocket; + private readonly manager: Indomitable; constructor(client: Client, manager: Indomitable) { super(); @@ -60,7 +62,7 @@ export class ShardClientUtil extends EventEmitter { internal: true }; const start = process.hrtime.bigint(); - const end = await this.send({ content, reply: true }) as number; + const end = await this.send({content, reply: true}) as number; return Number(BigInt(end) - start); } @@ -74,7 +76,7 @@ export class ShardClientUtil extends EventEmitter { data: `(${script.toString()})(this, ${JSON.stringify(context)})`, internal: true }; - return this.send({ content, reply: true }) as Promise; + return this.send({content, reply: true}) as Promise; } /** @@ -93,10 +95,10 @@ export class ShardClientUtil extends EventEmitter { public fetchSessions(update: boolean = false): Promise { const content: InternalOpsData = { op: InternalOps.SESSION_INFO, - data: { update }, + data: {update}, internal: true }; - return this.send({ content, reply: true }) as Promise; + return this.send({content, reply: true}) as Promise; } /** @@ -106,10 +108,10 @@ export class ShardClientUtil extends EventEmitter { public restart(clusterId: number): Promise { const content: InternalOpsData = { op: InternalOps.RESTART, - data: { clusterId }, + data: {clusterId}, internal: true }; - return this.send({ content }) as Promise; + return this.send({content}) as Promise; } /** @@ -122,14 +124,14 @@ export class ShardClientUtil extends EventEmitter { data: {}, internal: true }; - return this.send({ content }) as Promise; + return this.send({content}) as Promise; } /** * Sends a message to primary process * @returns A promise that resolves to void or a repliable object */ - public async send(transportable: Transportable): Promise { + public async send(transportable: Transportable): Promise { let abortableData: AbortableData | undefined; if (!transportable.signal && (this.manager.ipcTimeout !== Infinity && transportable.reply)) { abortableData = MakeAbortableRequest(this.manager.ipcTimeout); diff --git a/src/concurrency/AsyncQueue.ts b/src/concurrency/AsyncQueue.ts index 1404d3c..8f22d8f 100644 --- a/src/concurrency/AsyncQueue.ts +++ b/src/concurrency/AsyncQueue.ts @@ -6,6 +6,7 @@ export declare interface AsyncQueueWaitOptions { export class AsyncQueue { private readonly queue: NodeJS.EventEmitter[]; + constructor() { this.queue = []; } @@ -14,10 +15,10 @@ export class AsyncQueue { return this.queue.length; } - public wait({ signal }: AsyncQueueWaitOptions): Promise { + public wait({signal}: AsyncQueueWaitOptions): Promise { + + const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', {signal}) : Promise.resolve([]); - const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', { signal }) : Promise.resolve([]); - const emitter = new EventEmitter() as NodeJS.EventEmitter; this.queue.push(emitter); diff --git a/src/concurrency/ConcurrencyClient.ts b/src/concurrency/ConcurrencyClient.ts index e2dbfca..49cb9b1 100644 --- a/src/concurrency/ConcurrencyClient.ts +++ b/src/concurrency/ConcurrencyClient.ts @@ -30,7 +30,7 @@ export class ConcurrencyClient { Fetch(url.toString(), { method: 'DELETE', - headers: { authorization: this.password } + headers: {authorization: this.password} }).catch(() => null); } @@ -39,7 +39,7 @@ export class ConcurrencyClient { const response = await Fetch(url.toString(), { method: 'POST', - headers: { authorization: this.password } + headers: {authorization: this.password} }); if (response.code === 202 || response.code === 204) { @@ -65,7 +65,7 @@ export class ConcurrencyClient { url.searchParams.append('shardId', '0'); const response = await Fetch(url.toString(), { method: 'POST', - headers: { authorization: this.password } + headers: {authorization: this.password} }); return Number(response.body); } diff --git a/src/concurrency/ConcurrencyManager.ts b/src/concurrency/ConcurrencyManager.ts index 9f12f53..10fb46a 100644 --- a/src/concurrency/ConcurrencyManager.ts +++ b/src/concurrency/ConcurrencyManager.ts @@ -9,6 +9,7 @@ export class ConcurrencyManager { private readonly queues: ExtendedMap; private readonly signals: Map; private readonly concurrency: number; + constructor(concurrency: number) { this.queues = new ExtendedMap(); this.signals = new Map(); @@ -31,9 +32,9 @@ export class ConcurrencyManager { resets: Number.POSITIVE_INFINITY }; }); - + try { - await state.queue.wait({ signal: abort.signal }); + await state.queue.wait({signal: abort.signal}); const difference = state.resets - Date.now(); diff --git a/src/concurrency/ConcurrencyServer.ts b/src/concurrency/ConcurrencyServer.ts index 2a5f642..31b9e0b 100644 --- a/src/concurrency/ConcurrencyServer.ts +++ b/src/concurrency/ConcurrencyServer.ts @@ -50,6 +50,15 @@ export class ConcurrencyServer { return this.server.address() as AddressInfo; } + /** + * Starts this server + */ + public start(): Promise { + return new Promise((resolve) => { + this.server.listen(0, '127.0.0.1', () => resolve(this.info)); + }) + } + /** * Handles the incoming requests * @param request @@ -117,13 +126,4 @@ export class ConcurrencyServer { response.statusMessage = 'Not Found'; return void response.end(); } - - /** - * Starts this server - */ - public start(): Promise { - return new Promise((resolve) => { - this.server.listen(0 , '127.0.0.1', () => resolve(this.info)); - }) - } } \ No newline at end of file diff --git a/src/concurrency/ExtendedMap.ts b/src/concurrency/ExtendedMap.ts index 4b58458..dded4a2 100644 --- a/src/concurrency/ExtendedMap.ts +++ b/src/concurrency/ExtendedMap.ts @@ -5,7 +5,7 @@ export class ExtendedMap extends Map { value = generator(key, this); this.set(key, value) - + return value; } } \ No newline at end of file diff --git a/src/ipc/BaseSocket.ts b/src/ipc/BaseSocket.ts index 84a1613..cc78991 100644 --- a/src/ipc/BaseSocket.ts +++ b/src/ipc/BaseSocket.ts @@ -1,8 +1,8 @@ -import type {Socket} from 'node:net'; +import type { Socket } from 'node:net'; import { InternalAbortSignal, InternalPromise, - IpcErrorData, IpcIdentify, + IpcErrorData, RawIpcMessage, RawIpcMessageType, SavePromiseOptions, @@ -11,10 +11,10 @@ import { import { randomUUID } from 'node:crypto'; export class Message { + public readonly content: any; private readonly socket: Socket; private readonly nonce: string; private readonly reply: boolean; - public readonly content: any; constructor(socket: Socket, data: RawIpcMessage) { this.socket = socket; @@ -70,7 +70,7 @@ export abstract class BaseSocket { * Raw send method without abort controller handling * @param transportable Data to send */ - public send(transportable: Transportable): Promise { + public send(transportable: Transportable): Promise { return new Promise((resolve, reject) => { if (this.socket.destroyed) { return resolve(undefined); @@ -85,10 +85,16 @@ export abstract class BaseSocket { }; this.socket.write(JSON.stringify(data)); if (!data.reply) return resolve(undefined); - this.waitForPromise({ nonce, resolve, reject, signal: transportable.signal }); + this.waitForPromise({nonce, resolve, reject, signal: transportable.signal}); }); } + protected abstract handleMessage(message: Message): Promise; + + protected abstract handleError(error: Error): void; + + protected abstract handleClose(): void; + /** * Rejects all the pending promises */ @@ -112,7 +118,7 @@ export abstract class BaseSocket { if (typeof data.internal !== 'boolean' && !data.internal) return; - switch(data.type) { + switch (data.type) { case RawIpcMessageType.MESSAGE: return void this .handleMessage(new Message(this.socket, data)) @@ -129,7 +135,7 @@ export abstract class BaseSocket { } private waitForPromise(options: SavePromiseOptions): void { - let controller: InternalAbortSignal|undefined; + let controller: InternalAbortSignal | undefined; if (options.signal) { @@ -145,7 +151,11 @@ export abstract class BaseSocket { controller.signal.addEventListener('abort', listener); } - this.promises.set(options.nonce, { resolve: options.resolve, reject: options.reject, controller } as InternalPromise); + this.promises.set(options.nonce, { + resolve: options.resolve, + reject: options.reject, + controller + } as InternalPromise); } private handlePromise(data: RawIpcMessage): void { @@ -169,8 +179,4 @@ export abstract class BaseSocket { promise.resolve(data.content); } - - protected abstract handleMessage(message: Message): Promise; - protected abstract handleError(error: Error): void; - protected abstract handleClose(): void; } \ No newline at end of file diff --git a/src/ipc/ClientSocket.ts b/src/ipc/ClientSocket.ts index d184e2e..666adf7 100644 --- a/src/ipc/ClientSocket.ts +++ b/src/ipc/ClientSocket.ts @@ -8,6 +8,7 @@ const internalOpsValues = Object.values(InternalOps); export class ClientSocket extends BaseSocket { public readonly shard: ShardClientUtil; private readonly serverId: string; + constructor(shard: ShardClientUtil, serverId: string) { super(new Socket()); this.shard = shard; @@ -15,8 +16,8 @@ export class ClientSocket extends BaseSocket { } public connect(): void { - this.socket.connect({ path: `./indomitable-${this.serverId}` }, () => { - this.identify({ clusterId: this.shard.clusterId, serverId: this.serverId }) + this.socket.connect({path: `./indomitable-${this.serverId}`}, () => { + this.identify({clusterId: this.shard.clusterId, serverId: this.serverId}) .catch(() => null); }); @@ -28,7 +29,7 @@ export class ClientSocket extends BaseSocket { internal: true, data }; - return this.send({ content, reply: true }) as Promise; + return this.send({content, reply: true}) as Promise; } protected handleClose(): void { diff --git a/src/ipc/IpcServer.ts b/src/ipc/IpcServer.ts index 62f94bd..bba0a10 100644 --- a/src/ipc/IpcServer.ts +++ b/src/ipc/IpcServer.ts @@ -2,15 +2,15 @@ import type { Indomitable } from '../Indomitable'; import type { ServerSocket } from './ServerSocket'; import { randomUUID } from 'node:crypto'; -import { createServer, Server, Socket } from 'node:net'; +import { createServer, Server } from 'node:net'; import { LibraryEvents } from '../Util'; export class IpcServer { + public readonly serverId: string; private readonly manager: Indomitable; private readonly server: Server; private readonly sockets: Map; - public readonly serverId: string; constructor(manager: Indomitable) { this.manager = manager; diff --git a/src/ipc/ServerSocket.ts b/src/ipc/ServerSocket.ts index 30866cd..b414a88 100644 --- a/src/ipc/ServerSocket.ts +++ b/src/ipc/ServerSocket.ts @@ -1,8 +1,8 @@ -import type {Socket} from 'node:net'; -import type {Indomitable} from '../Indomitable'; -import {IpcServer} from './IpcServer'; -import {BaseSocket, Message} from './BaseSocket'; -import {ClientEventData, ClientEvents, InternalOps, InternalOpsData, IpcIdentify, LibraryEvents,} from '../Util'; +import type { Socket } from 'node:net'; +import type { Indomitable } from '../Indomitable'; +import { IpcServer } from './IpcServer'; +import { BaseSocket, Message } from './BaseSocket'; +import { ClientEventData, ClientEvents, InternalOps, InternalOpsData, LibraryEvents, } from '../Util'; const internalOpsValues = Object.values(InternalOps); const clientEventsValues = Object.values(ClientEvents); @@ -34,7 +34,7 @@ export class ServerSocket extends BaseSocket { if (internalOpsValues.includes(message.content.op)) { - switch(content.op) { + switch (content.op) { case InternalOps.IDENTIFY: { if (content.data.serverId !== this.server.serverId) { this.socket.destroy(); @@ -74,7 +74,7 @@ export class ServerSocket extends BaseSocket { } else if (clientEventsValues.includes(message.content.op)) { - switch(content.op) { + switch (content.op) { case ClientEvents.READY: this.manager.emit(LibraryEvents.CLIENT_READY, content.data); break; diff --git a/src/manager/ClusterManager.ts b/src/manager/ClusterManager.ts index 37dcce4..06d3b4f 100644 --- a/src/manager/ClusterManager.ts +++ b/src/manager/ClusterManager.ts @@ -99,7 +99,7 @@ export class ClusterManager { /** * Remove all listeners on attached worker process and free from memory */ - private cleanup(code: number|null, signal: string|null) { + private cleanup(code: number | null, signal: string | null) { this.worker?.removeAllListeners(); this.worker = undefined; this.ready = false;