Skip to content

Commit

Permalink
bullmq added
Browse files Browse the repository at this point in the history
  • Loading branch information
arunkumar201 committed Feb 6, 2024
1 parent 4a442ec commit ec68a35
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 42 deletions.
16 changes: 15 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
{
"editor.suggest.snippetsPreventQuickSuggestions": false,
"aiXcoder.showTrayIcon": true
"aiXcoder.showTrayIcon": true,
"workbench.colorCustomizations": {
"commandCenter.border": "#e7e7e799",
"sash.hoverBorder": "#1f6fd0",
"statusBar.background": "#1857a4",
"statusBar.foreground": "#e7e7e7",
"statusBarItem.hoverBackground": "#1f6fd0",
"statusBarItem.remoteBackground": "#1857a4",
"statusBarItem.remoteForeground": "#e7e7e7",
"titleBar.activeBackground": "#1857a4",
"titleBar.activeForeground": "#e7e7e7",
"titleBar.inactiveBackground": "#1857a499",
"titleBar.inactiveForeground": "#e7e7e799"
},
"peacock.color": "#1857a4"
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"dependencies": {
"body-parser": "^1.20.2",
"bullmq": "^5.1.9",
"cors": "^2.8.5",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
Expand Down
121 changes: 121 additions & 0 deletions src/load-balancers/WeightedJobLoadBalancer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { ConnectionOptions, Queue } from "bullmq";

const MicroServices = ["M-1", "M-2", "M-3", "M-4"];

export const REDIS_CONNECTION_OPTIONS: ConnectionOptions = {
port: parseInt(process.env.REDIS_PORT || "6379"),
host: process.env.REDIS_HOST,
username: "default",
password: process.env.REDIS_PASSWORD,
retryStrategy: () => {
console.timeLog("reconnectStrategy", "reconnectStrategy");
return 500;
},
};

//Load balancer which distribute the request based on least number of jobs in queue or where a server is
//more free
class AutoScalingLoadBalancer {
private queues: Record<string, Queue> = {};

/**
* Constructor for initializing queues for each service.
*/
constructor() {
for (const service of MicroServices) {
const queue = new Queue(`MQueue-${service}`, {
connection: REDIS_CONNECTION_OPTIONS,
});

this.queues[service] = queue;
}
}

/**
* A function to distribute a job.
*
* @param {any} jobData - the data of the job to be distributed
* @return {Promise<void>} a Promise that resolves to void
*/
public async distributeJob(jobData: any): Promise<void> {
const selectedQueue = await this.selectQueueDynamically();

console.debug(
"🚀 ~ AutoTradingLoadBalancer ~ distributeJob ~ selectedQueue:",
selectedQueue?.name
);

if (selectedQueue) {
await selectedQueue.add("PlaceOrderJob", jobData);
console.log(`Job distributed with : ${jobData}`);
} else {
console.error("No available queues to distribute the job.");
}
}

/**
* Asynchronously selects a optimal queue dynamically.
*
* @return {Promise<Queue | null>} A promise that resolves with the selected queue or null if no queue is available.
*/
private async selectQueueDynamically(): Promise<Queue | null> {
const availableQueues = await this.getAvailableQueues();

if (availableQueues.length === 0) {
return null;
}
return this.selectQueueBasedLeastJobs(availableQueues);
}

private async getAvailableQueues(): Promise<Queue[]> {
const queueNames = Object.keys(this.queues);
const queues = await Promise.all(
queueNames.map(async (queueName) => {
const queue = this.queues[queueName];
return queue;
})
);

return queues.filter((queue) => queue !== null) as Queue[];
}

/**
* Selects a queue based on the least number of waiting jobs.
*
* @param {Queue[]} queues - an array of Queue objects
* @return {Promise<Queue | null>} a Promise resolving to a Queue object or null
*/
private async selectQueueBasedLeastJobs(
queues: Queue[]
): Promise<Queue | null> {
if (queues.length === 0) {
return null;
}

const queueDetails = await Promise.all(
queues.map(async (queue) => ({
queue,
waitingJobCount: await queue.getWaitingCount(),
}))
);

// Sort the queues based on the number of waiting jobs
queueDetails.sort((a, b) => a.waitingJobCount - b.waitingJobCount);
// Find the smallest waiting job count
const smallestWaitingJobCount = queueDetails[0].waitingJobCount;

// Filter potential queues with the smallest waiting job count
const potentialQueues = queueDetails.filter(
(q) => q.waitingJobCount === smallestWaitingJobCount
);

// If there are multiple potential queues, choose randomly
const selectedQueue =
potentialQueues[Math.floor(Math.random() * potentialQueues.length)].queue;

return selectedQueue;
}
}

const autoScalingLoadBalancer = new AutoScalingLoadBalancer();
export { autoScalingLoadBalancer };
Loading

0 comments on commit ec68a35

Please sign in to comment.