From bd5de3b81b6b171f6a523dc177f89d48fba672a8 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Thu, 30 Jan 2025 22:54:51 -0600 Subject: [PATCH] refactor: initial work --- src/classes/job-scheduler.ts | 7 ++++-- src/classes/job.ts | 1 + src/classes/scripts.ts | 5 ++++- src/classes/worker.ts | 22 ++++++++++++++----- .../includes/prepareJobForProcessing.lua | 18 ++++++++++++++- src/interfaces/job-json.ts | 14 ++++++++++++ 6 files changed, 57 insertions(+), 10 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 05936cd5d5..cd89e43e80 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -1,5 +1,6 @@ import { parseExpression } from 'cron-parser'; import { + JobRepeatOptsRaw, JobSchedulerJson, JobSchedulerTemplateJson, RedisClient, @@ -33,6 +34,7 @@ export class JobScheduler extends QueueBase { async upsertJobScheduler( jobSchedulerId: string, + rawRepeatOpts: JobRepeatOptsRaw, repeatOpts: Omit, jobName: N, jobData: T, @@ -268,11 +270,11 @@ export class JobScheduler extends QueueBase { return this.transformSchedulerData(key, jobData, next); } - private async transformSchedulerData( + private transformSchedulerData( key: string, jobData: any, next?: number, - ): Promise> { + ): JobSchedulerJson { if (jobData) { const jobSchedulerData: JobSchedulerJson = { key, @@ -314,6 +316,7 @@ export class JobScheduler extends QueueBase { return jobSchedulerData; } + // TODO: remove next line in next breaking change return this.keyToData(key, next); } diff --git a/src/classes/job.ts b/src/classes/job.ts index 3293b619e1..5ed8723e83 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -5,6 +5,7 @@ import { DependenciesOpts, JobJson, JobJsonRaw, + JobRepeatOptsRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentKeys, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 8350c9dff1..a9ff21e430 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -1616,10 +1616,13 @@ export class Scripts { export function raw2NextJobData(raw: any[]) { if (raw) { - const result = [null, raw[1], raw[2], raw[3]]; + const result = [null, raw[1], null, raw[2], raw[3]]; if (raw[0]) { result[0] = array2obj(raw[0]); } + if (raw[2]) { + result[2] = array2obj(raw[2]); + } return result; } return []; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 58bd3257a7..b75692e37e 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -11,6 +11,7 @@ import { GetNextJobOptions, IoredisListener, JobJsonRaw, + JobRepeatOptsRaw, Processor, RedisClient, Span, @@ -668,11 +669,11 @@ will never work with more accuracy than 1ms. */ token: string, name?: string, ): Promise> { - const [jobData, id, limitUntil, delayUntil] = + const [jobData, id, repeatOpts, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, name); this.updateDelays(limitUntil, delayUntil); - return this.nextJobFromJobData(jobData, id, token); + return this.nextJobFromJobData(jobData, id, repeatOpts, token); } private async waitForJob( @@ -777,6 +778,7 @@ will never work with more accuracy than 1ms. */ protected async nextJobFromJobData( jobData?: JobJsonRaw, jobId?: string, + repeatOpts?: JobRepeatOptsRaw, token?: string, ): Promise> { if (!jobData) { @@ -796,6 +798,7 @@ will never work with more accuracy than 1ms. */ const jobScheduler = await this.jobScheduler; await jobScheduler.upsertJobScheduler( job.repeatJobKey, + repeatOpts, job.opts.repeat, job.name, job.data, @@ -851,10 +854,11 @@ will never work with more accuracy than 1ms. */ [TelemetryAttributes.JobResult]: JSON.stringify(result), }); - const [jobData, jobId, limitUntil, delayUntil] = completed || []; + const [jobData, jobId, repeatOpts, limitUntil, delayUntil] = + completed || []; this.updateDelays(limitUntil, delayUntil); - return this.nextJobFromJobData(jobData, jobId, token); + return this.nextJobFromJobData(jobData, jobId, repeatOpts, token); } }; @@ -886,9 +890,15 @@ will never work with more accuracy than 1ms. */ }); if (result) { - const [jobData, jobId, limitUntil, delayUntil] = result; + const [jobData, jobId, repeatOpts, limitUntil, delayUntil] = + result; this.updateDelays(limitUntil, delayUntil); - return this.nextJobFromJobData(jobData, jobId, token); + return this.nextJobFromJobData( + jobData, + jobId, + repeatOpts, + token, + ); } } catch (err) { this.emit('error', err); diff --git a/src/commands/includes/prepareJobForProcessing.lua b/src/commands/includes/prepareJobForProcessing.lua index 7bceac146e..8f94a24603 100644 --- a/src/commands/includes/prepareJobForProcessing.lua +++ b/src/commands/includes/prepareJobForProcessing.lua @@ -10,6 +10,14 @@ -- Includes --- @include "addBaseMarkerIfNeeded" +local function getRepeatJobKey(jobAttributes) + for i = 1, #jobAttributes, 2 do + if jobAttributes[i] == "rjk" then + return jobAttributes[i + 1] + end + end +end + local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, jobId, processedOn, maxJobs, markerKey, opts) local jobKey = keyPrefix .. jobId @@ -46,5 +54,13 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey addBaseMarkerIfNeeded(markerKey, false) - return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data + local jobAttributes = rcall("HGETALL", jobKey) + + local repeatJobKey = getRepeatJobKey(jobAttributes) + + if repeatJobKey then + return {jobAttributes, jobId, rcall("HGETALL", keyPrefix .. "repeat:" .. repeatJobKey), 0, 0} + end + + return {jobAttributes, jobId, 0, 0, 0} -- get job data end diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index e27c302432..5bd755fb26 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -47,3 +47,17 @@ export interface JobJsonRaw { ats?: string; pb?: string; // Worker name } +// TODO: Evaluate tm value + +export interface JobRepeatOptsRaw { + tz?: string; + limit?: string; + pattern?: string; + endDate?: string; + every?: string; + opts?: string; + data?: string; + name: string; + ic: string; + prevM: number; +}