diff --git a/pkg/client/options.go b/pkg/client/options.go index d722330..9c209ea 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -60,6 +60,13 @@ func getRequestOpts(i *info.RequestInfo, options psrpc.ClientOpts, opts ...psrpc opt(o) } + if o.SelectionOpts.Jitter < 0 { + o.SelectionOpts.Jitter = 0 + } + if o.SelectionOpts.Jitter > 1 { + o.SelectionOpts.Jitter = 1 + } + return *o } diff --git a/pkg/client/rpc.go b/pkg/client/rpc.go index 3b7507a..865a61b 100644 --- a/pkg/client/rpc.go +++ b/pkg/client/rpc.go @@ -17,6 +17,8 @@ package client import ( "context" "errors" + r "math/rand" + "sort" "time" "google.golang.org/protobuf/proto" @@ -176,36 +178,52 @@ func selectServer( time.AfterFunc(opts.AffinityTimeout, cancel) } - serverID := "" - best := float32(0) shorted := false - claims := 0 var resErr error + var claims []*internal.ClaimRequest + var resCount int for { select { case <-ctx.Done(): - if best > 0 { - return serverID, nil + if len(claims) > 0 { + sort.Slice(claims, func(i, j int) bool { + return claims[i].Affinity > claims[j].Affinity + }) + + best := claims[0].Affinity + minAffinity := best * (1 - opts.Jitter) + + i := 0 + for ; i < len(claims); i++ { + if claims[i].Affinity < minAffinity { + break + } + } + + return claims[r.Intn(i)].ServerId, nil } + if resErr != nil { return "", resErr } - if claims == 0 { - return "", psrpc.ErrNoResponse + + if len(claims) == 0 { + if resCount > 0 { + return "", psrpc.NewErrorf(psrpc.Unavailable, "no servers available (received %d responses)", resCount) + } else { + return "", psrpc.ErrNoResponse + } } - return "", psrpc.NewErrorf(psrpc.Unavailable, "no servers available (received %d responses)", claims) case claim := <-claimChan: - claims++ - if (opts.MinimumAffinity > 0 && claim.Affinity >= opts.MinimumAffinity && claim.Affinity > best) || - (opts.MinimumAffinity <= 0 && claim.Affinity > best) { + resCount++ + if (opts.MinimumAffinity > 0 && claim.Affinity >= opts.MinimumAffinity) || opts.MinimumAffinity <= 0 { if opts.AcceptFirstAvailable || opts.MaximumAffinity > 0 && claim.Affinity >= opts.MaximumAffinity { return claim.ServerId, nil } - serverID = claim.ServerId - best = claim.Affinity + claims = append(claims, claim) if opts.ShortCircuitTimeout > 0 && !shorted { shorted = true diff --git a/request.go b/request.go index d08e446..e93d38b 100644 --- a/request.go +++ b/request.go @@ -31,6 +31,7 @@ type RequestOpts struct { type SelectionOpts struct { MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler MaximumAffinity float32 // if > 0, any server returning a max score will be selected immediately + Jitter float32 // randomness applied to selection (0 to 1) AcceptFirstAvailable bool // go fast AffinityTimeout time.Duration // server selection deadline ShortCircuitTimeout time.Duration // deadline imposed after receiving first response