Pool chunk slices instead of reusing them (#6)
Similar to #2, where buffers were reused instead of pooled. Reusing a
buffer instead of pooling it means its capacity is never released. If a
large spike in traffic causes that buffer to grow, the memory it holds
onto is never returned, even as traffic decreases. Those large buffers
therefore stick around indefinitely without being used.

This change uses a `sync.Pool` instead of giving each `deltaSender` its
own buffer. This way, buffers are automatically released when they are no
longer in use.

Here's a screenshot of the motivation behind this change: these never-freed
buffers are responsible for about 20% of the observer's memory usage:

![image](https://github.com/user-attachments/assets/7e02ed52-346d-496f-9fa9-9bea84be7da4)
PapaCharlie authored Oct 31, 2024
1 parent 1d64b49 commit 3c5cb8c
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions internal/server/handlers_delta.go
```diff
@@ -5,6 +5,7 @@ import (
 	"context"
 	"log/slog"
 	"slices"
+	"sync"
 
 	"github.com/linkedin/diderot/ads"
 	"github.com/linkedin/diderot/internal/utils"
@@ -87,42 +88,61 @@ type deltaSender struct {
 	// larger than this size will cause the message to be dropped.
 	maxChunkSize int
 
-	// This slice is reused by chunk. It contains the updates about to be sent, sorted by their size over
-	// the wire.
-	queuedUpdates []queuedResourceUpdate
 	// The minimum size an encoded chunk will serialize to, in bytes. Used to check whether a given
 	// update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends
 	// on utils.NonceLength and the length of typeURL.
 	minChunkSize int
 }
 
+var queuedUpdatesPool = sync.Pool{}
+
+// newQueue creates a new []queuedResourceUpdate with at least enough capacity to hold the required
+// size. Note that this returns a pointer to a slice instead of a slice to avoid heap allocations.
+// This is the recommended way to use a [sync.Pool] with slices.
+func newQueue(size int) *[]queuedResourceUpdate {
+	// Attempt to get a queue from the pool. If it returns nil, ok will be false meaning the pool was
+	// empty.
+	queue, ok := queuedUpdatesPool.Get().(*[]queuedResourceUpdate)
+	if ok && cap(*queue) >= size {
+		*queue = (*queue)[:0]
+	} else {
+		if ok {
+			// Return the queue that was too short to the pool
+			queuedUpdatesPool.Put(queue)
+		}
+		queue = new([]queuedResourceUpdate)
+		*queue = make([]queuedResourceUpdate, 0, size)
+	}
+	return queue
+}
+
 func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.DeltaDiscoveryResponse) {
-	defer func() {
-		clear(ds.queuedUpdates)
-		ds.queuedUpdates = ds.queuedUpdates[:0]
-	}()
+	queuePtr := newQueue(len(resourceUpdates))
+	defer queuedUpdatesPool.Put(queuePtr)
+
+	queue := *queuePtr
 	for name, e := range resourceUpdates {
-		ds.queuedUpdates = append(ds.queuedUpdates, queuedResourceUpdate{
+		queue = append(queue, queuedResourceUpdate{
 			Name: name,
 			Size: encodedUpdateSize(name, e.Resource),
 		})
 	}
 	// Sort the updates in descending order
-	slices.SortFunc(ds.queuedUpdates, func(a, b queuedResourceUpdate) int {
+	slices.SortFunc(queue, func(a, b queuedResourceUpdate) int {
 		return -cmp.Compare(a.Size, b.Size)
 	})
 
 	// This nested loop builds the fewest possible chunks it can from the given resourceUpdates map. It
 	// implements an approximation of the bin-packing algorithm called next-fit-decreasing bin-packing
 	// https://en.wikipedia.org/wiki/Next-fit-decreasing_bin_packing
 	idx := 0
-	for idx < len(ds.queuedUpdates) {
+	for idx < len(queue) {
 		// This chunk will hold all the updates for this loop iteration
 		chunk := ds.newChunk()
 		chunkSize := proto.Size(chunk)
 
-		for ; idx < len(ds.queuedUpdates); idx++ {
-			update := ds.queuedUpdates[idx]
+		for ; idx < len(queue); idx++ {
+			update := queue[idx]
 			r := resourceUpdates[update.Name].Resource
 
 			if ds.maxChunkSize > 0 {
@@ -172,7 +192,7 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
 			"Response exceeded max response size, sent in chunks",
 			"chunks", len(chunks),
 			"typeURL", ds.typeURL,
-			"updates", len(ds.queuedUpdates),
+			"updates", len(queue),
 		)
 		for i, c := range chunks {
 			c.Nonce = utils.NewNonce(len(chunks) - i - 1)
```
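As an aside, the next-fit-decreasing heuristic referenced in `chunk`'s comment can be sketched in isolation. This is a minimal, hypothetical illustration (plain `int` sizes and a `capacity` parameter stand in for the encoded resource updates and max chunk size), not the library's actual implementation:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// nextFitDecreasing packs sizes into bins using next-fit-decreasing:
// sort items by descending size, then fill the current bin until the next
// item no longer fits, at which point a new bin is started. Closed bins
// are never revisited, which makes the algorithm a fast single pass.
func nextFitDecreasing(sizes []int, capacity int) (bins [][]int) {
	sorted := slices.Clone(sizes)
	slices.SortFunc(sorted, func(a, b int) int {
		return -cmp.Compare(a, b) // descending, like chunk's sort
	})

	var current []int
	used := 0
	for _, s := range sorted {
		if used+s > capacity && len(current) > 0 {
			bins = append(bins, current)
			current, used = nil, 0
		}
		current = append(current, s)
		used += s
	}
	if len(current) > 0 {
		bins = append(bins, current)
	}
	return bins
}

func main() {
	// Items of size 7, 5, 4, 3, 2, 1 packed into bins of capacity 10:
	fmt.Println(nextFitDecreasing([]int{3, 7, 1, 4, 5, 2}, 10))
	// Output: [[7] [5 4] [3 2 1]]
}
```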
