Skip to content

Commit

Permalink
chore: fix compiler errors
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 15, 2025
1 parent 97156be commit 70c708d
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 50 deletions.
Binary file modified bun.lockb
Binary file not shown.
17 changes: 11 additions & 6 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/**
* LRU Cache - Least Recently Used Cache
*
*
* Simple cache implementation using Map.
*
*
*/
export type RemoveCallbackType<T> = (key: string, value: T) => Promise<void>;

Expand All @@ -11,7 +11,8 @@ export class LRUCache<T> {

constructor(
private capacity: number,
private removeCallback: RemoveCallbackType<T> = async () => { }) {
private removeCallback: RemoveCallbackType<T> = async () => {}
) {
this.capacity = capacity;
this.cache = new Map();
}
Expand All @@ -34,7 +35,9 @@ export class LRUCache<T> {
}
if (this.cache.size === this.capacity) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
if (firstKey) {
this.cache.delete(firstKey);
}
}
this.cache.set(key, value);
}
Expand All @@ -48,7 +51,9 @@ export class LRUCache<T> {
}

async clear() {
await Promise.all(Array.from(this.cache.keys()).map((key) => this.remove(key)));
await Promise.all(
Array.from(this.cache.keys()).map((key) => this.remove(key))
);
}

size() {
Expand All @@ -62,4 +67,4 @@ export class LRUCache<T> {
values() {
return this.cache.values();
}
}
}
9 changes: 3 additions & 6 deletions src/controllers/utils.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import { ServerWebSocket } from "bun";
import { BufferSource } from "../interfaces";

const RETRY_TIMEOUT = 1000;

export function respond(ws: ServerWebSocket<any>, id: number, data: any = {}) {
const response = JSON.stringify({
id,
data,
});
const response = JSON.stringify({ id, data });

send(ws, response);
}

