Skip to content

Commit

Permalink
fix(retry-job): consider updating failures in job (#3036)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jan 30, 2025
1 parent 0ff27ad commit 21e8495
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 21 deletions.
13 changes: 8 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
KeepJobs,
MoveToDelayedOpts,
RepeatableOptions,
RetryJobOpts,
} from '../interfaces';
import {
JobState,
Expand Down Expand Up @@ -1165,7 +1166,7 @@ export class Scripts {
jobId: string,
lifo: boolean,
token: string,
fieldsToUpdate?: Record<string, any>,
opts: MoveToDelayedOpts = {},
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [
this.queue.keys.active,
Expand All @@ -1189,19 +1190,21 @@ export class Scripts {
pushCmd,
jobId,
token,
fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
opts.fieldsToUpdate
? pack(objectToFlatArray(opts.fieldsToUpdate))
: void 0,
]);
}

async retryJob(
jobId: string,
lifo: boolean,
token: string,
fieldsToUpdate?: Record<string, any>,
token = '0',
opts: RetryJobOpts = {},
): Promise<void> {
const client = await this.queue.client;

const args = this.retryJobArgs(jobId, lifo, token, fieldsToUpdate);
const args = this.retryJobArgs(jobId, lifo, token, opts);
const result = await this.execCommand(client, 'retryJob', args);
if (result < 0) {
throw this.finishedErrors({
Expand Down
10 changes: 5 additions & 5 deletions src/commands/includes/updateJobFields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
Function to update a bunch of fields in a job.
]]
local function updateJobFields(jobKey, msgpackedFields)
if msgpackedFields and #msgpackedFields > 0 then
local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)
if fieldsToUpdate then
redis.call("HMSET", jobKey, unpack(fieldsToUpdate))
end
if msgpackedFields and #msgpackedFields > 0 then
local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)
if fieldsToUpdate then
rcall("HMSET", jobKey, unpack(fieldsToUpdate))
end
end
end
16 changes: 11 additions & 5 deletions src/commands/retryJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ARGV[3] pushCmd
ARGV[4] jobId
ARGV[5] token
ARGV[6] optional job fields to update
Events:
'waiting'
Expand All @@ -36,9 +37,10 @@ local rcall = redis.call
--- @include "includes/addJobWithPriority"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/isQueuePausedOrMaxed"
--- @include "includes/promoteDelayedJobs"
--- @include "includes/removeLock"
--- @include "includes/isQueuePausedOrMaxed"
--- @include "includes/updateJobFields"

local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3])
local markerKey = KEYS[10]
Expand All @@ -47,16 +49,20 @@ local markerKey = KEYS[10]
-- test example: when there are delayed jobs between retries
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed)

if rcall("EXISTS", KEYS[4]) == 1 then
local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4])
local jobKey = KEYS[4]

if rcall("EXISTS", jobKey) == 1 then
local errorCode = removeLock(jobKey, KEYS[11], ARGV[5], ARGV[4])
if errorCode < 0 then
return errorCode
end

updateJobFields(jobKey, ARGV[6])

local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[4])
if (numRemovedElements < 1) then return -3 end

local priority = tonumber(rcall("HGET", KEYS[4], "priority")) or 0
local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

--need to re-evaluate after removing job from active
isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1])
Expand All @@ -68,7 +74,7 @@ if rcall("EXISTS", KEYS[4]) == 1 then
addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed)
end

rcall("HINCRBY", KEYS[4], "atm", 1)
rcall("HINCRBY", jobKey, "atm", 1)

local maxEvents = getOrSetMaxEvents(KEYS[5])

Expand Down
4 changes: 4 additions & 0 deletions src/interfaces/minimal-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export interface MoveToDelayedOpts {
fieldsToUpdate?: Record<string, any>;
}

export interface RetryJobOpts {
fieldsToUpdate?: Record<string, any>;
}

export interface MoveToWaitingChildrenOpts {
child?: {
id: string;
Expand Down
12 changes: 6 additions & 6 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2698,14 +2698,14 @@ describe('workers', function () {
});
});

describe('when job has been marked as discarded', () => {
it('does not retry a job', async () => {
describe('when job has been failed and moved to wait', () => {
it('saves failedReason', async () => {
const worker = new Worker(
queueName,
async job => {
expect(job.attemptsMade).to.equal(0);
job.discard();
throw new Error('unrecoverable error');
await queue.rateLimit(5000);
throw new Error('error');
},
{ connection, prefix },
);
Expand All @@ -2724,9 +2724,9 @@ describe('workers', function () {
worker.on('failed', resolve);
});

const state = await job.getState();
const updatedJob = await queue.getJob(job.id!);

expect(state).to.be.equal('failed');
expect(updatedJob.failedReason).to.be.equal('error');

await worker.close();
});
Expand Down

0 comments on commit 21e8495

Please sign in to comment.