Skip to content

Commit

Permalink
refactor websocket code to reduce circular dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
kadamidev committed Jan 9, 2025
1 parent 196c295 commit b5610d1
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 122 deletions.
7 changes: 4 additions & 3 deletions desci-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { ensureUserIfPresent } from './middleware/ensureUserIfPresent.js';
import { errorHandler } from './middleware/errorHandler.js';
import { extractAuthToken, extractUserFromToken } from './middleware/permissions.js';
import routes from './routes/index.js';
import { initializeWebSocketServer } from './websocketServer.js';
import { initializeWebSockets, getIO } from './services/websocketService.js';
import { SubmissionQueueJob } from './workers/doiSubmissionQueue.js';
import { runWorkerUntilStopped } from './workers/publish.js';

Expand Down Expand Up @@ -179,8 +179,8 @@ class AppServer {

while (retries < maxRetries) {
try {
this._io = await initializeWebSocketServer(this.server);
this.app.set('io', this._io);
await initializeWebSockets(this.server);
this.app.set('io', getIO());
logger.info('WebSocket server initialized successfully');
return;
} catch (error) {
Expand Down Expand Up @@ -295,4 +295,5 @@ function getRemoteAddress(req) {
return req.socket.remoteAddress;
}
}

export const server = new AppServer();
2 changes: 1 addition & 1 deletion desci-server/src/services/NotificationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import { CreateNotificationSchema } from '../controllers/notifications/create.js
import { GetNotificationsQuerySchema, PaginatedResponse } from '../controllers/notifications/index.js';
import { logger as parentLogger } from '../logger.js';
import { server } from '../server.js';
import { emitWebsocketEvent, WebSocketEventType } from '../utils/websocketHelpers.js';
import { ensureUuidEndsWithDot } from '../utils.js';

import { attestationService } from './Attestation.js';
import { getDpidFromNode, getDpidFromNodeUuid } from './node.js';
import { PublishServices } from './PublishServices.js';
import { emitWebsocketEvent, WebSocketEventType } from './websocketService.js';

type GetNotificationsQuery = z.infer<typeof GetNotificationsQuerySchema>;
export type CreateNotificationData = z.infer<typeof CreateNotificationSchema>;
Expand Down
1 change: 0 additions & 1 deletion desci-server/src/services/PublishServices.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { DataType, EmailType, Node, NodeContribution, NodeVersion, Prisma, PublishStatus, User } from '@prisma/client';
import sgMail from '@sendgrid/mail';
import { update } from 'lodash';

import { prisma } from '../client.js';
import { getNodeVersion } from '../controllers/communities/util.js';
Expand Down
136 changes: 136 additions & 0 deletions desci-server/src/services/websocketService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// services/websocketService.ts
import { Server as HttpServer } from 'http';

import { createAdapter } from '@socket.io/redis-adapter';
import { ExtendedError, Server as SocketIOServer, Socket } from 'socket.io';

import { logger as parentLogger } from '../logger.js';
import { AuthenticatedSocket, socketsEnsureUser } from '../middleware/permissions.js';
import { redisClient } from '../redisClient.js';

const logger = parentLogger.child({ module: 'websocketService' });

export enum WebSocketEventType {
NOTIFICATION = 'notification',
}

export type WebsocketEventPayload = {
type: WebSocketEventType;
data?: any;
};

let io: SocketIOServer | null = null;

const setupRedisAdapter = async (socketServer: SocketIOServer): Promise<void> => {
const useRedis = !!process.env.REDIS_HOST;

if (!useRedis) {
logger.info('Redis host not configured. Skipping Redis adapter initialization.');
return;
}

try {
logger.info('Redis host configured. Initializing with Redis adapter.');

if (!redisClient.isOpen) {
logger.info('Waiting for Redis client to connect...');
await new Promise<void>((resolve) => {
redisClient.on('connect', () => {
logger.info('Redis client connected');
resolve();
});
});
}

const pubClient = redisClient;
const subClient = pubClient.duplicate();
await subClient.connect();

socketServer.adapter(createAdapter(pubClient, subClient));

logger.info(
{ redisHost: process.env.REDIS_HOST, redisPort: process.env.REDIS_PORT },
'Redis adapter connected for WebSocket',
);
} catch (error) {
logger.error({ error }, 'Failed to connect to Redis for WebSocket adapter. Continuing without Redis.');
}
};

const setupEventHandlers = (socketServer: SocketIOServer) => {
socketServer.on('error', () => {
logger.info('websockets error');
});

socketServer.on('connection', (socket: AuthenticatedSocket) => {
logger.info('New socket connection');
const { userId } = socket.data;
const clientIp = socket.handshake.headers['x-real-ip'] || socket.handshake.address;
logger.info({ userId, clientIp }, 'User connected');

socket.on('authenticate', (userId: string) => {
logger.info({ socketId: socket.id, userId }, `User ${userId} authenticated`);
socket.join(`user-${userId}`);
});

socket.on('disconnect', (reason) => {
logger.info({ userId, reason }, 'User disconnected');
});

socket.on('error', (error) => {
logger.error({ userId, error }, 'Socket error occurred');
});
});

socketServer.on('connect_error', (error) => {
logger.error({ error }, 'Connection error occurred');
});
};

export const initializeWebSockets = async (httpServer: HttpServer): Promise<void> => {
if (io) {
logger.warn('WebSocket server already initialized');
return;
}

try {
io = new SocketIOServer(httpServer, {
cors: {
origin: true,
credentials: true,
},
});

// Set up authentication middleware
io.use((socket: Socket, next: (err?: ExtendedError) => void) => {
socketsEnsureUser(socket, next);
});

// Set up Redis adapter if configured
await setupRedisAdapter(io);

// Set up event handlers
setupEventHandlers(io);

logger.info('WebSocket server initialized successfully');
} catch (error) {
logger.error({ error }, 'Failed to initialize WebSocket server');
throw error;
}
};

export const emitWebsocketEvent = (userId: number, payload: WebsocketEventPayload): void => {
if (!io) {
logger.error('WebSocket server not initialized');
return;
}

try {
io.to(`user-${userId}`).emit(payload.type, payload.data);
logger.info({ userId, payload }, 'WebSocket event emitted successfully');
} catch (error) {
logger.error({ userId, payload, error }, 'Failed to emit WebSocket event');
}
};

export const getIO = (): SocketIOServer | null => io;
37 changes: 0 additions & 37 deletions desci-server/src/utils/websocketHelpers.ts

This file was deleted.

80 changes: 0 additions & 80 deletions desci-server/src/websocketServer.ts

This file was deleted.

0 comments on commit b5610d1

Please sign in to comment.