Skip to content

Commit

Permalink
refactor: initial work
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 31, 2025
1 parent 0ff27ad commit bd5de3b
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 10 deletions.
7 changes: 5 additions & 2 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { parseExpression } from 'cron-parser';
import {
JobRepeatOptsRaw,
JobSchedulerJson,
JobSchedulerTemplateJson,
RedisClient,
Expand Down Expand Up @@ -33,6 +34,7 @@ export class JobScheduler extends QueueBase {

async upsertJobScheduler<T = any, R = any, N extends string = string>(
jobSchedulerId: string,
rawRepeatOpts: JobRepeatOptsRaw,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis'>,
jobName: N,
jobData: T,
Expand Down Expand Up @@ -268,11 +270,11 @@ export class JobScheduler extends QueueBase {
return this.transformSchedulerData<D>(key, jobData, next);
}

private async transformSchedulerData<D>(
private transformSchedulerData<D>(
key: string,
jobData: any,
next?: number,
): Promise<JobSchedulerJson<D>> {
): JobSchedulerJson<D> {
if (jobData) {
const jobSchedulerData: JobSchedulerJson<D> = {
key,
Expand Down Expand Up @@ -314,6 +316,7 @@ export class JobScheduler extends QueueBase {
return jobSchedulerData;
}

// TODO: remove next line in next breaking change
return this.keyToData(key, next);
}

Expand Down
1 change: 1 addition & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
DependenciesOpts,
JobJson,
JobJsonRaw,
JobRepeatOptsRaw,
MinimalJob,
MoveToWaitingChildrenOpts,
ParentKeys,
Expand Down
5 changes: 4 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1616,10 +1616,13 @@ export class Scripts {

export function raw2NextJobData(raw: any[]) {
if (raw) {
const result = [null, raw[1], raw[2], raw[3]];
const result = [null, raw[1], null, raw[2], raw[3]];
if (raw[0]) {
result[0] = array2obj(raw[0]);
}
if (raw[2]) {
result[2] = array2obj(raw[2]);
}
return result;
}
return [];
Expand Down
22 changes: 16 additions & 6 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
GetNextJobOptions,
IoredisListener,
JobJsonRaw,
JobRepeatOptsRaw,
Processor,
RedisClient,
Span,
Expand Down Expand Up @@ -668,11 +669,11 @@ will never work with more accuracy than 1ms. */
token: string,
name?: string,
): Promise<Job<DataType, ResultType, NameType>> {
const [jobData, id, limitUntil, delayUntil] =
const [jobData, id, repeatOpts, limitUntil, delayUntil] =
await this.scripts.moveToActive(client, token, name);
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, id, token);
return this.nextJobFromJobData(jobData, id, repeatOpts, token);
}

private async waitForJob(
Expand Down Expand Up @@ -777,6 +778,7 @@ will never work with more accuracy than 1ms. */
protected async nextJobFromJobData(
jobData?: JobJsonRaw,
jobId?: string,
repeatOpts?: JobRepeatOptsRaw,
token?: string,
): Promise<Job<DataType, ResultType, NameType>> {
if (!jobData) {
Expand All @@ -796,6 +798,7 @@ will never work with more accuracy than 1ms. */
const jobScheduler = await this.jobScheduler;
await jobScheduler.upsertJobScheduler(
job.repeatJobKey,
repeatOpts,
job.opts.repeat,
job.name,
job.data,
Expand Down Expand Up @@ -851,10 +854,11 @@ will never work with more accuracy than 1ms. */
[TelemetryAttributes.JobResult]: JSON.stringify(result),
});

const [jobData, jobId, limitUntil, delayUntil] = completed || [];
const [jobData, jobId, repeatOpts, limitUntil, delayUntil] =
completed || [];
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, jobId, token);
return this.nextJobFromJobData(jobData, jobId, repeatOpts, token);
}
};

Expand Down Expand Up @@ -886,9 +890,15 @@ will never work with more accuracy than 1ms. */
});

if (result) {
const [jobData, jobId, limitUntil, delayUntil] = result;
const [jobData, jobId, repeatOpts, limitUntil, delayUntil] =
result;
this.updateDelays(limitUntil, delayUntil);
return this.nextJobFromJobData(jobData, jobId, token);
return this.nextJobFromJobData(
jobData,
jobId,
repeatOpts,
token,
);
}
} catch (err) {
this.emit('error', <Error>err);
Expand Down
18 changes: 17 additions & 1 deletion src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
-- Includes
--- @include "addBaseMarkerIfNeeded"

local function getRepeatJobKey(jobAttributes)
for i = 1, #jobAttributes, 2 do
if jobAttributes[i] == "rjk" then
return jobAttributes[i + 1]
end
end
end

local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
jobId, processedOn, maxJobs, markerKey, opts)
local jobKey = keyPrefix .. jobId
Expand Down Expand Up @@ -46,5 +54,13 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey

addBaseMarkerIfNeeded(markerKey, false)

return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
local jobAttributes = rcall("HGETALL", jobKey)

local repeatJobKey = getRepeatJobKey(jobAttributes)

if repeatJobKey then
return {jobAttributes, jobId, rcall("HGETALL", keyPrefix .. "repeat:" .. repeatJobKey), 0, 0}
end

return {jobAttributes, jobId, 0, 0, 0} -- get job data
end
14 changes: 14 additions & 0 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,17 @@ export interface JobJsonRaw {
ats?: string;
pb?: string; // Worker name
}
// TODO: Evaluate tm value

export interface JobRepeatOptsRaw {
tz?: string;
limit?: string;
pattern?: string;
endDate?: string;
every?: string;
opts?: string;
data?: string;
name: string;
ic: string;
prevM: number;
}

0 comments on commit bd5de3b

Please sign in to comment.