Skip to content

Commit

Permalink
Switch sharding struct from array to map
Browse files Browse the repository at this point in the history
  • Loading branch information
meroton-benjamin committed Feb 3, 2025
1 parent 07366c6 commit f0b6827
Show file tree
Hide file tree
Showing 13 changed files with 656 additions and 683 deletions.
2 changes: 1 addition & 1 deletion internal/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ gomock(
gomock(
name = "blobstore_sharding",
out = "blobstore_sharding.go",
interfaces = ["ShardPermuter"],
interfaces = ["ShardSelector"],
library = "//pkg/blobstore/sharding",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
Expand Down
34 changes: 14 additions & 20 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,40 +85,34 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
DigestKeyFormat: slow.DigestKeyFormat,
}, "read_caching", nil
case *pb.BlobAccessConfiguration_Sharding:
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Shards))
weights := make([]uint32, 0, len(backend.Sharding.Shards))
backends := make(map[string]blobstore.BlobAccess, len(backend.Sharding.ShardMap))
weights := make(map[string]uint32, len(backend.Sharding.ShardMap))
var combinedDigestKeyFormat *digest.KeyFormat
for _, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
// Drained backend.
backends = append(backends, nil)
for key, shard := range backend.Sharding.ShardMap {
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends[key] = backend.BlobAccess
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
// Undrained backend.
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
weights[key] = shard.Weight
}
if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
}
return BlobAccessInfo{
BlobAccess: sharding.NewShardingBlobAccess(
backends,
sharding.NewWeightedShardPermuter(weights),
sharding.NewRendezvousShardSelector(weights),
backend.Sharding.HashInitialization),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
Expand Down
7 changes: 3 additions & 4 deletions pkg/blobstore/sharding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sharding",
srcs = [
"shard_permuter.go",
"shard_selector.go",
"sharding_blob_access.go",
"weighted_shard_permuter.go",
"rendezvous_shard_selector.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/sharding",
visibility = ["//visibility:public"],
Expand All @@ -16,7 +16,6 @@ go_library(
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_lazybeaver_xorshift//:xorshift",
"@org_golang_x_sync//errgroup",
],
)
Expand All @@ -25,7 +24,7 @@ go_test(
name = "sharding_test",
srcs = [
"sharding_blob_access_test.go",
"weighted_shard_permuter_test.go",
"rendezvous_shard_selector_test.go",
],
deps = [
":sharding",
Expand Down
64 changes: 64 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package sharding

import (
"hash/fnv"
"math"
)

type rendezvousShardSelector struct {
keyMap map[uint64]string
weightMap map[uint64]uint32
}

func hashServer(key string) uint64 {
h := fnv.New64a()
h.Write([]byte(key))
return h.Sum64()
}

func NewRendezvousShardSelector(weights map[string]uint32) *rendezvousShardSelector {
weightMap := make(map[uint64]uint32, len(weights))
keyMap := make(map[uint64]string, len(weights))

for key, weight := range weights {
keyHash := hashServer(key)
keyMap[keyHash] = key
weightMap[keyHash] = weight
}
return &rendezvousShardSelector{
keyMap: keyMap,
weightMap: weightMap,
}
}

func score(x uint64) float64 {
// branchless clamp to [1,MAX_UINT64-1]
x = x - ((x|-x)>>63) + (((^x)|-(^x)) >> 63)
frac := float64(x)/float64(^uint64(0))
return 1.0/-math.Log(frac)
}

// PRNG without branches or allocations
func splitmix64(x uint64) uint64 {
x ^= x >> 30
x *= 0xbf58476d1ce4e5b9
x ^= x >> 27
x *= 0x94d049bb133111eb
x ^= x >> 31
return x
}

func (s *rendezvousShardSelector) GetShard(hash uint64) string {
var best float64
var bestKey string

for keyHash, weight := range s.weightMap {
mixed := splitmix64(hash^keyHash)
current := float64(weight) * score(mixed)
if current > best {
best = current
bestKey = s.keyMap[keyHash]
}
}
return bestKey
}
26 changes: 26 additions & 0 deletions pkg/blobstore/sharding/rendezvous_shard_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sharding_test

import (
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
"github.com/stretchr/testify/require"
)

func TestRendezvousShardSelectorDistribution(t *testing.T) {
// Distribution across five backends with a total weight of 15.
weights := map[string]uint32{"a": 1, "b": 4, "c:": 2, "d": 5, "e": 3}
s := sharding.NewRendezvousShardSelector(weights)

// Request the shard for a very large amount of blobs
occurrences := map[string]uint32{}
for i := 0; i < 1000000; i++ {
hash := uint64(i)
occurrences[s.GetShard(hash)] += 1
}

// Requests should be fanned out with a small error margin.
for shard, weight := range weights {
require.InEpsilon(t, weight*1000000/15, occurrences[shard], 0.01)
}
}
19 changes: 0 additions & 19 deletions pkg/blobstore/sharding/shard_permuter.go

This file was deleted.

10 changes: 10 additions & 0 deletions pkg/blobstore/sharding/shard_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package sharding

// ShardSelector is an algorithm that for a hash resolves into a key which
// corresponds to the specific backend for that shard.
//
// The algorithm must be stable, the removal of an unavailable backend should
// not result in the reshuffling of any other blobs.
type ShardSelector interface {
GetShard(hash uint64) string
}
78 changes: 35 additions & 43 deletions pkg/blobstore/sharding/sharding_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,90 +15,82 @@ import (
)

type shardingBlobAccess struct {
backends []blobstore.BlobAccess
shardPermuter ShardPermuter
backends map[string]blobstore.BlobAccess
shardSelector ShardSelector
hashInitialization uint64
getCapabilitiesRound atomic.Uint64
}

// NewShardingBlobAccess is an adapter for BlobAccess that partitions
// requests across backends by hashing the digest. A ShardPermuter is
// requests across backends by hashing the digest. A ShardSelector is
// used to map hashes to backends.
func NewShardingBlobAccess(backends []blobstore.BlobAccess, shardPermuter ShardPermuter, hashInitialization uint64) blobstore.BlobAccess {
func NewShardingBlobAccess(backends map[string]blobstore.BlobAccess, shardSelector ShardSelector, hashInitialization uint64) blobstore.BlobAccess {
return &shardingBlobAccess{
backends: backends,
shardPermuter: shardPermuter,
shardSelector: shardSelector,
hashInitialization: hashInitialization,
}
}

func (ba *shardingBlobAccess) getBackendIndexByDigest(blobDigest digest.Digest) int {
func (ba *shardingBlobAccess) getBackendKeyByDigest(blobDigest digest.Digest) string {
// Hash the key using FNV-1a.
h := ba.hashInitialization
for _, c := range blobDigest.GetKey(digest.KeyWithoutInstance) {
h ^= uint64(c)
h *= 1099511628211
}
return ba.getBackendIndexByHash(h)
return ba.getBackendKeyByHash(h)
}

func (ba *shardingBlobAccess) getBackendIndexByHash(h uint64) int {
// Keep requesting shards until matching one that is undrained.
var selectedIndex int
ba.shardPermuter.GetShard(h, func(index int) bool {
if ba.backends[index] == nil {
return true
}
selectedIndex = index
return false
})
return selectedIndex
func (ba *shardingBlobAccess) getBackendKeyByHash(h uint64) string {
var selectedKey string = ba.shardSelector.GetShard(h)
return selectedKey
}

func (ba *shardingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
index := ba.getBackendIndexByDigest(digest)
key := ba.getBackendKeyByDigest(digest)
return buffer.WithErrorHandler(
ba.backends[index].Get(ctx, digest),
shardIndexAddingErrorHandler{index: index})
ba.backends[key].Get(ctx, digest),
shardKeyAddingErrorHandler{key: key})
}

func (ba *shardingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
index := ba.getBackendIndexByDigest(parentDigest)
key := ba.getBackendKeyByDigest(parentDigest)
return buffer.WithErrorHandler(
ba.backends[index].GetFromComposite(ctx, parentDigest, childDigest, slicer),
shardIndexAddingErrorHandler{index: index})
ba.backends[key].GetFromComposite(ctx, parentDigest, childDigest, slicer),
shardKeyAddingErrorHandler{key: key})
}

func (ba *shardingBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
index := ba.getBackendIndexByDigest(digest)
if err := ba.backends[index].Put(ctx, digest, b); err != nil {
return util.StatusWrapf(err, "Shard %d", index)
key := ba.getBackendKeyByDigest(digest)
if err := ba.backends[key].Put(ctx, digest, b); err != nil {
return util.StatusWrapf(err, "Shard %s", key)
}
return nil
}

func (ba *shardingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
// Partition all digests by shard.
digestsPerBackend := make([]digest.SetBuilder, 0, len(ba.backends))
for range ba.backends {
digestsPerBackend = append(digestsPerBackend, digest.NewSetBuilder())
digestsPerBackend := make(map[string]digest.SetBuilder, len(ba.backends))
for key, _ := range ba.backends {
digestsPerBackend[key] = digest.NewSetBuilder()
}
for _, blobDigest := range digests.Items() {
digestsPerBackend[ba.getBackendIndexByDigest(blobDigest)].Add(blobDigest)
digestsPerBackend[ba.getBackendKeyByDigest(blobDigest)].Add(blobDigest)
}

// Asynchronously call FindMissing() on backends.
missingPerBackend := make([]digest.Set, 0, len(ba.backends))
group, ctxWithCancel := errgroup.WithContext(ctx)
for indexIter, digestsIter := range digestsPerBackend {
index, digests := indexIter, digestsIter
for keyIter, digestsIter := range digestsPerBackend {
key, digests := keyIter, digestsIter
if digests.Length() > 0 {
missingPerBackend = append(missingPerBackend, digest.EmptySet)
missingOut := &missingPerBackend[len(missingPerBackend)-1]
group.Go(func() error {
missing, err := ba.backends[index].FindMissing(ctxWithCancel, digests.Build())
missing, err := ba.backends[key].FindMissing(ctxWithCancel, digests.Build())
if err != nil {
return util.StatusWrapf(err, "Shard %d", index)
return util.StatusWrapf(err, "Shard %s", key)
}
*missingOut = missing
return nil
Expand All @@ -115,20 +107,20 @@ func (ba *shardingBlobAccess) FindMissing(ctx context.Context, digests digest.Se

func (ba *shardingBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
// Spread requests across shards.
index := ba.getBackendIndexByHash(ba.getCapabilitiesRound.Add(1))
capabilities, err := ba.backends[index].GetCapabilities(ctx, instanceName)
key := ba.getBackendKeyByHash(ba.getCapabilitiesRound.Add(1))
capabilities, err := ba.backends[key].GetCapabilities(ctx, instanceName)
if err != nil {
return nil, util.StatusWrapf(err, "Shard %d", index)
return nil, util.StatusWrapf(err, "Shard %s", key)
}
return capabilities, nil
}

type shardIndexAddingErrorHandler struct {
index int
type shardKeyAddingErrorHandler struct {
key string
}

func (eh shardIndexAddingErrorHandler) OnError(err error) (buffer.Buffer, error) {
return nil, util.StatusWrapf(err, "Shard %d", eh.index)
func (eh shardKeyAddingErrorHandler) OnError(err error) (buffer.Buffer, error) {
return nil, util.StatusWrapf(err, "Shard %s", eh.key)
}

func (eh shardIndexAddingErrorHandler) Done() {}
func (eh shardKeyAddingErrorHandler) Done() {}
Loading

0 comments on commit f0b6827

Please sign in to comment.