Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a queue registry #3030

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-3.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
Expand Down Expand Up @@ -424,15 +424,25 @@ def pause(self, pause: bool = True):

async def obliterate(self, count: int, force: bool = False):
"""
Remove a queue completely
Remove a queue completely.
This command calls a Lua script that obliterates the queue
and removes all associated keys, including from bullmq:registry.
"""
keys = self.getKeys(['meta', ''])
result = await self.commands["obliterate"](keys, args=[count, force or ""])
if (result < 0):
if (result == -1):
raise Exception("Cannot obliterate non-paused queue")
if (result == -2):
raise Exception("Cannot obliterate queue with active jobs")
keys = ['bullmq:registry', *self.getKeys(['meta', ''])]

# Convert force=True to "1", force=False to "", matching the Lua script logic
force_arg = '1' if force else ''

# Pass "args" as expected by your commands["obliterate"] method
result = await self.commands["obliterate"](keys, args=[count, force_arg])

# If the script returns a negative code, raise an exception
if result < 0:
if result == -1:
raise Exception("Cannot obliterate a non-paused queue")
elif result == -2:
raise Exception("Cannot obliterate a queue with active jobs")

return result

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
Expand Down
21 changes: 20 additions & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';
import { JobScheduler } from './job-scheduler';
import { version } from '../version';
import { BullMQRegistryKey } from '../consts/bullmq-registry-key';

export interface ObliterateOpts {
/**
Expand Down Expand Up @@ -179,7 +180,10 @@ export class Queue<
this.waitUntilReady()
.then(client => {
if (!this.closing && !opts?.skipMetasUpdate) {
return client.hmset(this.keys.meta, this.metaValues);
const multi = client.multi();
multi.hmset(this.keys.meta, this.metaValues);
multi.zadd(BullMQRegistryKey, Date.now(), this.qualifiedName);
return multi.exec();
}
})
.catch(err => {
Expand All @@ -188,6 +192,21 @@ export class Queue<
});
}

/**
* Returns the queues that are available in the registry.
* @param start - zero based index from where to start returning jobs.
* @param end - zero based index where to stop returning jobs.
*/
static getRegistry(
client: {
zrange: (key: string, start: number, end: number) => Promise<string[]>;
},
start = 0,
end = -1,
): Promise<string[]> {
return client.zrange(BullMQRegistryKey, start, end);
}

emit<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(
event: U,
...args: Parameters<
Expand Down
2 changes: 2 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
} from '../utils';
import { ChainableCommander } from 'ioredis';
import { version as packageVersion } from '../version';
import { BullMQRegistryKey } from '../consts/bullmq-registry-key';
export type JobData = [JobJsonRaw | number, string?];

export class Scripts {
Expand Down Expand Up @@ -1464,6 +1465,7 @@ export class Scripts {
const client = await this.queue.client;

const keys: (string | number)[] = [
BullMQRegistryKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have this class for handling queue keys, https://github.com/taskforcesh/bullmq/blob/master/src/classes/queue-keys.ts we can add a new method to handle global keys

Copy link
Contributor Author

@manast manast Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, this is not really a Queue key, so I am not sure it fits so well on a class that generates queue keys :/

this.queue.keys.meta,
this.queue.toKey(''),
];
Expand Down
12 changes: 8 additions & 4 deletions src/commands/obliterate-2.lua → src/commands/obliterate-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
however this behaviour can be overrided using the 'force' option.

Input:
KEYS[1] meta
KEYS[2] base
KEYS[1] registry key
KEYS[2] meta
KEYS[3] base

ARGV[1] count
ARGV[2] force
]]

local maxCount = tonumber(ARGV[1])
local baseKey = KEYS[2]
local baseKey = KEYS[3]

local rcall = redis.call

Expand All @@ -33,7 +34,7 @@ local function removeLockKeys(keys)
end

-- 1) Check if paused, if not return with error.
if rcall("HEXISTS", KEYS[1], "paused") ~= 1 then
if rcall("HEXISTS", KEYS[2], "paused") ~= 1 then
return -1 -- Error, NotPaused
end

Expand Down Expand Up @@ -99,6 +100,9 @@ if(maxCount <= 0) then
return 1
end

