Skip to content

Commit

Permalink
Add DeadlineEnforcingBlobAccess
Browse files Browse the repository at this point in the history
Sometimes it may be desirable to place an upper bound on the amount of
time incoming RPCs to bb-storage may take. For example, to reject writes
that are taking an excessive amount of time to complete. This change
adds a decorator that can be used to enforce this.
  • Loading branch information
EdSchouten committed Feb 17, 2025
1 parent 2600f22 commit c93a48e
Show file tree
Hide file tree
Showing 5 changed files with 689 additions and 484 deletions.
1 change: 1 addition & 0 deletions pkg/blobstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"authorizing_blob_access.go",
"blob_access.go",
"cas_read_buffer_factory.go",
"deadline_enforcing_blob_access.go",
"demultiplexing_blob_access.go",
"empty_blob_injecting_blob_access.go",
"error_blob_access.go",
Expand Down
15 changes: 15 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,21 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
BlobAccess: blobAccess,
DigestKeyFormat: digestKeyFormat,
}, "zip_writing", nil
case *pb.BlobAccessConfiguration_DeadlineEnforcing:
base, err := nc.NewNestedBlobAccess(backend.DeadlineEnforcing.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}

timeout := backend.DeadlineEnforcing.Timeout
if err := timeout.CheckValid(); err != nil {
return BlobAccessInfo{}, "", util.StatusWrap(err, "Invalid timeout for deadline enforcement")
}

return BlobAccessInfo{
BlobAccess: blobstore.NewDeadlineEnforcingBlobAccess(base.BlobAccess, timeout.AsDuration()),
DigestKeyFormat: base.DigestKeyFormat,
}, "deadline_enforcing", nil
}
return creator.NewCustomBlobAccess(configuration, nc)
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/blobstore/deadline_enforcing_blob_access.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package blobstore

import (
"context"
"time"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"
)

type deadlineEnforcingBlobAccess struct {
delegate BlobAccess
timeout time.Duration
}

// NewDeadlineEnforcingBlobAccess creates a decorator for BlobAccess
// that enforces execution timeouts.
func NewDeadlineEnforcingBlobAccess(delegate BlobAccess, timeout time.Duration) BlobAccess {
return &deadlineEnforcingBlobAccess{
delegate: delegate,
timeout: timeout,
}
}

func (d *deadlineEnforcingBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()

return d.delegate.GetCapabilities(ctxWithTimeout, instanceName)
}

func (d *deadlineEnforcingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d.timeout)

return buffer.WithErrorHandler(
d.delegate.Get(ctxWithTimeout, digest),
&contextCancelingErrorHandler{cancel: cancel},
)
}

func (d *deadlineEnforcingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d.timeout)

return buffer.WithErrorHandler(
d.delegate.GetFromComposite(ctxWithTimeout, parentDigest, childDigest, slicer),
&contextCancelingErrorHandler{cancel: cancel},
)
}

func (d *deadlineEnforcingBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()

return d.delegate.Put(ctxWithTimeout, digest, b)
}

func (d *deadlineEnforcingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d.timeout)
defer cancel()

return d.delegate.FindMissing(ctxWithTimeout, digests)
}

// contextCancelingErrorHandler is used by deadlineEnforcingBlobAccess
// to ensure the Context object created by Get() and GetFromComposite()
// is canceled after the buffer is processed. This ensures that in the
// success path any resources allocated by context.WithTimeout() are
// released immediately, instead of waiting for the timeout to be
// reached.
type contextCancelingErrorHandler struct {
cancel context.CancelFunc
}

func (contextCancelingErrorHandler) OnError(err error) (buffer.Buffer, error) {
return nil, err
}

func (eh *contextCancelingErrorHandler) Done() {
eh.cancel()
eh.cancel = nil
}
Loading

0 comments on commit c93a48e

Please sign in to comment.