From 6f66e36372a0466d9ea6007f442ef9256c2dca4a Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Tue, 29 Oct 2024 11:21:56 -0700 Subject: [PATCH] Pool chunk slices instead of reusing them Similar to #2, where buffers are reused instead of pooled. Reusing a buffer instead of pooling it means its capacity is never released. Therefore if a large spike in traffic causes that buffer to grow, the resources it holds onto are never returned even as the traffic decreases. This causes those large buffers to stick around and not get used. This change uses a `sync.Pool` instead of each `deltaSender` having its own buffer. This way buffers are automatically released when they aren't used. --- internal/server/handlers_delta.go | 46 ++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/internal/server/handlers_delta.go b/internal/server/handlers_delta.go index 3e25613..39b0b20 100644 --- a/internal/server/handlers_delta.go +++ b/internal/server/handlers_delta.go @@ -5,6 +5,7 @@ import ( "context" "log/slog" "slices" + "sync" "github.com/linkedin/diderot/ads" "github.com/linkedin/diderot/internal/utils" @@ -87,28 +88,47 @@ type deltaSender struct { // larger than this size will cause the message to be dropped. maxChunkSize int - // This slice is reused by chunk. It contains the updates about to be sent, sorted by their size over - // the wire. - queuedUpdates []queuedResourceUpdate // The minimum size an encoded chunk will serialize to, in bytes. Used to check whether a given // update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends // on utils.NonceLength and the length of typeURL. minChunkSize int } +var queuedUpdatesPool = sync.Pool{} + +// newQueue creates a new []queuedResourceUpdate with at least enough capacity to hold the required +// size. Note that this returns a pointer to a slice instead of a slice to avoid heap allocations. +// This is the recommended way to use a [sync.Pool] with slices. +func newQueue(size int) *[]queuedResourceUpdate { + // Attempt to get a queue from the pool. If it returns nil, ok will be false meaning the pool was + // empty. + queue, ok := queuedUpdatesPool.Get().(*[]queuedResourceUpdate) + if ok && cap(*queue) >= size { + *queue = (*queue)[:0] + } else { + if ok { + // Return the queue that was too short to the pool + queuedUpdatesPool.Put(queue) + } + queue = new([]queuedResourceUpdate) + *queue = make([]queuedResourceUpdate, 0, size) + } + return queue +} + func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.DeltaDiscoveryResponse) { - defer func() { - clear(ds.queuedUpdates) - ds.queuedUpdates = ds.queuedUpdates[:0] - }() + queuePtr := newQueue(len(resourceUpdates)) + defer queuedUpdatesPool.Put(queuePtr) + + queue := *queuePtr for name, e := range resourceUpdates { - ds.queuedUpdates = append(ds.queuedUpdates, queuedResourceUpdate{ + queue = append(queue, queuedResourceUpdate{ Name: name, Size: encodedUpdateSize(name, e.Resource), }) } // Sort the updates in descending order - slices.SortFunc(ds.queuedUpdates, func(a, b queuedResourceUpdate) int { + slices.SortFunc(queue, func(a, b queuedResourceUpdate) int { return -cmp.Compare(a.Size, b.Size) }) @@ -116,13 +136,13 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De // implements an approximation of the bin-packing algorithm called next-fit-decreasing bin-packing // https://en.wikipedia.org/wiki/Next-fit-decreasing_bin_packing idx := 0 - for idx < len(ds.queuedUpdates) { + for idx < len(queue) { // This chunk will hold all the updates for this loop iteration chunk := ds.newChunk() chunkSize := proto.Size(chunk) - for ; idx < len(ds.queuedUpdates); idx++ { - update := ds.queuedUpdates[idx] + for ; idx < len(queue); idx++ { + update := queue[idx] r := resourceUpdates[update.Name].Resource if ds.maxChunkSize > 0 { @@ -172,7 +192,7 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De "Response exceeded max response size, sent in chunks", "chunks", len(chunks), "typeURL", ds.typeURL, - "updates", len(ds.queuedUpdates), + "updates", len(queue), ) for i, c := range chunks { c.Nonce = utils.NewNonce(len(chunks) - i - 1)