Skip to content

Commit

Permalink
Rate limit support (#165)
Browse files Browse the repository at this point in the history
* Connectorbuilder: Return responses even in error cases. This lets us bubble rate limiting annotations up the stack.

* Pass annotations up the stack so we can obey rate limits and back off on retries.

* Retry when error is Deadline exceeded

* Return Unavailable when status code is 5xx

* Add some comments

* Include wait time as detail in gRPC error message

* Parse retry-after has int or as date

- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Date

* Remove unused lint exception for gosec

* Log a warning if header content is unavailable or not parseable

* Log warning for 503 error too

* Revert "Remove unused lint exception for gosec"

This reverts commit 8b53ccc.

* Use `ratelimit.ExtractRateLimitData`

* Extract details for rate limit from error

* Remove annotations from returns

* Generalize detail embedding in error

* Ignore unused argument in function

* Join any errors when running DoOption functions. Add some comments in syncer.

* WIP add some testing/logging stuff for rate limits in client do.

* Use one grpc status error instead of joining them together. Round up in rate limiting.

* Make WrapErrors public

* Return optErrs as well as status code error.

* Better name for WrapErrors. Don't return an error in GRPCWrap if there's an issue geting rate limit data or adding details.

* Don't return annotations for now. This pull request is big enough already.

* Don't shadow err here.

* Don't log every time we fail to get rate limit data in an error. This probably happens a lot because not all http servers return rate limit data.

---------

Co-authored-by: Jorge Javier Araya Navarro <jorge@esavara.cr>
  • Loading branch information
ggreer and shackra authored Sep 12, 2024
1 parent eb862ad commit d3a21e0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 41 deletions.
45 changes: 24 additions & 21 deletions pkg/connectorbuilder/connectorbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,22 @@ func (b *builderImpl) ListResources(ctx context.Context, request *v2.ResourcesSe
Size: int(request.PageSize),
Token: request.PageToken,
})
resp := &v2.ResourcesServiceListResourcesResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}
if err != nil {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing resources failed: %w", err)
return resp, fmt.Errorf("error: listing resources failed: %w", err)
}
if request.PageToken != "" && request.PageToken == nextPageToken {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing resources failed: next page token is the same as the current page token. this is most likely a connector bug")
return resp, fmt.Errorf("error: listing resources failed: next page token is the same as the current page token. this is most likely a connector bug")
}

b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.ResourcesServiceListResourcesResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}, nil
return resp, nil
}

// ListEntitlements returns all the entitlements for a given resource.
Expand All @@ -404,21 +405,22 @@ func (b *builderImpl) ListEntitlements(ctx context.Context, request *v2.Entitlem
Size: int(request.PageSize),
Token: request.PageToken,
})
resp := &v2.EntitlementsServiceListEntitlementsResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}
if err != nil {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing entitlements failed: %w", err)
return resp, fmt.Errorf("error: listing entitlements failed: %w", err)
}
if request.PageToken != "" && request.PageToken == nextPageToken {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing entitlements failed: next page token is the same as the current page token. this is most likely a connector bug")
return resp, fmt.Errorf("error: listing entitlements failed: next page token is the same as the current page token. this is most likely a connector bug")
}

b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.EntitlementsServiceListEntitlementsResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}, nil
return resp, nil
}

// ListGrants lists all the grants for a given resource.
Expand All @@ -436,23 +438,24 @@ func (b *builderImpl) ListGrants(ctx context.Context, request *v2.GrantsServiceL
Size: int(request.PageSize),
Token: request.PageToken,
})
resp := &v2.GrantsServiceListGrantsResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}
if err != nil {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing grants for resource %s/%s failed: %w", rid.ResourceType, rid.Resource, err)
return resp, fmt.Errorf("error: listing grants for resource %s/%s failed: %w", rid.ResourceType, rid.Resource, err)
}
if request.PageToken != "" && request.PageToken == nextPageToken {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: listing grants for resource %s/%s failed: next page token is the same as the current page token. this is most likely a connector bug",
return resp, fmt.Errorf("error: listing grants for resource %s/%s failed: next page token is the same as the current page token. this is most likely a connector bug",
rid.ResourceType,
rid.Resource)
}

b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.GrantsServiceListGrantsResponse{
List: out,
NextPageToken: nextPageToken,
Annotations: annos,
}, nil
return resp, nil
}

// GetMetadata gets all metadata for a connector.
Expand Down
21 changes: 18 additions & 3 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -92,15 +93,29 @@ func shouldWaitAndRetry(ctx context.Context, err error) bool {
attempts = 0
return true
}
if status.Code(err) != codes.Unavailable {
if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
return false
}

attempts++
l := ctxzap.Extract(ctx)

// use linear time by default
var wait time.Duration = time.Duration(attempts) * time.Second

// If error contains rate limit data, use that instead
if st, ok := status.FromError(err); ok {
details := st.Details()
for _, detail := range details {
if rlData, ok := detail.(*v2.RateLimitDescription); ok {
wait = time.Until(rlData.ResetAt.AsTime())
wait /= time.Duration(rlData.Limit)
// Round up to the nearest second to make sure we don't hit the rate limit again
wait = time.Duration(math.Ceil(wait.Seconds())) * time.Second
}
}
}