export function send(ws: ServerWebSocket<any>, data: string | Buffer) {
export function send(ws: ServerWebSocket<any>, data: string | BufferSource) {
if (ws.readyState == 2 || ws.readyState == 3) {
return;
}
Expand All @@ -20,4 +18,3 @@ export function send(ws: ServerWebSocket<any>, data: string | Buffer) {
setTimeout(() => send(ws, data), RETRY_TIMEOUT);
}
}

25 changes: 9 additions & 16 deletions src/controllers/websocket/message-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
*
*/

import { EventEmitter } from 'events';
import { EventEmitter } from "events";
import { BufferSource } from "../../interfaces";

export interface Message<T> {
id: number;
Expand All @@ -25,10 +26,8 @@ export class MessageBroker<T> extends EventEmitter {
} = {};

constructor(
private send: (msg: string | Buffer) => Promise<void>,
private opts = {
messageTimeout: 15000,
},
private send: (msg: string | BufferSource) => Promise<void>,
private opts = { messageTimeout: 15000 }
) {
super();
}
Expand All @@ -39,10 +38,7 @@ export class MessageBroker<T> extends EventEmitter {
}

sendData<K>(data: T, { noack }: { noack?: boolean } = {}): Promise<K> {
const msg = {
id: this.messageId++,
data,
};
const msg = { id: this.messageId++, data };

// TODO: Support binary data too.
this.send(JSON.stringify(msg));
Expand All @@ -52,20 +48,17 @@ export class MessageBroker<T> extends EventEmitter {
}

return new Promise((resolve, reject) => {
this.pendingMessages[msg.id] = {
resolve,
reject,
};
this.pendingMessages[msg.id] = { resolve, reject };

let responseHandler: (data: K) => void;

const timeout = setTimeout(() => {
delete this.pendingMessages[msg.id];
this.removeListener(`${msg.id}`, responseHandler);
reject(new Error('Timeout'));
reject(new Error("Timeout"));
}, this.opts.messageTimeout);

this.messageTimeouts[msg.id] = timeout; // Track the timeout
this.messageTimeouts[msg.id] = timeout; // Track the timeout

responseHandler = (data: K) => {
delete this.pendingMessages[msg.id];
Expand All @@ -79,7 +72,7 @@ export class MessageBroker<T> extends EventEmitter {

close() {
for (const key in this.pendingMessages) {
this.pendingMessages[key].reject(new Error('Connection closed'));
this.pendingMessages[key].reject(new Error("Connection closed"));
if (this.messageTimeouts[key]) {
clearTimeout(this.messageTimeouts[key]);
delete this.messageTimeouts[key];
Expand Down
20 changes: 7 additions & 13 deletions src/controllers/websocket/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ServerWebSocket } from "bun";
import { MessageBroker } from "./message-broker";
import { QueueEvents, ConnectionOptions, QueueEventsListener } from "bullmq";

import { WebSocketBehaviour } from "../../interfaces/websocket-behaviour";
import { WebSocketBehaviour, BufferSource } from "../../interfaces";
import { send } from "../utils";
import { info } from "../../utils/log";

Expand Down Expand Up @@ -30,21 +30,15 @@ export const openQueueEvents = async (
}));

const messageBroker = (ws.data.mb = new MessageBroker<object>(
async (msg: string | Buffer) => send(ws, msg)
async (msg: string | BufferSource) => send(ws, msg)
));

const events = ws.data.events || [];
const cleanUps = [];

events.forEach((event) => {
const eventHandler = async (...args: any[]) => {
await messageBroker.sendData(
{
event,
args,
},
{ noack: true }
);
await messageBroker.sendData({ event, args }, { noack: true });
};
info(`Subscribing to event: ${event}, for queue: ${queueName}`);
queueEvents.on(event, eventHandler);
Expand All @@ -58,17 +52,17 @@ export const QueueEventsController: WebSocketBehaviour = {
message: async (
_ws: ServerWebSocket<QueueEventsWebSocketData>,
_message: string | Buffer
) => { },
) => {},

drain: (_ws) => {
// console.log("WebSocket backpressure: " + ws.getBufferedAmount());
},

close: async (ws, code, message) => {
info(
`WebSocket closed for queue events (${ws.data.queueName}) with code ${code}${message ? `and message ${Buffer.from(
message
).toString()}` : ""}`
`WebSocket closed for queue events (${ws.data.queueName}) with code ${code}${
message ? `and message ${Buffer.from(message).toString()}` : ""
}`
);

const { queueEvents } = ws.data;
Expand Down
20 changes: 12 additions & 8 deletions src/controllers/websocket/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Queue, ConnectionOptions } from "bullmq";
import { Value } from "@sinclair/typebox/value";
import { ServerWebSocket } from "bun";

import { WebSocketBehaviour } from "../../interfaces/websocket-behaviour";
import { WebSocketBehaviour, BufferSource } from "../../interfaces";
import { respond, send } from "../utils";
import { info } from "../../utils/log";
import { QueueSchema, QueueSchemaType } from "../commands";
Expand All @@ -20,7 +20,9 @@ export const openQueue = async (ws: ServerWebSocket<QueueWebSocketData>) => {
info(`Queue connected for queue ${queueName}`);

ws.data.queue = new Queue(queueName, { connection });
ws.data.mb = new MessageBroker<object>(async (msg: string | Buffer) => send(ws, msg));
ws.data.mb = new MessageBroker<object>(async (msg: string | BufferSource) =>
send(ws, msg)
);
};

export const QueueController: WebSocketBehaviour = {
Expand All @@ -40,14 +42,16 @@ export const QueueController: WebSocketBehaviour = {
if (firstError) {
// The errors are difficult to read, so we'll just send a generic one
// until we can do something better.
respond(ws, parsedMessage.id, { err: { message: `Invalid message ${message}`, stack: "" } })
respond(ws, parsedMessage.id, {
err: { message: `Invalid message ${message}`, stack: "" },
});
return;
}

const queue = ws.data.queue;
const { fn, args }: { fn: string, args: any[] } = parsedMessage.data;
const { fn, args }: { fn: string; args: any[] } = parsedMessage.data;
try {
const queueMethod = (<any>queue)[fn] as Function
const queueMethod = (<any>queue)[fn] as Function;
const result = await queueMethod.apply(queue, args);
respond(ws, parsedMessage.id, { ok: result });
} catch (err) {
Expand All @@ -64,9 +68,9 @@ export const QueueController: WebSocketBehaviour = {

close: async (ws, code, message) => {
info(
`WebSocket closed for queue (${ws.data.queueName}) with code ${code}${message ? `and message ${Buffer.from(
message
).toString()}` : ""}`
`WebSocket closed for queue (${ws.data.queueName}) with code ${code}${
message ? `and message ${Buffer.from(message).toString()}` : ""
}`
);

const queue = ws.data.queue;
Expand Down
3 changes: 2 additions & 1 deletion src/controllers/websocket/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Worker, ConnectionOptions, Job } from "bullmq";

import { send } from "../utils";
import { info } from "../../utils/log";
import { BufferSource } from "../../interfaces";

export interface WorkerWebSocketData {
connection: ConnectionOptions;
Expand All @@ -20,7 +21,7 @@ export const openWorker = async (ws: ServerWebSocket<WorkerWebSocketData>) => {
`Worker connected for queue ${queueName} with concurrency ${concurrency}`
);

const mb = (ws.data.mb = new MessageBroker<object>(async (msg: string | Buffer) =>
const mb = (ws.data.mb = new MessageBroker<object>(async (msg: string | BufferSource) =>
send(ws, msg)
));

Expand Down
1 change: 1 addition & 0 deletions src/interfaces/buffer-source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type BufferSource = NodeJS.TypedArray | DataView | ArrayBufferLike;
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './http-handler-opts';
export * from './websocket-behaviour';
export * from './worker-endpoint';
export * from './worker-metadata';
export * from './buffer-source';

0 comments on commit 70c708d

Please sign in to comment.