From e99f2009ae23707b6a51acf80dbceaaf36436330 Mon Sep 17 00:00:00 2001 From: Matthew Magaldi Date: Fri, 7 Feb 2025 15:22:17 -0500 Subject: [PATCH] fix(worker): fixes deadlock where resource Worker runs nested db txns (#12) --- informer.go | 31 +++++++++++++------------- worker.go | 64 ++++++++++++++++++----------------------------------- 2 files changed, 37 insertions(+), 58 deletions(-) diff --git a/informer.go b/informer.go index 13b52b2..1b9e0d1 100644 --- a/informer.go +++ b/informer.go @@ -137,6 +137,8 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA Limit: firstNonZero(job.Args.Options.Limit), } + informStart := time.Now() + informFunc := func(ctx context.Context) error { timeout := i.informer.InformTimeout(&job.Args) if timeout == 0 { @@ -156,26 +158,16 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA } numObjects := 0 - var lastObjectCreatedAt time.Time + lastInformTimestamp := informStart for { select { case obj, ok := <-queue: if !ok { // happy path: successfully informed all resources - if !job.Args.ProcessExisting && numObjects > 0 { + if !job.Args.ProcessExisting { // only update the controller last inform time if we aren't processing existing resources. - // and we have informed at least one resource. - // - // default to use the current time but use the last object created time if it's before the current time - // to ensure complete coverage of resources in the case of potential race conditions - // e.g. situations where a resource is created after our informer finishes but before this - // point is reached. - lastInformTime := time.Now() - if !lastObjectCreatedAt.IsZero() && lastObjectCreatedAt.Before(lastInformTime) { - lastInformTime = lastObjectCreatedAt - } err := queries.ControllerSetLastInformTime(ctx, &sqlc.ControllerSetLastInformTimeParams{ - LastInformTime: lastInformTime, + LastInformTime: lastInformTimestamp, Name: controller.Name, }) if err != nil { @@ -185,8 +177,8 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA return nil } if objWithCreatedAt, ok := Object(obj).(ObjectWithCreatedAt); ok { - if objWithCreatedAt.CreatedAt().After(lastObjectCreatedAt) { - lastObjectCreatedAt = objWithCreatedAt.CreatedAt() + if objWithCreatedAt.CreatedAt().After(lastInformTimestamp) { + lastInformTimestamp = objWithCreatedAt.CreatedAt() } } numObjects++ @@ -295,6 +287,9 @@ func (i *controllerInformer[T]) processObject(ctx context.Context, object T, arg Queue: object.Kind(), Tags: resourceRow.Tags, Metadata: resourceRow.Metadata, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + }, }) if err != nil { return err @@ -343,6 +338,9 @@ func (i *controllerInformer[T]) processObject(ctx context.Context, object T, arg Queue: object.Kind(), Tags: resourceRow.Tags, Metadata: resourceRow.Metadata, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + }, }) if err != nil { return fmt.Errorf("failed to enqueue resource: %w", err) @@ -366,7 +364,8 @@ func (i *controllerInformer[T]) compareObjects(object T, resource deltatype.Reso } // check hash - // we lose ordering with the hash (that's why we prefer the object comparison) + // we lose chronological ordering with the hash (that's why we prefer the object comparison) + // TODO: use deterministic ordering of fields to ensure consistent hash objBytes, err := json.Marshal(object) if err != nil { return 0, err diff --git a/worker.go b/worker.go index 568cd47..d31ad65 100644 --- a/worker.go +++ b/worker.go @@ -73,6 +73,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ // first check that the resource still exists in the DB // if it doesn't, we don't want to re-work it and instead can cancel the job + // // This query is a SELECT FOR UPDATE, which locks the row for the duration of the transaction. // Row-level locks do not affect data querying; they block only writers and lockers to the same row. sqlcRow, err := queries.ResourceUpdateAndGetByObjectIDAndKind(ctx, &sqlc.ResourceUpdateAndGetByObjectIDAndKindParams{ @@ -92,7 +93,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ resourceRow := toResourceRow(sqlcRow) // should we use the DB resource row or the job.Args resource? - logger.DebugContext(ctx, "working resource", "id", resourceRow.ID, "resource_id", resourceRow.ID, "resource_kind", resourceRow.Kind, "attempt", resourceRow.Attempt) + logger.DebugContext(ctx, "working resource", "id", resourceRow.ID, "resource_id", resourceRow.ID, "resource_kind", resourceRow.Kind(), "attempt", resourceRow.Attempt) object := w.factory.Make(&resourceRow) if err := object.UnmarshalResource(); err != nil { @@ -119,16 +120,16 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ } // do the work! - err = workFunc(ctx) - if err != nil { + wErr := workFunc(ctx) + if wErr != nil { // handle resource delete error deleteErr := new(ResourceDeleteError) - if errors.Is(err, deleteErr) { + if errors.Is(wErr, deleteErr) { now := time.Now().UTC() errorData, err := json.Marshal(deltatype.AttemptError{ At: now, Attempt: resourceRow.Attempt, - Error: err.Error(), + Error: wErr.Error(), }) if err != nil { return fmt.Errorf("error marshaling error JSON: %w", err) @@ -143,16 +144,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ Column6: errorData, }) if derr != nil { - // handle no rows (since it's possible the delta resource record was deleted during the Work() call) - if errors.Is(derr, pgx.ErrNoRows) { - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err)) - } return fmt.Errorf("failed to set resource state to deleted: %w", derr) } + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + deletedRow := toResourceRow(deleted) client.eventCh <- []Event{ { @@ -162,11 +160,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ }, } - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - - return river.JobCancel(fmt.Errorf("resource deleted: %w", err)) + return river.JobCancel(fmt.Errorf("resource deleted: %w", wErr)) } state := sqlc.DeltaResourceStateFailed @@ -174,13 +168,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ state = sqlc.DeltaResourceStateDegraded } - logger.WarnContext(ctx, "resource failed", "attempt", resourceRow.Attempt, "state", state, "error", err) + logger.WarnContext(ctx, "resource failed", "attempt", resourceRow.Attempt, "state", state, "error", wErr) now := time.Now().UTC() errorData, err := json.Marshal(deltatype.AttemptError{ At: now, Attempt: resourceRow.Attempt, - Error: err.Error(), + Error: wErr.Error(), }) if err != nil { return fmt.Errorf("error marshaling error JSON: %w", err) @@ -196,16 +190,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ Column6: errorData, }) if uerr != nil { - // handle no rows (since it's possible the delta resource record was deleted during the Work() call) - if errors.Is(uerr, pgx.ErrNoRows) { - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err)) - } return uerr } + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + failedRow := toResourceRow(failed) client.eventCh <- []Event{ { @@ -215,11 +206,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ }, } - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - - return err + return wErr } now := time.Now() @@ -231,16 +218,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ SyncedAt: &now, }) if err != nil { - // handle no rows (since it's possible the delta resource record was deleted during the Work() call) - if errors.Is(err, pgx.ErrNoRows) { - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err)) - } return err } + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + syncedRow := toResourceRow(synced) client.eventCh <- []Event{ { @@ -250,10 +234,6 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[ }, } - if err = tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - logger.InfoContext(ctx, "finished working resource", "id", resource.ID, "resource_id", resource.Object.ID(), "resource_kind", resource.Object.Kind()) return nil