diff --git a/docs/gitbook/bullmq-pro/changelog.md b/docs/gitbook/bullmq-pro/changelog.md index e23c49afa7..eedaa1da10 100644 --- a/docs/gitbook/bullmq-pro/changelog.md +++ b/docs/gitbook/bullmq-pro/changelog.md @@ -1,3 +1,142 @@ +## [7.26.3](https://github.com/taskforcesh/bullmq-pro/compare/v7.26.2...v7.26.3) (2025-01-26) + + +### Bug Fixes + +* **queue:** use same telemetry logic in add method as BullMQ ([#287](https://github.com/taskforcesh/bullmq-pro/issues/287)) ([214c0d9](https://github.com/taskforcesh/bullmq-pro/commit/214c0d979bd38519df3faa98e0f622ef6f813f68)) + +## [7.26.2](https://github.com/taskforcesh/bullmq-pro/compare/v7.26.1...v7.26.2) (2025-01-18) + + +### Bug Fixes + +* **job-scheduler:** use delayed job data when template data is not present ([#3010](https://github.com/taskforcesh/bullmq/issues/3010)) fixes [#3009](https://github.com/taskforcesh/bullmq/issues/3009) ([95edb40](https://github.com/taskforcesh/bullmq/commit/95edb4008fcd32f09ec0953d862692d4ac7608c0)) +* **job-scheduler:** add next delayed job only when prevMillis matches with producerId ([#3001](https://github.com/taskforcesh/bullmq/issues/3001)) ([4ea35dd](https://github.com/taskforcesh/bullmq/commit/4ea35dd9e16ff0197f204210696f41c0c5bd0e30)) +* **job-scheduler:** avoid duplicates when upserting in a quick sequence ([#2991](https://github.com/taskforcesh/bullmq/issues/2991)) ([e8cdb99](https://github.com/taskforcesh/bullmq/commit/e8cdb99881bc7cebbc48cb7834da5eafa289712f)) +* **dynamic-rate-limit:** validate job lock cases ([#2975](https://github.com/taskforcesh/bullmq/issues/2975)) ([8bb27ea](https://github.com/taskforcesh/bullmq/commit/8bb27ea4438cbd11e85fa4d0aa516bd1c0e7d51b)) + + +### Performance Improvements + +* **job-scheduler:** add delayed job and update scheduler in same script ([#2997](https://github.com/taskforcesh/bullmq/issues/2997)) ([9be28a0](https://github.com/taskforcesh/bullmq/commit/9be28a0c4a907798a447d02ca50662c12333dd82)) +* **job-scheduler:** add delayed job and scheduler in same script ([#2993](https://github.com/taskforcesh/bullmq/issues/2993)) ([95718e8](https://github.com/taskforcesh/bullmq/commit/95718e888ba64b4071f21bbe0823b55a51ab145c)) + +## [7.26.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.26.0...v7.26.1) (2024-12-22) + + +### Bug Fixes + +* **sandbox:** fix issue where job could stay in active forever ([#2979](https://github.com/taskforcesh/bullmq/issues/2979)) ([c0a6bcd](https://github.com/taskforcesh/bullmq/commit/c0a6bcdf9594540ef6c8ec08df28550f4f5e1950)) +* **sandboxed:** fix detecting special errors by sending default messages ([#2967](https://github.com/taskforcesh/bullmq/issues/2967)) fixes [#2962](https://github.com/taskforcesh/bullmq/issues/2962) ([52b0e34](https://github.com/taskforcesh/bullmq/commit/52b0e34f0a38ac71ebd0667a5fa116ecd73ae4d2)) + +# [7.26.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.25.0...v7.26.0) (2024-12-17) + + +### Bug Fixes + +* **scripts:** make sure jobs fields are not empty before unpack ([4360572](https://github.com/taskforcesh/bullmq/commit/4360572745a929c7c4f6266ec03d4eba77a9715c)) +* guarantee every repeatable jobs are slotted ([9917df1](https://github.com/taskforcesh/bullmq/commit/9917df166aff2e2f143c45297f41ac8520bfc8ae)) +* **job-scheduler:** avoid duplicated delayed jobs when repeatable jobs are retried ([af75315](https://github.com/taskforcesh/bullmq/commit/af75315f0c7923f5e0a667a9ed4606b28b89b719)) +* **job-scheduler:** omit deduplication and debounce options from template options ([#2960](https://github.com/taskforcesh/bullmq/issues/2960)) ([b5fa6a3](https://github.com/taskforcesh/bullmq/commit/b5fa6a3208a8f2a39777dc30c2db2f498addb907)) + + +### Features + +* **telemetry:** add option to omit context propagation on jobs ([#2946](https://github.com/taskforcesh/bullmq/issues/2946)) ([6514c33](https://github.com/taskforcesh/bullmq/commit/6514c335231cb6e727819cf5e0c56ed3f5132838)) +* replace multi by lua scripts in moveToFailed ([#2958](https://github.com/taskforcesh/bullmq/issues/2958)) ([c19c914](https://github.com/taskforcesh/bullmq/commit/c19c914969169c660a3e108126044c5152faf0cd)) + +# [7.25.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.24.0...v7.25.0) (2024-12-17) + + +### Features + +* **queue:** enhance getJobSchedulers method to include template information ([#2956](https://github.com/taskforcesh/bullmq/issues/2956)) ref [#2875](https://github.com/taskforcesh/bullmq/issues/2875) ([5b005cd](https://github.com/taskforcesh/bullmq/commit/5b005cd94ba0f98677bed4a44f8669c81f073f26)) + +# [7.24.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.23.1...v7.24.0) (2024-12-07) + + +### Bug Fixes + +* **worker:** catch connection error when moveToActive is called ([#2952](https://github.com/taskforcesh/bullmq/issues/2952)) ([544fc7c](https://github.com/taskforcesh/bullmq/commit/544fc7c9e4755e6b62b82216e25c0cb62734ed59)) +* **scheduler-template:** remove console.log when getting template information ([#2950](https://github.com/taskforcesh/bullmq/issues/2950)) ([3402bfe](https://github.com/taskforcesh/bullmq/commit/3402bfe0d01e5e5205db74d2106cd19d7df53fcb)) +* **flow:** allow using removeOnFail and failParentOnFailure in parents ([#2947](https://github.com/taskforcesh/bullmq/issues/2947)) fixes [#2229](https://github.com/taskforcesh/bullmq/issues/2229) ([85f6f6f](https://github.com/taskforcesh/bullmq/commit/85f6f6f181003fafbf75304a268170f0d271ccc3)) +* **job-scheduler:** upsert template when same pattern options are provided ([#2943](https://github.com/taskforcesh/bullmq/issues/2943)) ref [#2940](https://github.com/taskforcesh/bullmq/issues/2940) ([b56c3b4](https://github.com/taskforcesh/bullmq/commit/b56c3b45a87e52f5faf25406a2b992d1bfed4900)) + + +### Features + +* **queue:** enhance getJobScheduler method to include template information ([#2929](https://github.com/taskforcesh/bullmq/issues/2929)) ref [#2875](https://github.com/taskforcesh/bullmq/issues/2875) ([cb99080](https://github.com/taskforcesh/bullmq/commit/cb990808db19dd79b5048ee99308fa7d1eaa2e9f)) +* **queue:** add getJobSchedulersCount method ([#2945](https://github.com/taskforcesh/bullmq/issues/2945)) ([38820dc](https://github.com/taskforcesh/bullmq/commit/38820dc8c267c616ada9931198e9e3e9d2f0d536)) + +## [7.23.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.23.0...v7.23.1) (2024-12-06) + + +### Bug Fixes + +* **stalled:** move parent to group when needed ([#276](https://github.com/taskforcesh/bullmq-pro/issues/276)) ([8449a41](https://github.com/taskforcesh/bullmq-pro/commit/8449a41847aa19bcede07bd9dc71032f58ede420)) + +# [7.23.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.22.0...v7.23.0) (2024-11-26) + + +### Bug Fixes + +* **scheduler:** remove deprecation warning on immediately option ([#2923](https://github.com/taskforcesh/bullmq/issues/2923)) ([14ca7f4](https://github.com/taskforcesh/bullmq/commit/14ca7f44f31a393a8b6d0ce4ed244e0063198879)) + + +### Features + +* **telemetry:** add telemetry support ([#273](https://github.com/taskforcesh/bullmq-pro/issues/273)) ([e5cc134](https://github.com/taskforcesh/bullmq-pro/commit/e5cc13453b4cee58b04c87568b5cad6a26c31eb7)) +* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f)) + +# [7.22.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.21.1...v7.22.0) (2024-11-22) + + +### Bug Fixes + +* **queue:** change _jobScheduler from private to protected for extension ([#2920](https://github.com/taskforcesh/bullmq/issues/2920)) ([34c2348](https://github.com/taskforcesh/bullmq/commit/34c23485bcb32b3c69046b2fb37e5db8927561ce)) +* **scheduler:** use Job class from getter for extension ([#2917](https://github.com/taskforcesh/bullmq/issues/2917)) ([5fbb075](https://github.com/taskforcesh/bullmq/commit/5fbb075dd4abd51cc84a59575261de84e56633d8)) +* **telemetry:** do not set span on parent context if undefined ([c417a23](https://github.com/taskforcesh/bullmq/commit/c417a23bb28d9effa42115e954b18cc41c1fc043)) + + +### Features + +* **job-scheduler:** add telemetry support to the job scheduler ([72ea950](https://github.com/taskforcesh/bullmq/commit/72ea950ea251aa12f879ba19c0b5dfeb6a093da2)) +* **queue:** add rateLimit method ([#2896](https://github.com/taskforcesh/bullmq/issues/2896)) ([db84ad5](https://github.com/taskforcesh/bullmq/commit/db84ad51a945c754c3cd03e5e718cd8d0341a8b4)) +* **queue:** add removeRateLimitKey method ([#2806](https://github.com/taskforcesh/bullmq/issues/2806)) ([ff70613](https://github.com/taskforcesh/bullmq/commit/ff706131bf642fb7544b9d15994d75b1edcb27dc)) + + +### Performance Improvements + +* **marker:** add base markers while consuming jobs to get workers busy ([#2904](https://github.com/taskforcesh/bullmq/issues/2904)) fixes [#2842](https://github.com/taskforcesh/bullmq/issues/2842) ([1759c8b](https://github.com/taskforcesh/bullmq/commit/1759c8bc111cab9e43d5fccb4d8d2dccc9c39fb4)) + +## [7.21.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.21.0...v7.21.1) (2024-11-15) + + +### Bug Fixes + +* **deps:** use fixed version of bullmq v5.26.1 ([#269](https://github.com/taskforcesh/bullmq-pro/issues/269)) ([33e73e4](https://github.com/taskforcesh/bullmq-pro/commit/33e73e4cb5864d91ca1fe84308f349771e41cdba)) + +# [7.21.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.20.2...v7.21.0) (2024-11-14) + + +### Bug Fixes + +* **queue:** fix generics to be able to properly be extended ([f2495e5](https://github.com/taskforcesh/bullmq/commit/f2495e5ee9ecdb26492da510dc38730718cb28c5)) + + +### Features + +* **queue-pro:** expose jobs pro in getters ([e1da097](https://github.com/taskforcesh/bullmq-pro/commit/e1da0973b9421d24940cbd828a6e33c952fc6cf0)) +* improve queue getters to use generic job type ([#2905](https://github.com/taskforcesh/bullmq/issues/2905)) ([c9531ec](https://github.com/taskforcesh/bullmq/commit/c9531ec7a49126a017611eb2fd2eaea8fcb5ada5)) + +## [7.20.2](https://github.com/taskforcesh/bullmq-pro/compare/v7.20.1...v7.20.2) (2024-11-13) + + +### Bug Fixes + +* **job-scheculer:** avoid hazards when upserting job schedulers concurrently ([022f7b7](https://github.com/taskforcesh/bullmq/commit/022f7b7d0a0ce14387ed2b9fed791e1f56e34770)) +* **connection:** do not allow to set blockingConnection option ([#2851](https://github.com/taskforcesh/bullmq/issues/2851)) ([9391cc2](https://github.com/taskforcesh/bullmq/commit/9391cc22200914ecc8958972ebc580862a70f63c)) + ## [7.20.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.20.0...v7.20.1) (2024-11-10) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 908af9b22f..1e8ed95b34 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,22 @@ +# [5.39.0](https://github.com/taskforcesh/bullmq/compare/v5.38.0...v5.39.0) (2025-01-29) + + +### Features + +* **job-scheduler:** save limit option ([#3033](https://github.com/taskforcesh/bullmq/issues/3033)) ([a1571ea](https://github.com/taskforcesh/bullmq/commit/a1571ea03be6c6c41794fa272c38c29588351bbf)) + +# [5.38.0](https://github.com/taskforcesh/bullmq/compare/v5.37.0...v5.38.0) (2025-01-28) + + +### Bug Fixes + +* **flow-producer:** add support for skipWaitingForReady ([6d829fc](https://github.com/taskforcesh/bullmq/commit/6d829fceda9f204f193c533ffc780962692b8f16)) + + +### Features + +* **queue:** add option to skip wait until connection ready ([e728299](https://github.com/taskforcesh/bullmq/commit/e72829922d4234b92290346dce5d33f5b98ee373)) + # [5.37.0](https://github.com/taskforcesh/bullmq/compare/v5.36.0...v5.37.0) (2025-01-25) @@ -163,7 +182,6 @@ ### Features -* **queue:** add getDelayedCount method [python] ([#2934](https://github.com/taskforcesh/bullmq/issues/2934)) ([71ce75c](https://github.com/taskforcesh/bullmq/commit/71ce75c04b096b5593da0986c41a771add1a81ce)) * **queue:** add getJobSchedulersCount method ([#2945](https://github.com/taskforcesh/bullmq/issues/2945)) ([38820dc](https://github.com/taskforcesh/bullmq/commit/38820dc8c267c616ada9931198e9e3e9d2f0d536)) ## [5.29.1](https://github.com/taskforcesh/bullmq/compare/v5.29.0...v5.29.1) (2024-11-23) @@ -308,14 +326,8 @@ # [5.22.0](https://github.com/taskforcesh/bullmq/compare/v5.21.2...v5.22.0) (2024-10-31) -### Bug Fixes - -* **commands:** add missing build statement when releasing [python] ([#2869](https://github.com/taskforcesh/bullmq/issues/2869)) fixes [#2868](https://github.com/taskforcesh/bullmq/issues/2868) ([ff2a47b](https://github.com/taskforcesh/bullmq/commit/ff2a47b37c6b36ee1a725f91de2c6e4bcf8b011a)) - - ### Features -* **job:** add getChildrenValues method [python] ([#2853](https://github.com/taskforcesh/bullmq/issues/2853)) ([0f25213](https://github.com/taskforcesh/bullmq/commit/0f25213b28900a1c35922bd33611701629d83184)) * **queue:** add a telemetry interface ([#2721](https://github.com/taskforcesh/bullmq/issues/2721)) ([273b574](https://github.com/taskforcesh/bullmq/commit/273b574e6b5628680990eb02e1930809c9cba5bb)) ## [5.21.2](https://github.com/taskforcesh/bullmq/compare/v5.21.1...v5.21.2) (2024-10-22) diff --git a/package.json b/package.json index 50680e98cd..e8172d00e8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "5.37.0", + "version": "5.39.0", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index a513275f5b..ddfb7bfed7 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -114,12 +114,12 @@ export class FlowProducer extends EventEmitter { ...opts, }; - this.connection = new Connection( - opts.connection, - isRedisInstance(opts.connection), - false, - opts.skipVersionCheck, - ); + this.connection = new Connection(opts.connection, { + shared: isRedisInstance(opts.connection), + blocking: false, + skipVersionCheck: opts.skipVersionCheck, + skipWaitingForReady: opts.skipWaitingForReady, + }); this.connection.on('error', (error: Error) => this.emit('error', error)); this.connection.on('close', () => { diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 5f02910a26..523fbb9bf1 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -39,7 +39,7 @@ export class JobScheduler extends QueueBase { opts: JobSchedulerTemplateOptions, { override, producerId }: { override: boolean; producerId?: string }, ): Promise | undefined> { - const { every, pattern, offset } = repeatOpts; + const { every, limit, pattern, offset } = repeatOpts; if (pattern && every) { throw new Error( @@ -160,6 +160,7 @@ export class JobScheduler extends QueueBase { tz: repeatOpts.tz, pattern, every, + limit, }, Job.optsAsJSON(mergedOpts), producerId, @@ -275,21 +276,44 @@ export class JobScheduler extends QueueBase { next?: number, ): Promise> { if (jobData) { - return { + const jobSchedulerData: JobSchedulerJson = { key, - iterationCount: parseInt(jobData.ic) || null, name: jobData.name, - endDate: parseInt(jobData.endDate) || null, - tz: jobData.tz || null, - pattern: jobData.pattern || null, - every: jobData.every || null, - ...(jobData.data || jobData.opts - ? { - template: this.getTemplateFromJSON(jobData.data, jobData.opts), - } - : {}), next, }; + + if (jobData.ic) { + jobSchedulerData.iterationCount = parseInt(jobData.ic); + } + + if (jobData.limit) { + jobSchedulerData.limit = parseInt(jobData.limit); + } + + if (jobData.endDate) { + jobSchedulerData.endDate = parseInt(jobData.endDate); + } + + if (jobData.tz) { + jobSchedulerData.tz = jobData.tz; + } + + if (jobData.pattern) { + jobSchedulerData.pattern = jobData.pattern; + } + + if (jobData.every) { + jobSchedulerData.every = jobData.every; + } + + if (jobData.data || jobData.opts) { + jobSchedulerData.template = this.getTemplateFromJSON( + jobData.data, + jobData.opts, + ); + } + + return jobSchedulerData; } return this.keyToData(key, next); diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index bdd9da7911..bda0dd303f 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -63,12 +63,12 @@ export class QueueBase extends EventEmitter implements MinimalQueue { throw new Error('Queue name cannot contain :'); } - this.connection = new Connection( - opts.connection, - isRedisInstance(opts.connection), - hasBlockingConnection, - opts.skipVersionCheck, - ); + this.connection = new Connection(opts.connection, { + shared: isRedisInstance(opts.connection), + blocking: hasBlockingConnection, + skipVersionCheck: opts.skipVersionCheck, + skipWaitingForReady: opts.skipWaitingForReady, + }); this.connection.on('error', (error: Error) => this.emit('error', error)); this.connection.on('close', () => { diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 9621f4d8be..34c971d9ea 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -59,12 +59,25 @@ export class RedisConnection extends EventEmitter { private handleClientReady: () => void; constructor( - opts?: ConnectionOptions, - private readonly shared: boolean = false, - private readonly blocking = true, - skipVersionCheck = false, + opts: ConnectionOptions, + private readonly extraOptions?: { + shared?: boolean; + blocking?: boolean; + skipVersionCheck?: boolean; + skipWaitingForReady?: boolean; + }, ) { super(); + + // Set extra options defaults + this.extraOptions = { + shared: false, + blocking: true, + skipVersionCheck: false, + skipWaitingForReady: false, + ...extraOptions, + }; + if (!isRedisInstance(opts)) { this.checkBlockingOptions(overrideMessage, opts); @@ -77,7 +90,7 @@ export class RedisConnection extends EventEmitter { ...opts, }; - if (this.blocking) { + if (this.extraOptions.blocking) { this.opts.maxRetriesPerRequest = null; } } else { @@ -101,7 +114,9 @@ export class RedisConnection extends EventEmitter { } this.skipVersionCheck = - skipVersionCheck || !!(this.opts && this.opts.skipVersionCheck); + extraOptions?.skipVersionCheck || + !!(this.opts && this.opts.skipVersionCheck); + this.handleClientError = (err: Error): void => { this.emit('error', err); }; @@ -123,7 +138,7 @@ export class RedisConnection extends EventEmitter { options?: RedisOptions, throwError = false, ) { - if (this.blocking && options && options.maxRetriesPerRequest) { + if (this.extraOptions.blocking && options && options.maxRetriesPerRequest) { if (throwError) { throw new Error(msg); } else { @@ -228,7 +243,9 @@ export class RedisConnection extends EventEmitter { this._client.on('ready', this.handleClientReady); - await RedisConnection.waitUntilReady(this._client); + if (!this.extraOptions.skipWaitingForReady) { + await RedisConnection.waitUntilReady(this._client); + } this.loadCommands(this.packageVersion); @@ -315,7 +332,7 @@ export class RedisConnection extends EventEmitter { // Not sure if we need to wait for this await this.initializing; } - if (!this.shared) { + if (!this.extraOptions.shared) { if (status == 'initializing' || force) { // If we have not still connected to Redis, we need to disconnect. this._client.disconnect(); diff --git a/src/classes/worker.ts b/src/classes/worker.ts index faba273151..58bd3257a7 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -315,9 +315,11 @@ export class Worker< isRedisInstance(opts.connection) ? (opts.connection).duplicate({ connectionName }) : { ...opts.connection, connectionName }, - false, - true, - opts.skipVersionCheck, + { + shared: false, + blocking: true, + skipVersionCheck: opts.skipVersionCheck, + }, ); this.blockingConnection.on('error', error => this.emit('error', error)); this.blockingConnection.on('ready', () => diff --git a/src/commands/addJobScheduler-6.lua b/src/commands/addJobScheduler-6.lua index 5701facd7a..99c8acb950 100644 --- a/src/commands/addJobScheduler-6.lua +++ b/src/commands/addJobScheduler-6.lua @@ -53,6 +53,10 @@ if prevMillis ~= false then local prevDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis if rcall("ZSCORE", delayedKey, prevDelayedJobId) ~= false then + -- Delayed job is not regenerated with new scheduler opts, + -- next delayed job will be scheduled with old repeat options. + -- This is why we need to remove the current delayed job. + -- This is happening in that way because we use opts from current delayed job to schedule the next one. removeJob(prevDelayedJobId, true, prefixKey, true --[[remove debounce key]]) rcall("ZREM", delayedKey, prevDelayedJobId) end diff --git a/src/commands/includes/storeJobScheduler.lua b/src/commands/includes/storeJobScheduler.lua index 6194741473..d11cf41223 100644 --- a/src/commands/includes/storeJobScheduler.lua +++ b/src/commands/includes/storeJobScheduler.lua @@ -10,6 +10,11 @@ local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMilli table.insert(optionalValues, opts['tz']) end + if opts['limit'] then + table.insert(optionalValues, "limit") + table.insert(optionalValues, opts['limit']) + end + if opts['pattern'] then table.insert(optionalValues, "pattern") table.insert(optionalValues, opts['pattern']) diff --git a/src/interfaces/job-scheduler-json.ts b/src/interfaces/job-scheduler-json.ts index 538d7c2178..65fcd861d4 100644 --- a/src/interfaces/job-scheduler-json.ts +++ b/src/interfaces/job-scheduler-json.ts @@ -10,10 +10,11 @@ export interface JobSchedulerJson { name: string; id?: string | null; iterationCount?: number; - endDate: number | null; - tz: string | null; - pattern: string | null; - every?: string | null; + limit?: number; + endDate?: number; + tz?: string; + pattern?: string; + every?: string; next?: number; template?: JobSchedulerTemplateJson; } diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index a1459059e5..b4b3c82df9 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -38,6 +38,16 @@ export interface QueueBaseOptions { * Telemetry client */ telemetry?: Telemetry; + + /** + * Skip waiting for connection ready. + * + * In some instances if you want the queue to fail fast if the connection is + * not ready you can set this to true. This could be useful for testing and when + * adding jobs via HTTP endpoints for example. + * + */ + skipWaitingForReady?: boolean; } /** @@ -75,11 +85,6 @@ export interface QueueOptions extends QueueBaseOptions { * Advanced options for the repeatable jobs. */ settings?: AdvancedRepeatOptions; - - /** - * Telemetry client - */ - telemetry?: Telemetry; } /** diff --git a/src/interfaces/repeatable-options.ts b/src/interfaces/repeatable-options.ts index 6b2da11583..8875583db1 100644 --- a/src/interfaces/repeatable-options.ts +++ b/src/interfaces/repeatable-options.ts @@ -2,6 +2,7 @@ export type RepeatableOptions = { name: string; endDate?: number; tz?: string; + limit?: number; pattern?: string; every?: number; }; diff --git a/src/version.ts b/src/version.ts index defc506f88..79c6e55a66 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const version = '5.37.0'; +export const version = '5.39.0'; diff --git a/tests/test_connection.ts b/tests/test_connection.ts index 748bb0a7c0..dfd38aeb2d 100644 --- a/tests/test_connection.ts +++ b/tests/test_connection.ts @@ -1,7 +1,14 @@ import { expect } from 'chai'; import { default as IORedis, RedisOptions } from 'ioredis'; import { v4 } from 'uuid'; -import { Queue, Job, Worker, QueueBase } from '../src/classes'; +import { + Queue, + Job, + Worker, + QueueBase, + FlowProducer, + RedisConnection, +} from '../src/classes'; import { removeAllQueueData } from '../src/utils'; import { before, @@ -12,6 +19,149 @@ import { after as afterAll, } from 'mocha'; +import * as sinon from 'sinon'; + +describe('RedisConnection', () => { + describe('constructor', () => { + it('initializes with default extraOptions when none provided', () => { + const connection = new RedisConnection({}); + expect((connection as any).extraOptions).to.deep.equal({ + shared: false, + blocking: true, + skipVersionCheck: false, + skipWaitingForReady: false, + }); + }); + + it('merges provided extraOptions with defaults', () => { + const options = { + shared: true, + blocking: false, + skipVersionCheck: true, + skipWaitingForReady: true, + }; + const connection = new RedisConnection({}, options); + expect((connection as any).extraOptions).to.deep.include(options); + }); + }); + + describe('blocking option', () => { + it('sets maxRetriesPerRequest to null when blocking is true', () => { + const connection = new RedisConnection({}, { blocking: true }); + expect((connection as any).opts.maxRetriesPerRequest).to.be.null; + }); + + it('preserves maxRetriesPerRequest when blocking is false', () => { + const connection = new RedisConnection( + { maxRetriesPerRequest: 10 }, + { blocking: false }, + ); + expect((connection as any).opts.maxRetriesPerRequest).to.equal(10); + }); + }); + + describe('connect()', () => { + let waitUntilReadyStub: sinon.SinonStub; + + beforeEach(() => { + waitUntilReadyStub = sinon + .stub(RedisConnection, 'waitUntilReady') + .resolves(); + }); + + afterEach(() => { + waitUntilReadyStub.restore(); + }); + + it('skips waiting for ready when skipWaitingForReady is true', async () => { + const connection = new RedisConnection({}, { skipWaitingForReady: true }); + const client = await connection.client; + expect(waitUntilReadyStub.called).to.be.false; + }); + + it('awaits ready state when skipWaitingForReady is false', async () => { + const connection = new RedisConnection( + {}, + { skipWaitingForReady: false }, + ); + const client = await connection.client; + expect(waitUntilReadyStub.calledOnce).to.be.true; + }); + }); + + describe('Queue', () => { + it('propagates skipWaitingForReady to RedisConnection', () => { + const queue = new Queue('test', { + skipWaitingForReady: true, + connection: {}, + }); + expect((queue).connection.extraOptions.skipWaitingForReady).to.be + .true; + }); + + it('uses non-blocking connection by default', () => { + const queue = new Queue('test'); + expect((queue).connection.extraOptions.blocking).to.be.false; + }); + + it('uses shared connection if provided Redis instance', () => { + const connection = new IORedis(); + + const queue = new Queue('test', { + connection, + }); + expect((queue).connection.extraOptions.shared).to.be.true; + + connection.disconnect(); + }); + }); + + describe('Worker', () => { + it('initializes blockingConnection with blocking: true', () => { + const worker = new Worker('test', async () => {}, { connection: {} }); + expect((worker).blockingConnection.extraOptions.blocking).to.be.true; + }); + + it('sets shared: false for blockingConnection', () => { + const connection = new IORedis({ maxRetriesPerRequest: null }); + + const worker = new Worker('test', async () => {}, { connection }); + expect((worker).blockingConnection.extraOptions.shared).to.be.false; + + connection.disconnect(); + }); + + it('uses blocking connection by default', () => { + const connection = new IORedis({ maxRetriesPerRequest: null }); + + const worker = new Worker('test', async () => {}, { connection }); + + expect((worker).connection.extraOptions.blocking).to.be.false; + expect((worker).blockingConnection.extraOptions.blocking).to.be.true; + + connection.disconnect(); + }); + }); + + describe('FlowProducer', () => { + it('uses non-blocking connection', () => { + const flowProducer = new FlowProducer(); + expect((flowProducer).connection.extraOptions.blocking).to.be.false; + }); + + it('shares connection if provided Redis instance', () => { + const connection = new IORedis(); + + const flowProducer = new FlowProducer({ + connection, + }); + expect((flowProducer).connection.extraOptions.shared).to.be.true; + + connection.disconnect(); + }); + }); +}); + describe('connection', () => { const redisHost = process.env.REDIS_HOST || 'localhost'; const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; diff --git a/tests/test_job.ts b/tests/test_job.ts index 0393fe006e..741772b105 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -926,7 +926,7 @@ describe('Job', function () { const job = (await worker.getNextJob(token)) as Job; const id = job.id; await job.moveToFailed(new Error('test error'), '0'); - const sameJob = await queue.getJob(id); + const sameJob = await queue.getJob(id!); expect(sameJob).to.be.ok; expect(sameJob.stacktrace).to.be.not.empty; await worker.close(); diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index b99792ba0e..a9317d6fd3 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -410,10 +410,7 @@ describe('Job Scheduler', function () { key: 'test', iterationCount: 1, name: 'test', - endDate: null, - tz: null, pattern: '*/2 * * * * *', - every: null, next: 1486481042000, template: { data: { @@ -753,11 +750,8 @@ describe('Job Scheduler', function () { key: 'rrule', iterationCount: 1, name: 'rrule', - endDate: null, next: 1486481042000, - tz: null, pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', - every: null, }); this.clock.tick(nextTick); @@ -1774,10 +1768,8 @@ describe('Job Scheduler', function () { key: 'test', iterationCount: 2, name: 'a', - endDate: null, tz: 'Asia/Calcutta', pattern: '0 * 1 * *', - every: null, next: 1488310200000, template: { data: { @@ -2356,7 +2348,7 @@ describe('Job Scheduler', function () { }); it('should not repeat more than 5 times', async function () { - const date = new Date('2017-02-07 9:24:00'); + const date = new Date('2017-02-07T09:24:00.000+05:30'); this.clock.setSystemTime(date); const nextTick = ONE_SECOND + 500; @@ -2367,6 +2359,18 @@ describe('Job Scheduler', function () { limit: 5, pattern: '*/1 * * * * *', }); + + const scheduler = await queue.getJobScheduler('repeat'); + + expect(scheduler).to.deep.equal({ + key: 'repeat', + iterationCount: 1, + name: 'repeat', + limit: 5, + pattern: '*/1 * * * * *', + next: 1486439641000, + }); + this.clock.tick(nextTick); let counter = 0; diff --git a/tests/test_script_loader.ts b/tests/test_script_loader.ts index e6a7c286e9..297c7c7188 100644 --- a/tests/test_script_loader.ts +++ b/tests/test_script_loader.ts @@ -330,7 +330,7 @@ describe('scriptLoader', () => { //let loader: ScriptLoader; beforeEach(async () => { - connection = new RedisConnection(); + connection = new RedisConnection({}); connection.on('error', () => {}); client = await connection.client; await RedisConnection.waitUntilReady(client);