diff --git a/src/classes/job.ts b/src/classes/job.ts index 5b3b3e7c30..50b88dfd20 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -609,11 +609,13 @@ export class Job< Date.now() + delay, token, ); - (multi).moveToDelayed(args); + this.scripts.execCommand(multi, 'moveToDelayed', args); command = 'delayed'; } else { // Retry immediately - (multi).retryJob( + this.scripts.execCommand( + multi, + 'retryJob', this.scripts.retryJobArgs(this.id, this.opts.lifo, token), ); command = 'retryJob'; @@ -631,7 +633,7 @@ export class Job< token, fetchNext, ); - (multi).moveToFinished(args); + this.scripts.execCommand(multi, 'moveToFinished', args); finishedOn = args[13]; command = 'failed'; } @@ -1110,7 +1112,7 @@ export class Job< err?.message, ); - (multi).saveStacktrace(args); + this.scripts.execCommand(multi, 'saveStacktrace', args); } } diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 8e58be675d..839bc10143 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -9,6 +9,7 @@ import { isRedisCluster, isRedisInstance, isRedisVersionLowerThan, + readPackageJson, } from '../utils'; import * as scripts from '../scripts'; @@ -153,13 +154,18 @@ export class RedisConnection extends EventEmitter { return this.initializing; } - protected loadCommands(providedScripts?: Record): void { + protected loadCommands( + version?: string, + providedScripts?: Record, + ): void { const finalScripts = providedScripts || (scripts as Record); for (const property in finalScripts as Record) { // Only define the command if not already defined - if (!(this._client)[finalScripts[property].name]) { - (this._client).defineCommand(finalScripts[property].name, { + const commandName = `${finalScripts[property].name}:${version}`; + + if (!(this._client)[commandName]) { + (this._client).defineCommand(commandName, { numberOfKeys: finalScripts[property].keys, lua: finalScripts[property].content, }); @@ -178,8 +184,10 @@ export class RedisConnection extends EventEmitter { this._client.on('ready', this.handleClientReady); + const { version } = readPackageJson(); + await RedisConnection.waitUntilReady(this._client); - this.loadCommands(); + this.loadCommands(version); this.version = await this.getRedisVersion(); if (this.opts && this.opts.skipVersionCheck !== true && !this.closing) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 2bbdb7a3ad..a141c084e6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -32,17 +32,26 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { + array2obj, + getParentKey, + isRedisVersionLowerThan, + readPackageJson, +} from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; export class Scripts { + protected version; + moveToFinishedKeys: (string | undefined)[]; constructor(protected queue: MinimalQueue) { const queueKeys = this.queue.keys; + this.version = readPackageJson().version; + this.moveToFinishedKeys = [ queueKeys.wait, queueKeys.active, @@ -59,13 +68,22 @@ export class Scripts { ]; } + public execCommand( + client: RedisClient | ChainableCommander, + commandName: string, + args: any[], + ) { + const commandNameWithVersion = `${commandName}:${this.version}`; + return (client)[commandNameWithVersion](args); + } + async isJobInList(listKey: string, jobId: string): Promise { const client = await this.queue.client; let result; if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { - result = await (client).isJobInList([listKey, jobId]); + result = await this.execCommand(client, 'isJobInList', [listKey, jobId]); } else { - result = await (client).lpos(listKey, jobId); + result = await client.lpos(listKey, jobId); } return Number.isInteger(result); } @@ -127,7 +145,7 @@ export class Scripts { keys.push(pack(args), job.data, encodedOpts); - const result = await (client).addJob(keys); + const result = await this.execCommand(client, 'addJob', keys); if (result < 0) { throw this.finishedErrors(result, parentOpts.parentKey, 'addJob'); @@ -152,7 +170,11 @@ export class Scripts { keys.push(this.queue.keys.events); - return (client).pause(keys.concat([pause ? 'paused' : 'resumed'])); + return this.execCommand( + client, + 'pause', + keys.concat([pause ? 'paused' : 'resumed']), + ); } private removeRepeatableArgs( @@ -175,14 +197,14 @@ export class Scripts { const client = await this.queue.client; const args = this.removeRepeatableArgs(repeatJobId, repeatJobKey); - return (client).removeRepeatable(args); + return this.execCommand(client, 'removeRepeatable', args); } async remove(jobId: string): Promise { const client = await this.queue.client; const keys = [''].map(name => this.queue.toKey(name)); - return (client).removeJob(keys.concat([jobId])); + return this.execCommand(client, 'removeJob', keys.concat([jobId])); } async extendLock( @@ -199,7 +221,7 @@ export class Scripts { duration, jobId, ]; - return (client).extendLock(args); + return this.execCommand(client, 'extendLock', args); } async updateData( @@ -211,7 +233,11 @@ export class Scripts { const keys = [this.queue.toKey(job.id)]; const dataJson = JSON.stringify(data); - const result = await (client).updateData(keys.concat([dataJson])); + const result = await this.execCommand( + client, + 'updateData', + keys.concat([dataJson]), + ); if (result < 0) { throw this.finishedErrors(result, job.id, 'updateData'); @@ -227,7 +253,9 @@ export class Scripts { const keys = [this.queue.toKey(job.id), this.queue.keys.events]; const progressJson = JSON.stringify(progress); - const result = await (client).updateProgress( + const result = await this.execCommand( + client, + 'updateProgress', keys.concat([job.id, progressJson]), ); @@ -309,7 +337,7 @@ export class Scripts { ) { const client = await this.queue.client; - const result = await (client).moveToFinished(args); + const result = await this.execCommand(client, 'moveToFinished', args); if (result < 0) { throw this.finishedErrors(result, jobId, 'moveToFinished', 'active'); } else { @@ -366,7 +394,7 @@ export class Scripts { const client = await this.queue.client; const args = this.drainArgs(delayed); - return (client).drain(args); + return this.execCommand(client, 'drain', args); } private getRangesArgs( @@ -396,7 +424,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getRangesArgs(types, start, end, asc); - return (client).getRanges(args); + return this.execCommand(client, 'getRanges', args); } private getCountsArgs(types: JobType[]): (string | number)[] { @@ -416,7 +444,7 @@ export class Scripts { const client = await this.queue.client; const args = this.getCountsArgs(types); - return (client).getCounts(args); + return this.execCommand(client, 'getCounts', args); } moveToCompletedArgs( @@ -469,7 +497,9 @@ export class Scripts { return this.queue.toKey(key); }); - return (client).isFinished( + return this.execCommand( + client, + 'isFinished', keys.concat([jobId, returnValue ? '1' : '']), ); } @@ -490,16 +520,16 @@ export class Scripts { }); if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { - return (client).getState(keys.concat([jobId])); + return this.execCommand(client, 'getState', keys.concat([jobId])); } - return (client).getStateV2(keys.concat([jobId])); + return this.execCommand(client, 'getStateV2', keys.concat([jobId])); } async changeDelay(jobId: string, delay: number): Promise { const client = await this.queue.client; const args = this.changeDelayArgs(jobId, delay); - const result = await (client).changeDelay(args); + const result = await this.execCommand(client, 'changeDelay', args); if (result < 0) { throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed'); } @@ -535,7 +565,7 @@ export class Scripts { const client = await this.queue.client; const args = this.changePriorityArgs(jobId, priority, lifo); - const result = await (client).changePriority(args); + const result = await this.execCommand(client, 'changePriority', args); if (result < 0) { throw this.finishedErrors(result, jobId, 'changePriority'); } @@ -645,7 +675,7 @@ export class Scripts { const client = await this.queue.client; const args = this.moveToDelayedArgs(jobId, timestamp, token); - const result = await (client).moveToDelayed(args); + const result = await this.execCommand(client, 'moveToDelayed', args); if (result < 0) { throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active'); } @@ -670,7 +700,11 @@ export class Scripts { const client = await this.queue.client; const args = this.moveToWaitingChildrenArgs(jobId, token, opts); - const result = await (client).moveToWaitingChildren(args); + const result = await this.execCommand( + client, + 'moveToWaitingChildren', + args, + ); switch (result) { case 0: @@ -699,7 +733,7 @@ export class Scripts { ): Promise { const client = await this.queue.client; - return (client).cleanJobsInSet([ + return this.execCommand(client, 'cleanJobsInSet', [ this.queue.toKey(set), this.queue.toKey('events'), this.queue.toKey(''), @@ -769,7 +803,7 @@ export class Scripts { const args = this.retryJobsArgs(state, count, timestamp); - return (client).retryJobs(args); + return this.execCommand(client, 'retryJobs', args); } /** @@ -807,7 +841,11 @@ export class Scripts { state, ]; - const result = await (client).reprocessJob(keys.concat(args)); + const result = await this.execCommand( + client, + 'reprocessJob', + keys.concat(args), + ); switch (result) { case 1: @@ -845,7 +883,9 @@ export class Scripts { }), ]; - const result = await (client).moveToActive( + const result = await this.execCommand( + client, + 'moveToActive', (<(string | number | boolean | Buffer)[]>keys).concat(args), ); @@ -866,7 +906,7 @@ export class Scripts { const args = [this.queue.toKey(''), jobId]; - return (client).promote(keys.concat(args)); + return this.execCommand(client, 'promote', keys.concat(args)); } /** @@ -898,7 +938,7 @@ export class Scripts { Date.now(), opts.stalledInterval, ]; - return (client).moveStalledJobsToWait(keys.concat(args)); + return this.execCommand(client, 'moveStalledJobsToWait', keys.concat(args)); } /** @@ -928,7 +968,11 @@ export class Scripts { const args = [jobId, token, this.queue.toKey(jobId)]; - const pttl = await (client).moveJobFromActiveToWait(keys.concat(args)); + const pttl = await this.execCommand( + client, + 'moveJobFromActiveToWait', + keys.concat(args), + ); return pttl < 0 ? 0 : pttl; } @@ -942,7 +986,11 @@ export class Scripts { ]; const args = [opts.count, opts.force ? 'force' : null]; - const result = await (client).obliterate(keys.concat(args)); + const result = await this.execCommand( + client, + 'obliterate', + keys.concat(args), + ); if (result < 0) { switch (result) { case -1: