Skip to content

Commit

Permalink
Merge pull request #2 from integration-system/reschedule_job
Browse files Browse the repository at this point in the history
Reschedule job
  • Loading branch information
Naiks10 authored Jan 9, 2023
2 parents 554b47d + 6c7db0b commit 12d9ae2
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 0 deletions.
12 changes: 12 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type Tx interface {
Job() Job
Update(ctx context.Context, id string, attempt int32, lastError string, nextRunAt int64) error
UpdateNextRun(ctx context.Context, id string, nextRunAt int64) error
Delete(ctx context.Context, id string) error
SaveInDlq(ctx context.Context, job Job) error
}
Expand Down Expand Up @@ -98,6 +99,17 @@ func (c *Client) jobTx(ctx context.Context, tx Tx, f func(ctx context.Context, j
}
}

if result.reschedule {
err := tx.UpdateNextRun(
ctx,
job.Id,
timeNow().Add(result.rescheduleDelay).Unix(),
)
if err != nil {
return fmt.Errorf("update job: %w", err)
}
}

return nil
}

Expand Down
38 changes: 38 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,44 @@ func TestClient_DoRetry(t *testing.T) {
require.True(errors.Is(err, sql.ErrNoRows))
}

func TestClient_DoReschedule(t *testing.T) {
require, db, cli := prepareTest(t)

req := bgjob.EnqueueRequest{
Id: "123",
Queue: "name",
Type: "test",
Arg: []byte(`{"simpleJson": 1}`),
}
err := cli.Enqueue(context.Background(), req)
require.NoError(err)
err = cli.Do(context.Background(), "name", func(ctx context.Context, job bgjob.Job) bgjob.Result {
return bgjob.Reschedule(0)
})
require.NoError(err)

err = cli.Do(context.Background(), "name", func(ctx context.Context, job bgjob.Job) bgjob.Result {
require.EqualValues(1, job.Attempt)
return bgjob.Reschedule(5 * time.Second)
})
require.NoError(err)

err = cli.Do(context.Background(), "name", func(ctx context.Context, job bgjob.Job) bgjob.Result {
return bgjob.Complete()
})
require.EqualValues(bgjob.ErrEmptyQueue, err)

time.Sleep(5 * time.Second)
err = cli.Do(context.Background(), "name", func(ctx context.Context, job bgjob.Job) bgjob.Result {
require.EqualValues(1, job.Attempt)
return bgjob.Complete()
})
require.NoError(err)

_, err = getJob(db.DB, "123")
require.True(errors.Is(err, sql.ErrNoRows))
}

func TestClient_DoDlq(t *testing.T) {
require, db, cli := prepareTest(t)

Expand Down
4 changes: 4 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
type Observer interface {
JobStarted(ctx context.Context, job Job)
JobCompleted(ctx context.Context, job Job)
JobRescheduled(ctx context.Context, job Job, after time.Duration)
JobWillBeRetried(ctx context.Context, job Job, after time.Duration, err error)
JobMovedToDlq(ctx context.Context, job Job, err error)
QueueIsEmpty(ctx context.Context)
Expand All @@ -26,6 +27,9 @@ func (n NoopObserver) JobCompleted(ctx context.Context, job Job) {
func (n NoopObserver) JobWillBeRetried(ctx context.Context, job Job, after time.Duration, err error) {
}

func (n NoopObserver) JobRescheduled(ctx context.Context, job Job, after time.Duration) {
}

func (n NoopObserver) JobMovedToDlq(ctx context.Context, job Job, err error) {
}

Expand Down
6 changes: 6 additions & 0 deletions pg_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (p *pgTx) Update(ctx context.Context, id string, attempt int32, lastError s
return err
}

func (p *pgTx) UpdateNextRun(ctx context.Context, id string, nextRunAt int64) error {
query := "UPDATE bgjob_job SET next_run_at = $1, updated_at = $2 WHERE id = $3"
_, err := p.tx.ExecContext(ctx, query, nextRunAt, timeNow(), id)
return err
}

func (p *pgTx) Delete(ctx context.Context, id string) error {
query := `DELETE FROM bgjob_job WHERE id = $1`
_, err := p.tx.ExecContext(ctx, query, id)
Expand Down
7 changes: 7 additions & 0 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type Result struct {
moveToDlq bool
retry bool
retryDelay time.Duration

reschedule bool
rescheduleDelay time.Duration
}

func Complete() Result {
Expand All @@ -23,3 +26,7 @@ func Retry(after time.Duration, err error) Result {
func MoveToDlq(err error) Result {
return Result{moveToDlq: true, err: err}
}

func Reschedule(after time.Duration) Result {
return Result{reschedule: true, rescheduleDelay: after}
}
3 changes: 3 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (w *Worker) run(ctx context.Context) {
if lastResult.moveToDlq {
w.observer.JobMovedToDlq(ctx, lastJob, lastResult.err)
}
if lastResult.reschedule {
w.observer.JobRescheduled(ctx, lastJob, lastResult.rescheduleDelay)
}
}
}

Expand Down
35 changes: 35 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,36 @@ func TestWorker_Observer(t *testing.T) {
require.EqualValues(0, atomic.LoadInt32(&observer.workerError))
}

func TestWorker_Observer_Reschedule(t *testing.T) {
require, _, cli := prepareTest(t)

observer := &observerCounter{}

rescheduled := int32(0)
w := bgjob.NewWorker(cli, "test", bgjob.HandlerFunc(func(ctx context.Context, job bgjob.Job) bgjob.Result {
if atomic.LoadInt32(&rescheduled) == 3 {
return bgjob.Complete()
}
atomic.AddInt32(&rescheduled, 1)
return bgjob.Reschedule(1 * time.Second)
}), bgjob.WithObserver(observer))
w.Run(context.Background())
time.Sleep(1 * time.Second) //trigger queue is empty

err := cli.Enqueue(context.Background(), bgjob.EnqueueRequest{
Type: "reschedule_me",
Queue: "test",
})
require.NoError(err)
time.Sleep(5 * time.Second)

require.EqualValues(1, atomic.LoadInt32(&observer.jobCompeted))
require.EqualValues(3, atomic.LoadInt32(&observer.jobRescheduled))
require.EqualValues(4, atomic.LoadInt32(&observer.jobStarted))
require.GreaterOrEqual(atomic.LoadInt32(&observer.queueIsEmpty), int32(1))
require.EqualValues(0, atomic.LoadInt32(&observer.workerError))
}

func TestWorker_PollInterval(t *testing.T) {
require, _, cli := prepareTest(t)
observer := &observerCounter{}
Expand Down Expand Up @@ -187,6 +217,7 @@ type observerCounter struct {
jobStarted int32
jobCompeted int32
jobWillBeRetried int32
jobRescheduled int32
jobMovedToDlq int32
queueIsEmpty int32
workerError int32
Expand All @@ -204,6 +235,10 @@ func (o *observerCounter) JobWillBeRetried(ctx context.Context, job bgjob.Job, a
atomic.AddInt32(&o.jobWillBeRetried, 1)
}

func (o *observerCounter) JobRescheduled(ctx context.Context, job bgjob.Job, after time.Duration) {
atomic.AddInt32(&o.jobRescheduled, 1)
}

func (o *observerCounter) JobMovedToDlq(ctx context.Context, job bgjob.Job, err error) {
atomic.AddInt32(&o.jobMovedToDlq, 1)
}
Expand Down

0 comments on commit 12d9ae2

Please sign in to comment.