Skip to content

Commit

Permalink
feat: remove resolvable pools, reduces memory (#959)
Browse files Browse the repository at this point in the history
  • Loading branch information
jensneuse authored Nov 7, 2024
1 parent 3b20d63 commit 7fb59ca
Showing 1 changed file with 12 additions and 63 deletions.
75 changes: 12 additions & 63 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,16 @@ type Resolver struct {
triggerUpdatesSem *semaphore.Weighted
triggerUpdateBuf *bytes.Buffer

allowedErrorExtensionFields map[string]struct{}
allowedErrorFields map[string]struct{}

connectionIDs atomic.Int64

reporter Reporter
asyncErrorWriter AsyncErrorWriter

propagateSubgraphErrors bool
propagateSubgraphStatusCodes bool

// We create dedicated pools for request and subscription tools to more efficiently manage memory
// The assumption is that subscription responses are smaller than regular requests and we avoid
// the overhead of allocating memory for the larger request tools

requestTools *sync.Pool
subscriptionTools *sync.Pool
}

func (r *Resolver) SetAsyncErrorWriter(w AsyncErrorWriter) {
Expand Down Expand Up @@ -190,16 +186,8 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
reporter: options.Reporter,
asyncErrorWriter: options.AsyncErrorWriter,
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
requestTools: &sync.Pool{
New: func() any {
return newTools(options, allowedExtensionFields, allowedErrorFields)
},
},
subscriptionTools: &sync.Pool{
New: func() any {
return newTools(options, allowedExtensionFields, allowedErrorFields)
},
},
allowedErrorExtensionFields: allowedExtensionFields,
allowedErrorFields: allowedErrorFields,
}
resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
for i := 0; i < options.MaxConcurrency; i++ {
Expand Down Expand Up @@ -238,35 +226,6 @@ func newTools(options ResolverOptions, allowedExtensionFields map[string]struct{
}
}

// getRequestTools returns a new tools struct with a limit of how many can be created concurrently
// The limit is defined by the MaxConcurrency option. Use putRequestTools to return the tools struct back to the pool
func (r *Resolver) getRequestTools() (time.Duration, *tools) {
start := time.Now()
<-r.maxConcurrency
tool := r.requestTools.Get().(*tools)
return time.Since(start), tool
}

// putRequestTools returns the tools struct back to the pool and releases the semaphore
func (r *Resolver) putRequestTools(t *tools) {
t.loader.Free()
t.resolvable.Reset(r.options.MaxRecyclableParserSize)
r.requestTools.Put(t)
r.maxConcurrency <- struct{}{}
}

// getSubscriptionTools returns a new tools struct. Use putSubscriptionTools to return the tools struct back to the pool.
func (r *Resolver) getSubscriptionTools() *tools {
return r.subscriptionTools.Get().(*tools)
}

// putSubscriptionTools returns the tools struct back to the pool
func (r *Resolver) putSubscriptionTools(t *tools) {
t.loader.Free()
t.resolvable.Reset(r.options.MaxRecyclableParserSize)
r.subscriptionTools.Put(t)
}

type GraphQLResolveInfo struct {
ResolveAcquireWaitTime time.Duration
}
Expand All @@ -275,19 +234,15 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons

resp := &GraphQLResolveInfo{}

toolsCleaned := false
acquireWaitTime, t := r.getRequestTools()
resp.ResolveAcquireWaitTime = acquireWaitTime

// Ensure that the tools are returned even on panic
// This is important because getTools() acquires a semaphore
// and if we don't return the tools, we will have a deadlock
start := time.Now()
<-r.maxConcurrency
resp.ResolveAcquireWaitTime = time.Since(start)
defer func() {
if !toolsCleaned {
r.putRequestTools(t)
}
r.maxConcurrency <- struct{}{}
}()

t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)

err := t.resolvable.Init(ctx, data, response.Info.OperationType)
if err != nil {
return nil, err
Expand All @@ -302,11 +257,6 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons

buf := &bytes.Buffer{}
err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf)

// Return the tools as soon as possible. More efficient in case of a slow client / network.
r.putRequestTools(t)
toolsCleaned = true

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -336,8 +286,7 @@ func (r *Resolver) executeSubscriptionUpdate(ctx *Context, sub *sub, sharedInput
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID)
}
t := r.getSubscriptionTools()
defer r.putSubscriptionTools(t)
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)

input := make([]byte, len(sharedInput))
copy(input, sharedInput)
Expand Down

0 comments on commit 7fb59ca

Please sign in to comment.