Skip to content

Commit

Permalink
fix(worker): fixes deadlock where resource Worker runs nested db txns (
Browse files Browse the repository at this point in the history
  • Loading branch information
magaldima authored Feb 7, 2025
1 parent 4f9467b commit e99f200
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 58 deletions.
31 changes: 15 additions & 16 deletions informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA
Limit: firstNonZero(job.Args.Options.Limit),
}

informStart := time.Now()

informFunc := func(ctx context.Context) error {
timeout := i.informer.InformTimeout(&job.Args)
if timeout == 0 {
Expand All @@ -156,26 +158,16 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA
}

numObjects := 0
var lastObjectCreatedAt time.Time
lastInformTimestamp := informStart
for {
select {
case obj, ok := <-queue:
if !ok {
// happy path: successfully informed all resources
if !job.Args.ProcessExisting && numObjects > 0 {
if !job.Args.ProcessExisting {
// only update the controller last inform time if we aren't processing existing resources.
// and we have informed at least one resource.
//
// default to use the current time but use the last object created time if it's before the current time
// to ensure complete coverage of resources in the case of potential race conditions
// e.g. situations where a resource is created after our informer finishes but before this
// point is reached.
lastInformTime := time.Now()
if !lastObjectCreatedAt.IsZero() && lastObjectCreatedAt.Before(lastInformTime) {
lastInformTime = lastObjectCreatedAt
}
err := queries.ControllerSetLastInformTime(ctx, &sqlc.ControllerSetLastInformTimeParams{
LastInformTime: lastInformTime,
LastInformTime: lastInformTimestamp,
Name: controller.Name,
})
if err != nil {
Expand All @@ -185,8 +177,8 @@ func (i *controllerInformer[T]) Work(ctx context.Context, job *river.Job[InformA
return nil
}
if objWithCreatedAt, ok := Object(obj).(ObjectWithCreatedAt); ok {
if objWithCreatedAt.CreatedAt().After(lastObjectCreatedAt) {
lastObjectCreatedAt = objWithCreatedAt.CreatedAt()
if objWithCreatedAt.CreatedAt().After(lastInformTimestamp) {
lastInformTimestamp = objWithCreatedAt.CreatedAt()
}
}
numObjects++
Expand Down Expand Up @@ -295,6 +287,9 @@ func (i *controllerInformer[T]) processObject(ctx context.Context, object T, arg
Queue: object.Kind(),
Tags: resourceRow.Tags,
Metadata: resourceRow.Metadata,
UniqueOpts: river.UniqueOpts{
ByArgs: true,
},
})
if err != nil {
return err
Expand Down Expand Up @@ -343,6 +338,9 @@ func (i *controllerInformer[T]) processObject(ctx context.Context, object T, arg
Queue: object.Kind(),
Tags: resourceRow.Tags,
Metadata: resourceRow.Metadata,
UniqueOpts: river.UniqueOpts{
ByArgs: true,
},
})
if err != nil {
return fmt.Errorf("failed to enqueue resource: %w", err)
Expand All @@ -366,7 +364,8 @@ func (i *controllerInformer[T]) compareObjects(object T, resource deltatype.Reso
}

// check hash
// we lose ordering with the hash (that's why we prefer the object comparison)
// we lose chronological ordering with the hash (that's why we prefer the object comparison)
// TODO: use deterministic ordering of fields to ensure consistent hash
objBytes, err := json.Marshal(object)
if err != nil {
return 0, err
Expand Down
64 changes: 22 additions & 42 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[

// first check that the resource still exists in the DB
// if it doesn't, we don't want to re-work it and instead can cancel the job
//
// This query is a SELECT FOR UPDATE, which locks the row for the duration of the transaction.
// Row-level locks do not affect data querying; they block only writers and lockers to the same row.
sqlcRow, err := queries.ResourceUpdateAndGetByObjectIDAndKind(ctx, &sqlc.ResourceUpdateAndGetByObjectIDAndKindParams{
Expand All @@ -92,7 +93,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
resourceRow := toResourceRow(sqlcRow)

// should we use the DB resource row or the job.Args resource?
logger.DebugContext(ctx, "working resource", "id", resourceRow.ID, "resource_id", resourceRow.ID, "resource_kind", resourceRow.Kind, "attempt", resourceRow.Attempt)
logger.DebugContext(ctx, "working resource", "id", resourceRow.ID, "resource_id", resourceRow.ID, "resource_kind", resourceRow.Kind(), "attempt", resourceRow.Attempt)

object := w.factory.Make(&resourceRow)
if err := object.UnmarshalResource(); err != nil {
Expand All @@ -119,16 +120,16 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
}

// do the work!
err = workFunc(ctx)
if err != nil {
wErr := workFunc(ctx)
if wErr != nil {
// handle resource delete error
deleteErr := new(ResourceDeleteError)
if errors.Is(err, deleteErr) {
if errors.Is(wErr, deleteErr) {
now := time.Now().UTC()
errorData, err := json.Marshal(deltatype.AttemptError{
At: now,
Attempt: resourceRow.Attempt,
Error: err.Error(),
Error: wErr.Error(),
})
if err != nil {
return fmt.Errorf("error marshaling error JSON: %w", err)
Expand All @@ -143,16 +144,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
Column6: errorData,
})
if derr != nil {
// handle no rows (since it's possible the delta resource record was deleted during the Work() call)
if errors.Is(derr, pgx.ErrNoRows) {
if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err))
}
return fmt.Errorf("failed to set resource state to deleted: %w", derr)
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

deletedRow := toResourceRow(deleted)
client.eventCh <- []Event{
{
Expand All @@ -162,25 +160,21 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
},
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return river.JobCancel(fmt.Errorf("resource deleted: %w", err))
return river.JobCancel(fmt.Errorf("resource deleted: %w", wErr))
}

state := sqlc.DeltaResourceStateFailed
if job.Attempt >= job.MaxAttempts {
state = sqlc.DeltaResourceStateDegraded
}

logger.WarnContext(ctx, "resource failed", "attempt", resourceRow.Attempt, "state", state, "error", err)
logger.WarnContext(ctx, "resource failed", "attempt", resourceRow.Attempt, "state", state, "error", wErr)

now := time.Now().UTC()
errorData, err := json.Marshal(deltatype.AttemptError{
At: now,
Attempt: resourceRow.Attempt,
Error: err.Error(),
Error: wErr.Error(),
})
if err != nil {
return fmt.Errorf("error marshaling error JSON: %w", err)
Expand All @@ -196,16 +190,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
Column6: errorData,
})
if uerr != nil {
// handle no rows (since it's possible the delta resource record was deleted during the Work() call)
if errors.Is(uerr, pgx.ErrNoRows) {
if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err))
}
return uerr
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

failedRow := toResourceRow(failed)
client.eventCh <- []Event{
{
Expand All @@ -215,11 +206,7 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
},
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return err
return wErr
}

now := time.Now()
Expand All @@ -231,16 +218,13 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
SyncedAt: &now,
})
if err != nil {
// handle no rows (since it's possible the delta resource record was deleted during the Work() call)
if errors.Is(err, pgx.ErrNoRows) {
if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return river.JobCancel(fmt.Errorf("resource %s:%s no longer exists: %w", resource.Object.Kind(), resource.Object.ID(), err))
}
return err
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

syncedRow := toResourceRow(synced)
client.eventCh <- []Event{
{
Expand All @@ -250,10 +234,6 @@ func (w *controllerWorker[T]) Work(ctx context.Context, job *river.Job[Resource[
},
}

if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

logger.InfoContext(ctx, "finished working resource", "id", resource.ID, "resource_id", resource.Object.ID(), "resource_kind", resource.Object.Kind())

return nil
Expand Down

0 comments on commit e99f200

Please sign in to comment.