l.Warn("retrying operation", zap.Error(err), zap.Duration("wait", wait))

for {
Expand Down Expand Up @@ -235,7 +250,7 @@ func (s *syncer) Sync(ctx context.Context) error {

case SyncAssetsOp:
err = s.SyncAssets(ctx)
if err != nil {
if !shouldWaitAndRetry(ctx, err) {
return err
}
continue
Expand Down Expand Up @@ -1325,7 +1340,7 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) {
return false, nil
}

func (s *syncer) newExpandedGrant(ctx context.Context, descEntitlement *v2.Entitlement, principal *v2.Resource) (*v2.Grant, error) {
func (s *syncer) newExpandedGrant(_ context.Context, descEntitlement *v2.Entitlement, principal *v2.Resource) (*v2.Grant, error) {
enResource := descEntitlement.GetResource()
if enResource == nil {
return nil, fmt.Errorf("newExpandedGrant: entitlement has no resource")
Expand Down
2 changes: 1 addition & 1 deletion pkg/uhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (uat *userAgentTripper) RoundTrip(req *http.Request) (*http.Response, error
return uat.next.RoundTrip(req)
}

func (t *Transport) make(ctx context.Context) (http.RoundTripper, error) {
func (t *Transport) make(_ context.Context) (http.RoundTripper, error) {
// based on http.DefaultTransport
baseTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand Down
54 changes: 38 additions & 16 deletions pkg/uhttp/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,23 @@ func WithResponse(response interface{}) DoOption {
}
}

func GRPCWrap(preferredCode codes.Code, resp *http.Response, errs ...error) error {
st := status.New(preferredCode, resp.Status)

description, err := ratelimit.ExtractRateLimitData(resp.StatusCode, &resp.Header)
// Ignore any error extracting rate limit data
if err == nil {
st, _ = st.WithDetails(description)
}

if len(errs) == 0 {
return st.Err()
}

allErrs := append([]error{st.Err()}, errs...)
return errors.Join(allErrs...)
}

func (c *BaseHttpClient) Do(req *http.Request, options ...DoOption) (*http.Response, error) {
var (
cacheKey string
Expand Down Expand Up @@ -268,41 +285,46 @@ func (c *BaseHttpClient) Do(req *http.Request, options ...DoOption) (*http.Respo
StatusCode: resp.StatusCode,
Body: body,
}

var optErrs []error
for _, option := range options {
err = option(&wresp)
if err != nil {
return resp, err
optErr := option(&wresp)
if optErr != nil {
optErrs = append(optErrs, optErr)
}
}

switch resp.StatusCode {
case http.StatusRequestTimeout:
return resp, status.Error(codes.DeadlineExceeded, resp.Status)
case http.StatusTooManyRequests:
return resp, status.Error(codes.Unavailable, resp.Status)
return resp, GRPCWrap(codes.DeadlineExceeded, resp, optErrs...)
case http.StatusTooManyRequests, http.StatusServiceUnavailable:
return resp, GRPCWrap(codes.Unavailable, resp, optErrs...)
case http.StatusNotFound:
return resp, status.Error(codes.NotFound, resp.Status)
return resp, GRPCWrap(codes.NotFound, resp, optErrs...)
case http.StatusUnauthorized:
return resp, status.Error(codes.Unauthenticated, resp.Status)
return resp, GRPCWrap(codes.Unauthenticated, resp, optErrs...)
case http.StatusForbidden:
return resp, status.Error(codes.PermissionDenied, resp.Status)
return resp, GRPCWrap(codes.PermissionDenied, resp, optErrs...)
case http.StatusNotImplemented:
return resp, status.Error(codes.Unimplemented, resp.Status)
return resp, GRPCWrap(codes.Unimplemented, resp, optErrs...)
}

if resp.StatusCode >= 500 && resp.StatusCode <= 599 {
return resp, GRPCWrap(codes.Unavailable, resp, optErrs...)
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return resp, status.Error(codes.Unknown, fmt.Sprintf("unexpected status code: %d", resp.StatusCode))
return resp, GRPCWrap(codes.Unknown, resp, append(optErrs, fmt.Errorf("unexpected status code: %d", resp.StatusCode))...)
}

if req.Method == http.MethodGet && resp.StatusCode == http.StatusOK {
err := c.baseHttpCache.Set(cacheKey, resp)
if err != nil {
l.Debug("error setting cache", zap.String("cacheKey", cacheKey), zap.String("url", req.URL.String()), zap.Error(err))
return resp, err
cacheErr := c.baseHttpCache.Set(cacheKey, resp)
if cacheErr != nil {
l.Warn("error setting cache", zap.String("cacheKey", cacheKey), zap.String("url", req.URL.String()), zap.Error(cacheErr))
}
}

return resp, err
return resp, errors.Join(optErrs...)
}

func WithHeader(key, value string) RequestOption {
Expand Down

0 comments on commit d3a21e0

Please sign in to comment.