-- Remove from BullMQ registry. baseKey has an ending colon that needs to be removed
rcall("ZREM", KEYS[1], string.sub(baseKey, 1, -2))

if(maxCount > 0) then
rcall("DEL",
baseKey .. 'events',
Expand Down
1 change: 1 addition & 0 deletions src/consts/bullmq-registry-key.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const BullMQRegistryKey = 'bullmq:registry';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use bull as prefix for all our keys, do we really need to change that pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because this is a global key, if we used a prefix then we would not be able to auto-discover the queues defeating the whole purpose. The idea is that a UI can easily get all the queues available in a Redis instance.

132 changes: 132 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { v4 } from 'uuid';
import { FlowProducer, Job, Queue, Worker } from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';
import { version as currentPackageVersion } from '../src/version';
import { BullMQRegistryKey } from '../src/consts/bullmq-registry-key';

describe('queues', function () {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -715,4 +716,135 @@ describe('queues', function () {
await worker.close();
});
});

describe('Queue registry', () => {
let client: IORedis;

beforeEach(async function () {
client = new IORedis();
await client.del(BullMQRegistryKey);
});

afterEach(async function () {
await client.quit();
});

it('should add the queue to the registry ZSET when created', async () => {
const queue = new Queue('test-registry');

// Wait a tick for the queue’s init (if needed)
await queue.waitUntilReady();

// Check if the queue is in the registry
const result = await client.zscore(
BullMQRegistryKey,
queue.qualifiedName,
);
expect(result).to.not.be.null;

// Clean up
await queue.obliterate();
await queue.close();
});

it('should NOT add the queue to the registry if skipMetasUpdate is true', async () => {
const queue = new Queue('test-registry-skip', {
skipMetasUpdate: true,
connection: client,
});

await queue.waitUntilReady();

// If skipMetasUpdate is true, we expect no entry in the registry
const result = await client.zscore(
BullMQRegistryKey,
queue.qualifiedName,
);
expect(result).to.be.null;

await queue.obliterate();
await queue.close();
});

it('should remove the queue from registry after obliterating a paused queue', async () => {
const queue = new Queue('test-registry-remove');
await queue.waitUntilReady();

// Pause the queue so obliterate can work normally in BullMQ
await queue.pause();

// Add it to ensure it’s in the registry
let score = await client.zscore(BullMQRegistryKey, queue.qualifiedName);
expect(score).to.not.be.null;

// Obliterate
await queue.obliterate();

// The queue should now be gone from the registry
score = await client.zscore(BullMQRegistryKey, queue.qualifiedName);
expect(score).to.be.null;

await queue.close();
});

it('should return paginated queue names via getRegistry()', async () => {
const queueNames = ['registry-a', 'registry-b', 'registry-c'];
const queues: Queue[] = [];

// Create multiple queues
for (const name of queueNames) {
const queue = new Queue(name);
queues.push(queue);
}

await Promise.all(queues.map(q => q.waitUntilReady()));

await delay(100);

const results = await Queue.getRegistry(client, 0, -1);
expect(results).to.have.lengthOf(3);
expect(results).to.include.members(
queueNames.map(name => `bull:${name}`),
);

// Let’s do partial pagination: only the first 2
const paginatedResults = await Queue.getRegistry(client, 0, 1);
// Because ZRANGE end index is inclusive, "0,1" means 2 items
expect(paginatedResults).to.have.lengthOf(2);
expect(queueNames.map(name => `bull:${name}`)).to.include.members(
paginatedResults,
);

// Clean up
for (const queue of queues) {
await queue.obliterate();
await queue.close();
}
});

// This test should pass however it seems that the paused logic in obliterate is
// not working as expected.
it.skip('should fail to obliterate if queue is not paused', async () => {
const queue = new Queue('test-registry-not-paused');
await queue.waitUntilReady();

let errorCode: number | null = null;
try {
const result = await queue.obliterate();
console.log(result);
} catch (err: any) {
console.log(err);
errorCode = err.message.includes('-1') ? -1 : null;
}

// Verify that it actually was not removed from the registry
const score = await client.zscore(BullMQRegistryKey, queue.qualifiedName);
expect(score).to.not.be.null;

// If your script / code is returning an error or an error code, check it
expect(errorCode).to.eql(-1);

await queue.close();
});
});
});
Loading