Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gracefullier close & add dedicated workers connection #17

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/controllers/http/queue-http-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import { HttpHandlerOpts } from "../../interfaces/http-handler-opts";
import { validateJob, validatePagination, validateQueueName } from "../../validators";
import { getQueue } from "../../utils/queue-factory";

type QueueHttpControllerOpts = Omit<HttpHandlerOpts, "workersRedisClient">;

export const QueueHttpController = {
/**
* addJobs
* @param opts
* @returns
*/
addJobs: async (opts: HttpHandlerOpts) => {
addJobs: async (opts: QueueHttpControllerOpts) => {
const queueName = opts.params.queueName;
try {
validateQueueName(queueName);
Expand Down Expand Up @@ -49,7 +51,7 @@ export const QueueHttpController = {
Uses "start" and "length" as query parameters.
and optional "statuses" query parameter to filter by status.
*/
getJobs: async (opts: HttpHandlerOpts) => {
getJobs: async (opts: QueueHttpControllerOpts) => {
const queueName = opts.params.queueName;
let start;
let length;
Expand Down Expand Up @@ -91,7 +93,7 @@ export const QueueHttpController = {
* @param opts
* @returns
*/
getJob: async (opts: HttpHandlerOpts) => {
getJob: async (opts: QueueHttpControllerOpts) => {
const queueName = opts.params.queueName;
try {
validateQueueName(queueName);
Expand All @@ -110,5 +112,4 @@ export const QueueHttpController = {

return new Response(JSON.stringify(job), { status: 200 });
}

}
30 changes: 24 additions & 6 deletions src/controllers/http/worker-http-controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const fakeAddValidReq = {
describe('WorkerHttpController.init', () => {

it('should initialize workers from Redis metadata', async () => {
await expect(WorkerHttpController.init(new Redis({

await expect(WorkerHttpController.init(new Redis(), new Redis({
maxRetriesPerRequest: null,
}))).resolves.toBeUndefined;
});
Expand All @@ -43,7 +44,12 @@ describe('WorkerHttpController.addWorker', () => {

it('should add a worker with valid metadata', async () => {

const response = await WorkerHttpController.addWorker({ req: fakeAddValidReq, redisClient, params: {} });
const response = await WorkerHttpController.addWorker({
req: fakeAddValidReq,
redisClient,
workersRedisClient: redisClient,
params: {}
});
expect(response).toBeDefined();
expect(await response.text()).toBe("OK");
expect(response!.status).toBe(200); // Assuming 200 is the success status code
Expand All @@ -54,7 +60,12 @@ describe('WorkerHttpController.addWorker', () => {
json: () => Promise.resolve({}) // Invalid metadata
} as Request;

const response = await WorkerHttpController.addWorker({ req: fakeReq, redisClient, params: {} });
const response = await WorkerHttpController.addWorker({
req: fakeReq,
redisClient,
workersRedisClient: redisClient,
params: {}
});
expect(response).toBeDefined();
expect(response!.status).toBe(400);
});
Expand All @@ -72,10 +83,16 @@ describe('WorkerHttpController.removeWorker', () => {
const opts = {
req: {} as Request,
params: { queueName: 'validQueue' },
redisClient
redisClient,
workersRedisClient: redisClient,
};

const responseAdd = await WorkerHttpController.addWorker({ req: fakeAddValidReq, redisClient, params: {} });
const responseAdd = await WorkerHttpController.addWorker({
req: fakeAddValidReq,
redisClient,
workersRedisClient: redisClient,
params: {},
});
expect(responseAdd).toBeDefined();

const responseRemove = await WorkerHttpController.removeWorker(opts);
Expand All @@ -87,7 +104,8 @@ describe('WorkerHttpController.removeWorker', () => {
const opts = {
req: {} as Request,
params: { queueName: 'non-existing-queue' },
redisClient
redisClient,
workersRedisClient: redisClient,
};

const responseRemove = await WorkerHttpController.removeWorker(opts);
Expand Down
23 changes: 12 additions & 11 deletions src/controllers/http/worker-http-controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import { Job, Worker } from "bullmq";
import { Redis, Cluster } from "ioredis";
import { debug } from "../../utils/log";
import { debug, info } from "../../utils/log";

import { HttpHandlerOpts, WorkerMetadata } from "../../interfaces";
import { validateWorkerMetadata } from "../../validators";
Expand All @@ -13,12 +13,13 @@ const workers: { [queueName: string]: Worker } = {};

const workerMetadataKey = config.workerMetadataKey;

// Gracefully close all workers
process.on('exit', async () => {
for (const queueName in workers) {
await workers[queueName].close();
}
});
export const gracefulShutdownWorkers = async () => {
info(`Closing workers...`);

const closingWorkers = Object.keys(workers).map(async (queueName) => workers[queueName].close());
await Promise.all(closingWorkers);
info('Workers closed');
}

const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, connection: Redis | Cluster): Worker => {
const { endpoint: workerEndpoint, opts: workerOptions } = workerMetadata;
Expand Down Expand Up @@ -72,7 +73,7 @@ const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, c
};

export const WorkerHttpController = {
init: (redisClient: Redis | Cluster) => {
init: (redisClient: Redis | Cluster, workersRedisClient: Redis | Cluster) => {
// Load workers from Redis and start them
debugEnabled && debug('Loading workers from Redis...');
const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });
Expand All @@ -82,7 +83,7 @@ export const WorkerHttpController = {
const value = result[i + 1];

const workerMetadata = JSON.parse(value) as WorkerMetadata;
workers[queueName] = workerFromMetadata(queueName, workerMetadata, redisClient);
workers[queueName] = workerFromMetadata(queueName, workerMetadata, workersRedisClient);
}
});

Expand Down Expand Up @@ -122,11 +123,11 @@ export const WorkerHttpController = {
}

const { queue: queueName } = workerMetadata;
const { redisClient } = opts;
const { redisClient, workersRedisClient } = opts;

// Replace worker if it already exists
const existingWorker = workers[queueName];
const worker = workerFromMetadata(queueName, workerMetadata, redisClient);
const worker = workerFromMetadata(queueName, workerMetadata, workersRedisClient);
workers[queueName] = worker;

// Upsert worker metadata in Redis for the worker to be able to reconnect after a restart
Expand Down
24 changes: 16 additions & 8 deletions src/controllers/http/worker-job-http-controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { WorkerJobHttpController } from './worker-job-http-controller';
import { config } from '../../config';

let redisClient: Redis;
let workersRedisClient: Redis;

const queuePrefix = config.defaultQueuePrefix;

beforeAll(async () => {
redisClient = new Redis({
redisClient = new Redis();
workersRedisClient = new Redis({
maxRetriesPerRequest: null
});
});
Expand All @@ -29,14 +31,16 @@ describe('WorkerJobHttpController.updateProgress', () => {
req: {
json: () => Promise.resolve({ progress: 50 })
} as Request,
redisClient: new Redis({
maxRetriesPerRequest: null
})
redisClient,
workersRedisClient
};

const response = await WorkerJobHttpController.updateProgress(opts);
expect(response.status).toBe(500);
expect(await response.text()).toBe('Missing key for job 1. updateProgress');

await opts.redisClient.quit();
await opts.workersRedisClient.quit();
});

it('updates job progress and returns a 200 response', async () => {
Expand All @@ -49,7 +53,8 @@ describe('WorkerJobHttpController.updateProgress', () => {
req: {
json: () => Promise.resolve({ progress: 50 })
} as Request,
redisClient
redisClient,
workersRedisClient
};

await redisClient.hset(`${queuePrefix}:valid:1`, 'progress', 0);
Expand Down Expand Up @@ -90,7 +95,8 @@ describe('WorkerJobHttpController.addLog', () => {
req: {
json: () => Promise.resolve(logMessage)
} as Request,
redisClient
redisClient,
workersRedisClient
};

await redisClient.hset(`${queuePrefix}:valid:${jobId}`, 'progress', 0);
Expand All @@ -115,7 +121,8 @@ describe('WorkerJobHttpController.getLogs', () => {
},
searchParams: new URLSearchParams('start=invalid&length=invalid'),
req: {} as Request,
redisClient
redisClient,
workersRedisClient
};

const response = await WorkerJobHttpController.getLogs(opts);
Expand All @@ -137,7 +144,8 @@ describe('WorkerJobHttpController.getLogs', () => {
},
searchParams: new URLSearchParams('start=0&length=2'),
req: {} as Request,
redisClient
redisClient,
workersRedisClient
};

const response = await WorkerJobHttpController.getLogs(opts);
Expand Down
2 changes: 1 addition & 1 deletion src/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe("e2e", () => {
});

it("process a job updating progress and adding logs", async () => {
const proxy = await startProxy(0, redisClient, { skipInitWorkers: true });
const proxy = await startProxy(0, redisClient, redisClient, { skipInitWorkers: true });
const proxyPort = proxy.port;

let server: Server;
Expand Down
106 changes: 57 additions & 49 deletions src/fetch-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,59 +33,67 @@ routeMatcher.addWebSocketRoute<{ queueName: string }>(
"/ws/queues/:queueName/events", QueueEventsController
);

export const fetchHandler = (connection: Redis | Cluster, authTokens: string[] = []) => async (req: Request, server: Server) => {
const url = new URL(req.url);
const { searchParams } = url;
const { method } = req;
export const fetchHandler = (
connection: Redis | Cluster,
workersConnection: Redis | Cluster) => async (req: Request, server: Server) => {
const url = new URL(req.url);
const { searchParams } = url;
const { method } = req;

if (url.pathname === "/" && method === "GET") {
return new Response(
`${asciiArt}\nBullMQ Proxy (c) ${new Date().getFullYear()} Taskforce.sh Inc. v${pkg.version
}`,
{ status: 200 }
);
}
if (url.pathname === "/" && method === "GET") {
return new Response(
`${asciiArt}\nBullMQ Proxy (c) ${new Date().getFullYear()} Taskforce.sh Inc. v${pkg.version
}`,
{ status: 200 }
);
}

// Choose controller based on path
const route = routeMatcher.match(url.pathname, method);
if (!route) {
warn(
`Not found request to path ${url.pathname.toString()} from ${req.headers.get("x-forwarded-for") || req.headers.get("host")
}`
);
return new Response("Not found", { status: 404 });
}
// Choose controller based on path
const route = routeMatcher.match(url.pathname, method);
if (!route) {
warn(
`Not found request to path ${url.pathname.toString()} from ${req.headers.get("x-forwarded-for") || req.headers.get("host")
}`
);
return new Response("Not found", { status: 404 });
}

if (route.auth && !await route.auth(req, url, route.params, connection)) {
return new Response("Unauthorized", { status: 401 });
}
if (route.auth && !await route.auth(req, url, route.params, connection)) {
return new Response("Unauthorized", { status: 401 });
}

const queueName = route.params?.queueName;
let controller;
let events;
const concurrency = parseInt(route.params.concurrency, 10) || 1;
const queueName = route.params?.queueName;
let controller;
let events;
const concurrency = parseInt(route.params.concurrency, 10) || 1;

if (route.websocketHandler) {
controller = route.websocketHandler;
if (
server.upgrade(req, {
data: {
route,
controller,
queueName,
concurrency,
connection,
events,
searchParams
},
})
) {
return; // Do not return a Response to signal that the upgrade is successful
if (route.websocketHandler) {
controller = route.websocketHandler;
if (
server.upgrade(req, {
data: {
route,
controller,
queueName,
concurrency,
connection,
events,
searchParams
},
})
) {
return; // Do not return a Response to signal that the upgrade is successful
}
return new Response("Upgrade failed :(", { status: 500 });
} else if (route.httpHandler) {
return route.httpHandler({
req,
params: route.params,
searchParams,
redisClient: connection,
workersRedisClient: workersConnection
});
} else {
return new Response("Not found", { status: 404 });
}
return new Response("Upgrade failed :(", { status: 500 });
} else if (route.httpHandler) {
return route.httpHandler({ req, params: route.params, searchParams, redisClient: connection });
} else {
return new Response("Not found", { status: 404 });
}
}
Loading