Pool chunk slices instead of reusing them #6

Merged 2 commits on Oct 31, 2024
46 changes: 33 additions & 13 deletions internal/server/handlers_delta.go
@@ -5,6 +5,7 @@ import (
"context"
"log/slog"
"slices"
+ "sync"

"github.com/linkedin/diderot/ads"
"github.com/linkedin/diderot/internal/utils"
@@ -87,42 +88,61 @@ type deltaSender struct {
// larger than this size will cause the message to be dropped.
maxChunkSize int

- // This slice is reused by chunk. It contains the updates about to be sent, sorted by their size over
- // the wire.
- queuedUpdates []queuedResourceUpdate
// The minimum size an encoded chunk will serialize to, in bytes. Used to check whether a given
// update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends
// on utils.NonceLength and the length of typeURL.
minChunkSize int
}

+ var queuedUpdatesPool = sync.Pool{}
+
+ // newQueue creates a new []queuedResourceUpdate with at least enough capacity to hold the required
+ // size. Note that this returns a pointer to a slice instead of a slice to avoid heap allocations.
+ // This is the recommended way to use a [sync.Pool] with slices.
+ func newQueue(size int) *[]queuedResourceUpdate {
+ // Attempt to get a queue from the pool. If it returns nil, ok will be false meaning the pool was
+ // empty.
+ queue, ok := queuedUpdatesPool.Get().(*[]queuedResourceUpdate)
+ if ok && cap(*queue) >= size {
+ *queue = (*queue)[:0]
+ } else {
+ if ok {
+ // Return the queue that was too short to the pool
+ queuedUpdatesPool.Put(queue)
+ }
+ queue = new([]queuedResourceUpdate)
@bohhyang (Oct 31, 2024):

For my own understanding: this creates a new slice in the pool, but what happens to the shorter one? Does it stay in the pool, is it released, or is it just extended?

Member Author:

As the code just above shows, if the slice we get from the pool is too short, it is returned to the pool. I also left a comment explaining what it means when ok is true.

+ *queue = make([]queuedResourceUpdate, 0, size)
+ }
+ return queue
+ }
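
The pooling pattern discussed in the review thread above can be sketched in isolation. This is a minimal, self-contained illustration, not the code from this PR: `intQueuePool` and `getQueue` are hypothetical names, and `[]int` stands in for `[]queuedResourceUpdate`. It shows that a too-short slice is handed back to the pool unchanged (it is neither extended nor explicitly freed), while the caller receives a freshly allocated one. Note that `sync.Pool` may drop pooled items at any time, so the sketch only relies on the length and capacity of whatever `getQueue` returns.

```go
package main

import (
	"fmt"
	"sync"
)

// intQueuePool is a hypothetical stand-in for queuedUpdatesPool. It stores
// *[]int values; storing a pointer avoids allocating a new interface value
// for the slice header on every Put.
var intQueuePool = sync.Pool{}

// getQueue returns a pointer to an empty slice with capacity >= size.
func getQueue(size int) *[]int {
	// Get returns nil when the pool is empty, so the type assertion fails
	// and ok is false in that case.
	q, ok := intQueuePool.Get().(*[]int)
	if ok && cap(*q) >= size {
		// Large enough: reset the length and reuse the backing array.
		*q = (*q)[:0]
		return q
	}
	if ok {
		// Too short for this request: put it back untouched so that a
		// later, smaller request can still use it.
		intQueuePool.Put(q)
	}
	// Allocate a fresh slice of the requested capacity.
	q = new([]int)
	*q = make([]int, 0, size)
	return q
}

func main() {
	small := getQueue(2)
	*small = append(*small, 1, 2)
	intQueuePool.Put(small)

	// A request for 8 elements cannot be served by the capacity-2 slice.
	big := getQueue(8)
	fmt.Println(len(*big), cap(*big) >= 8)
	intQueuePool.Put(big)
}
```

Whether a later `getQueue` call actually sees a previously pooled slice is up to the runtime, which is why the sketch never assumes a hit.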

func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.DeltaDiscoveryResponse) {
- defer func() {
- clear(ds.queuedUpdates)
- ds.queuedUpdates = ds.queuedUpdates[:0]
- }()
+ queuePtr := newQueue(len(resourceUpdates))
+ defer queuedUpdatesPool.Put(queuePtr)
+
+ queue := *queuePtr
for name, e := range resourceUpdates {
- ds.queuedUpdates = append(ds.queuedUpdates, queuedResourceUpdate{
+ queue = append(queue, queuedResourceUpdate{
Name: name,
Size: encodedUpdateSize(name, e.Resource),
})
}
// Sort the updates in descending order
- slices.SortFunc(ds.queuedUpdates, func(a, b queuedResourceUpdate) int {
+ slices.SortFunc(queue, func(a, b queuedResourceUpdate) int {
return -cmp.Compare(a.Size, b.Size)
})

// This nested loop builds the fewest possible chunks it can from the given resourceUpdates map. It
// implements an approximation of the bin-packing algorithm called next-fit-decreasing bin-packing
// https://en.wikipedia.org/wiki/Next-fit-decreasing_bin_packing
idx := 0
- for idx < len(ds.queuedUpdates) {
+ for idx < len(queue) {
// This chunk will hold all the updates for this loop iteration
chunk := ds.newChunk()
chunkSize := proto.Size(chunk)

- for ; idx < len(ds.queuedUpdates); idx++ {
- update := ds.queuedUpdates[idx]
+ for ; idx < len(queue); idx++ {
+ update := queue[idx]
r := resourceUpdates[update.Name].Resource

if ds.maxChunkSize > 0 {
@@ -172,7 +192,7 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
"Response exceeded max response size, sent in chunks",
"chunks", len(chunks),
"typeURL", ds.typeURL,
- "updates", len(ds.queuedUpdates),
+ "updates", len(queue),
)
for i, c := range chunks {
c.Nonce = utils.NewNonce(len(chunks) - i - 1)
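
The body of the packing loop is collapsed in this diff, so the following is only a rough sketch of next-fit-decreasing bin packing as named in the comment inside `chunk`, not the elided code. It works on plain integer sizes; `packNextFitDecreasing` is a hypothetical name, and the sketch ignores the fixed per-chunk overhead that `minChunkSize` accounts for in the real type. Sizes that can never fit are reported separately, mirroring the note that oversized updates are dropped.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// packNextFitDecreasing groups sizes into chunks whose totals never exceed
// maxSize. Sizes larger than maxSize cannot fit in any chunk and are
// returned in dropped instead.
func packNextFitDecreasing(sizes []int, maxSize int) (chunks [][]int, dropped []int) {
	// Sort a copy in descending order so the largest items are placed first.
	sorted := slices.Clone(sizes)
	slices.SortFunc(sorted, func(a, b int) int {
		return -cmp.Compare(a, b)
	})

	var current []int
	currentSize := 0
	for _, s := range sorted {
		if s > maxSize {
			dropped = append(dropped, s)
			continue
		}
		// Next-fit: only the open chunk is considered. If the item does
		// not fit, close that chunk and start a new one.
		if currentSize+s > maxSize {
			chunks = append(chunks, current)
			current, currentSize = nil, 0
		}
		current = append(current, s)
		currentSize += s
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks, dropped
}

func main() {
	chunks, dropped := packNextFitDecreasing([]int{5, 9, 2, 7, 3, 12}, 10)
	fmt.Println(chunks, dropped) // prints "[[9] [7] [5 3 2]] [12]"
}
```

Next-fit never revisits a closed chunk, which keeps the loop to a single pass over the sorted sizes at the cost of sometimes producing more chunks than an optimal packing would.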