From 3c5cb8c3873df1a68685d4e84946f49bc45c748d Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Thu, 31 Oct 2024 19:59:24 -0400 Subject: [PATCH] Pool chunk slices instead of reusing them (#6) Similar to #2, this fixes a case where buffers are reused instead of pooled. Reusing a buffer instead of pooling it means its capacity is never released. Therefore, if a large spike in traffic causes that buffer to grow, the resources it holds onto are never returned even as the traffic decreases. As a result, those large buffers stick around without being used. This change uses a `sync.Pool` instead of each `deltaSender` having its own buffer. This way, buffers are automatically released when they aren't used. Here's a screenshot showing the motivation behind this change: these unfreed buffers are responsible for about 20% of the memory usage of the observer. ![image](https://github.com/user-attachments/assets/7e02ed52-346d-496f-9fa9-9bea84be7da4) --- internal/server/handlers_delta.go | 46 ++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/internal/server/handlers_delta.go b/internal/server/handlers_delta.go index 3e25613..39b0b20 100644 --- a/internal/server/handlers_delta.go +++ b/internal/server/handlers_delta.go @@ -5,6 +5,7 @@ import ( "context" "log/slog" "slices" + "sync" "github.com/linkedin/diderot/ads" "github.com/linkedin/diderot/internal/utils" @@ -87,28 +88,47 @@ type deltaSender struct { // larger than this size will cause the message to be dropped. maxChunkSize int - // This slice is reused by chunk. It contains the updates about to be sent, sorted by their size over - // the wire. - queuedUpdates []queuedResourceUpdate // The minimum size an encoded chunk will serialize to, in bytes. Used to check whether a given // update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends // on utils.NonceLength and the length of typeURL. 
minChunkSize int } +var queuedUpdatesPool = sync.Pool{} + +// newQueue creates a new []queuedResourceUpdate with at least enough capacity to hold the required +// size. Note that this returns a pointer to a slice instead of a slice to avoid heap allocations. +// This is the recommended way to use a [sync.Pool] with slices. +func newQueue(size int) *[]queuedResourceUpdate { + // Attempt to get a queue from the pool. If it returns nil, ok will be false meaning the pool was + // empty. + queue, ok := queuedUpdatesPool.Get().(*[]queuedResourceUpdate) + if ok && cap(*queue) >= size { + *queue = (*queue)[:0] + } else { + if ok { + // Return the queue that was too short to the pool + queuedUpdatesPool.Put(queue) + } + queue = new([]queuedResourceUpdate) + *queue = make([]queuedResourceUpdate, 0, size) + } + return queue +} + func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.DeltaDiscoveryResponse) { - defer func() { - clear(ds.queuedUpdates) - ds.queuedUpdates = ds.queuedUpdates[:0] - }() + queuePtr := newQueue(len(resourceUpdates)) + defer queuedUpdatesPool.Put(queuePtr) + + queue := *queuePtr for name, e := range resourceUpdates { - ds.queuedUpdates = append(ds.queuedUpdates, queuedResourceUpdate{ + queue = append(queue, queuedResourceUpdate{ Name: name, Size: encodedUpdateSize(name, e.Resource), }) } // Sort the updates in descending order - slices.SortFunc(ds.queuedUpdates, func(a, b queuedResourceUpdate) int { + slices.SortFunc(queue, func(a, b queuedResourceUpdate) int { return -cmp.Compare(a.Size, b.Size) }) @@ -116,13 +136,13 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De // implements an approximation of the bin-packing algorithm called next-fit-decreasing bin-packing // https://en.wikipedia.org/wiki/Next-fit-decreasing_bin_packing idx := 0 - for idx < len(ds.queuedUpdates) { + for idx < len(queue) { // This chunk will hold all the updates for this loop iteration chunk := ds.newChunk() chunkSize := 
proto.Size(chunk) - for ; idx < len(ds.queuedUpdates); idx++ { - update := ds.queuedUpdates[idx] + for ; idx < len(queue); idx++ { + update := queue[idx] r := resourceUpdates[update.Name].Resource if ds.maxChunkSize > 0 { @@ -172,7 +192,7 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De "Response exceeded max response size, sent in chunks", "chunks", len(chunks), "typeURL", ds.typeURL, - "updates", len(ds.queuedUpdates), + "updates", len(queue), ) for i, c := range chunks { c.Nonce = utils.NewNonce(len(chunks) - i - 1)