diff --git a/db.go b/db.go
index dbf5b7e7a..27bde6eb0 100644
--- a/db.go
+++ b/db.go
@@ -2005,6 +2005,7 @@ func (db *DB) StreamDB(outOptions Options) error {
 	// Stream contents of DB to the output DB.
 	stream := db.NewStreamAt(math.MaxUint64)
 	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
+	stream.FullCopy = true
 
 	stream.Send = func(buf *z.Buffer) error {
 		return writer.Write(buf)
diff --git a/iterator.go b/iterator.go
index 54d2bf27d..400a6ba66 100644
--- a/iterator.go
+++ b/iterator.go
@@ -366,17 +366,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
 // that the tables are sorted in the right order.
 func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
 	filterTables := func(tables []*table.Table) []*table.Table {
-		if opt.SinceTs > 0 {
-			tmp := tables[:0]
-			for _, t := range tables {
-				if t.MaxVersion() < opt.SinceTs {
-					continue
-				}
-				tmp = append(tmp, t)
+		if opt.SinceTs == 0 {
+			return tables
+		}
+		out := tables[:0]
+		for _, t := range tables {
+			if t.MaxVersion() < opt.SinceTs {
+				continue
 			}
-			tables = tmp
+			out = append(out, t)
 		}
-		return tables
+		return out
 	}
 
 	if len(opt.Prefix) == 0 {
@@ -491,7 +491,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
 	for i := 0; i < len(tables); i++ {
 		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
 	}
-	iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
+	iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
 	res := &Iterator{
 		txn:  txn,
 		iitr: table.NewMergeIterator(iters, opt.Reverse),
diff --git a/key_registry.go b/key_registry.go
index ddca6501a..6ac8b1ab1 100644
--- a/key_registry.go
+++ b/key_registry.go
@@ -28,6 +28,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/dgraph-io/badger/v4/pb"
 	"github.com/dgraph-io/badger/v4/y"
 	"google.golang.org/protobuf/proto"
@@ -339,8 +341,7 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
 	defer kr.Unlock()
 	// Key might have generated by another go routine. So,
 	// checking once again.
-	key, valid = validKey()
-	if valid {
+	if key, valid := validKey(); valid {
 		return key, nil
 	}
 	k := make([]byte, len(kr.opt.EncryptionKey))
@@ -348,8 +349,8 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
 	if err != nil {
 		return nil, err
 	}
-	_, err = rand.Read(k)
-	if err != nil {
+
+	if _, err := rand.Read(k); err != nil {
 		return nil, err
 	}
 	// Otherwise Increment the KeyID and generate new datakey.
@@ -360,25 +361,40 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
 		CreatedAt: time.Now().Unix(),
 		Iv:        iv,
 	}
-	// Don't store the datakey on file if badger is running in InMemory mode.
-	if !kr.opt.InMemory {
-		// Store the datekey.
-		buf := &bytes.Buffer{}
-		if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
-			return nil, err
-		}
-		// Persist the datakey to the disk
-		if _, err = kr.fp.Write(buf.Bytes()); err != nil {
-			return nil, err
-		}
-	}
-	// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
-	dk.Data = k
 	kr.lastCreated = dk.CreatedAt
 	kr.dataKeys[kr.nextKeyID] = dk
+	// Don't store the datakey on file if badger is running in InMemory mode.
+	if kr.opt.InMemory {
+		return dk, nil
+	}
+	// Store the datakey.
+	if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
+		return nil, err
+	}
 	return dk, nil
 }
 
+func (kr *KeyRegistry) AddKey(dk *pb.DataKey) (uint64, error) {
+	// If we don't have an encryption key, we cannot store the datakey.
+	if len(kr.opt.EncryptionKey) == 0 {
+		return 0, errors.New("No encryption key found. Cannot add data key")
+	}
+
+	if _, ok := kr.dataKeys[dk.KeyId]; !ok {
+		// If KeyId does not exist already, then use the next available KeyId to store the data key.
+		kr.nextKeyID++
+		dk.KeyId = kr.nextKeyID
+	}
+	kr.dataKeys[dk.KeyId] = dk
+
+	if kr.opt.InMemory {
+		return dk.KeyId, nil
+	}
+	// Store the datakey.
+	return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
+}
+
 // Close closes the key registry.
 func (kr *KeyRegistry) Close() error {
 	if !(kr.opt.ReadOnly || kr.opt.InMemory) {
@@ -388,7 +404,8 @@ func (kr *KeyRegistry) Close() error {
 }
 
 // storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
-func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
+// DO NOT use a pointer for key. storeDataKey modifies the key.Data field.
+func storeDataKey(w io.Writer, storageKey []byte, key *pb.DataKey) error {
 	// xor will encrypt the IV and xor with the given data.
 	// It'll used for both encryption and decryption.
 	xor := func() error {
@@ -396,30 +413,27 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
 		return nil
 	}
 	var err error
-	k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
+	key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
 	return err
 }
 	// In memory datakey will be plain text so encrypting before storing to the disk.
-	var err error
-	if err = xor(); err != nil {
+	if err := xor(); err != nil {
 		return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
 	}
-	var data []byte
-	if data, err = proto.Marshal(k); err != nil {
+
+	data, err := proto.Marshal(key)
+	if err != nil {
 		err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
-		var err2 error
-		// decrypting the datakey back.
-		if err2 = xor(); err2 != nil {
-			return y.Wrapf(err,
-				y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
+		if err2 := xor(); err2 != nil {
+			return y.Wrapf(err, y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
 		}
 		return err
 	}
+
 	var lenCrcBuf [8]byte
 	binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
 	binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
-	y.Check2(buf.Write(lenCrcBuf[:]))
-	y.Check2(buf.Write(data))
-	// Decrypting the datakey back since we're using the pointer.
-	return xor()
+	y.Check2(w.Write(lenCrcBuf[:]))
+	y.Check2(w.Write(data))
+	return nil
 }
diff --git a/level_handler.go b/level_handler.go
index fc81cc452..917de5773 100644
--- a/level_handler.go
+++ b/level_handler.go
@@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
 	return maxVs, decr()
 }
 
-// appendIterators appends iterators to an array of iterators, for merging.
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
 	s.RLock()
 	defer s.RUnlock()
 
@@ -324,14 +324,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
 				out = append(out, t)
 			}
 		}
-		return appendIteratorsReversed(iters, out, topt)
+		return iteratorsReversed(out, topt)
 	}
 
 	tables := opt.pickTables(s.tables)
 	if len(tables) == 0 {
-		return iters
+		return nil
 	}
-	return append(iters, table.NewConcatIterator(tables, topt))
+	return []y.Iterator{table.NewConcatIterator(tables, topt)}
+}
+
+func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
+	if opt.Reverse {
+		panic("Invalid option for getTables")
+	}
+
+	s.RLock()
+	defer s.RUnlock()
+
+	if s.level == 0 {
+		var out []*table.Table
+		for _, t := range s.tables {
+			if opt.pickTable(t) {
+				t.IncrRef()
+				out = append(out, t)
+			}
+		}
+		return out
+	}
+
+	tables := opt.pickTables(s.tables)
+	for _, t := range tables {
+		t.IncrRef()
+	}
+	return tables
 }
 
 type levelHandlerRLocked struct{}
diff --git a/levels.go b/levels.go
index 7b32cffa9..5c4a4f1ac 100644
--- a/levels.go
+++ b/levels.go
@@ -34,6 +34,7 @@ import (
 	"github.com/pkg/errors"
 	otrace "go.opencensus.io/trace"
 
+	"github.com/dgraph-io/badger/v4/options"
 	"github.com/dgraph-io/badger/v4/pb"
 	"github.com/dgraph-io/badger/v4/table"
 	"github.com/dgraph-io/badger/v4/y"
@@ -909,7 +910,7 @@ func (s *levelsController) compactBuildTables(
 	var iters []y.Iterator
 	switch {
 	case lev == 0:
-		iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
+		iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
 	case len(topTables) > 0:
 		y.AssertTrue(len(topTables) == 1)
 		iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
@@ -1642,7 +1643,8 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
 	return maxVs, nil
 }
 
-func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
+func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
+	out := make([]y.Iterator, 0, len(th))
 	for i := len(th) - 1; i >= 0; i-- {
 		// This will increment the reference of the table handler.
 		out = append(out, th[i].NewIterator(opt))
@@ -1650,16 +1652,25 @@ func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.I
 	return out
 }
 
-// appendIterators appends iterators to an array of iterators, for merging.
+// getTables returns tables from all levels. It calls IncrRef on all returned tables.
+func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
+	res := make([][]*table.Table, 0, len(s.levels))
+	for _, level := range s.levels {
+		res = append(res, level.getTables(opt))
+	}
+	return res
+}
+
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelsController) appendIterators(
-	iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
 	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
 	// data when there's a compaction.
+	itrs := make([]y.Iterator, 0, len(s.levels))
 	for _, level := range s.levels {
-		iters = level.appendIterators(iters, opt)
+		itrs = append(itrs, level.iterators(opt)...)
 	}
-	return iters
+	return itrs
 }
 
 // TableInfo represents the information about a table.
@@ -1786,3 +1797,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
 	sort.Strings(splits)
 	return splits
 }
+
+// AddTable builds the table from kv.Value. The table options (compression, data key) come from
+// the manifest change, which the caller unmarshals from kv.Key.
+func (lc *levelsController) AddTable(
+	kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
+	// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
+	// don't have the same encryption mode. See if inplace encryption/decryption can be done.
+	// Tables are sent in sorted order, so no need to sort them here.
+	encrypted := len(lc.kv.opt.EncryptionKey) > 0
+	y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
+	// The keyId is zero if there is no encryption.
+	opts := buildTableOptions(lc.kv)
+	opts.Compression = options.CompressionType(change.Compression)
+	opts.DataKey = dk
+
+	fileID := lc.reserveFileID()
+	fname := table.NewFilename(fileID, lc.kv.opt.Dir)
+
+	// kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
+	var tbl *table.Table
+	var err error
+	if lc.kv.opt.InMemory {
+		if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
+			return errors.Wrap(err, "while creating in-memory table from buffer")
+		}
+	} else {
+		if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
+			return errors.Wrap(err, "while creating table from buffer")
+		}
+	}
+
+	lc.levels[lev].addTable(tbl)
+	// Release the ref held by OpenTable. addTable would add a reference.
+	_ = tbl.DecrRef()
+
+	change.Id = fileID
+	change.Level = uint32(lev)
+	if dk != nil {
+		change.KeyId = dk.KeyId
+	}
+	// We use the same data KeyId. So, change.KeyId remains the same.
+	y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
+	return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
+}
diff --git a/levels_test.go b/levels_test.go
index 67633ed4f..108214616 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -18,6 +18,7 @@ package badger
 
 import (
 	"fmt"
+	"io/ioutil"
 	"math"
 	"math/rand"
 	"os"
@@ -40,7 +41,15 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		BloomFalsePositive: db.opt.BloomFalsePositive,
 		ChkMode:            options.NoVerification,
 	}
-	b := table.NewTableBuilder(opts)
+	createAndOpenWithOptions(db, td, level, &opts)
+}
+
+func createAndOpenWithOptions(db *DB, td []keyValVersion, level int, opts *table.Options) {
+	if opts == nil {
+		bopts := buildTableOptions(db)
+		opts = &bopts
+	}
+	b := table.NewTableBuilder(*opts)
 	defer b.Close()
 	// Add all keys and versions to the table.
@@ -49,13 +58,21 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta}
 		b.Add(key, val, 0)
 	}
-	fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir)
-	tab, err := table.CreateTable(fname, b)
+	fileID := db.lc.reserveFileID()
+	var tab *table.Table
+	var err error
+	if db.opt.InMemory {
+		data := b.Finish()
+		tab, err = table.OpenInMemoryTable(data, fileID, opts)
+	} else {
+		fname := table.NewFilename(fileID, db.opt.Dir)
+		tab, err = table.CreateTable(fname, b)
+	}
 	if err != nil {
 		panic(err)
 	}
 	if err := db.manifest.addChanges([]*pb.ManifestChange{
-		newCreateChange(tab.ID(), level, 0, tab.CompressionType()),
+		newCreateChange(tab.ID(), level, tab.KeyID(), tab.CompressionType()),
 	}); err != nil {
 		panic(err)
 	}
@@ -1302,3 +1319,85 @@ func TestStaleDataCleanup(t *testing.T) {
 	})
 }
+
+func TestStreamWithFullCopy(t *testing.T) {
+	dbopts := DefaultOptions("")
+	dbopts.managedTxns = true
+	dbopts.MaxLevels = 7
+	dbopts.NumVersionsToKeep = math.MaxInt32
+
+	encKey := make([]byte, 24)
+	_, err := rand.Read(encKey)
+	require.NoError(t, err)
+
+	test := func(db *DB, outOpts Options) {
+		l4 := []keyValVersion{{"a", "1", 3, bitDelete}, {"d", "4", 3, 0}}
+		l5 := []keyValVersion{{"b", "2", 2, 0}}
+		l6 := []keyValVersion{{"a", "1", 2, 0}, {"c", "3", 1, 0}}
+		createAndOpenWithOptions(db, l4, 4, nil)
+		createAndOpenWithOptions(db, l5, 5, nil)
+		createAndOpenWithOptions(db, l6, 6, nil)
+
+		if !outOpts.InMemory {
+			dir, err := ioutil.TempDir("", "badger-test")
+			require.NoError(t, err)
+			defer removeDir(dir)
+			outOpts.Dir = dir
+			outOpts.ValueDir = dir
+		}
+
+		require.NoError(t, db.StreamDB(outOpts))
+		out, err := Open(outOpts)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, out.Close())
+		}()
+		err = out.View(func(txn *Txn) error {
+			// Key "a" should not be there because we deleted it at a higher version.
+			_, err := txn.Get([]byte("a"))
+			require.Error(t, err)
+			require.Equal(t, err, ErrKeyNotFound)
+			_, err = txn.Get([]byte("b"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("d"))
+			require.NoError(t, err)
+			return nil
+		})
+		require.NoError(t, err)
+	}
+	t.Run("without encryption", func(t *testing.T) {
+		opts := dbopts
+		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+			test(db, opts)
+		})
+	})
+	t.Run("with encryption", func(t *testing.T) {
+		opts := dbopts
+		opts.IndexCacheSize = 1 << 20
+		opts.BlockCacheSize = 1 << 20
+		// Set the rotation duration to zero so that we have more than one data key.
+		opts.EncryptionKey = encKey
+		opts.EncryptionKeyRotationDuration = 0
+		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+			test(db, opts)
+			require.Greater(t, len(db.registry.dataKeys), 1)
+		})
+	})
+	t.Run("stream from in-memory to persistent", func(t *testing.T) {
+		opts := dbopts
+		opts.IndexCacheSize = 1 << 20
+		opts.BlockCacheSize = 1 << 20
+		opts.InMemory = true
+		// Set the rotation duration to zero so that we have more than one data key.
+ opts.EncryptionKey = encKey + opts.EncryptionKeyRotationDuration = 0 + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + outOpts := opts + outOpts.InMemory = false + test(db, outOpts) + require.Greater(t, len(db.registry.dataKeys), 1) + }) + }) +} diff --git a/manifest.go b/manifest.go index ffe145529..23e9fffbe 100644 --- a/manifest.go +++ b/manifest.go @@ -129,7 +129,7 @@ func (m *Manifest) clone() Manifest { func openOrCreateManifestFile(opt Options) ( ret *manifestFile, result Manifest, err error) { if opt.InMemory { - return &manifestFile{inMemory: true}, Manifest{}, nil + return &manifestFile{inMemory: true, manifest: createManifest()}, Manifest{}, nil } return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion, manifestDeletionsRewriteThreshold) @@ -207,21 +207,20 @@ func (mf *manifestFile) close() error { // this depends on the filesystem -- some might append garbage data if a system crash happens at // the wrong time.) func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { - if mf.inMemory { - return nil - } changes := pb.ManifestChangeSet{Changes: changesParam} buf, err := proto.Marshal(&changes) if err != nil { return err } - // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() defer mf.appendLock.Unlock() if err := applyChangeSet(&mf.manifest, &changes); err != nil { return err } + if mf.inMemory { + return nil + } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { diff --git a/pb/badgerpb4.pb.go b/pb/badgerpb4.pb.go index 17bbfc36e..d9c5af2d9 100644 --- a/pb/badgerpb4.pb.go +++ b/pb/badgerpb4.pb.go @@ -80,6 +80,55 @@ func (EncryptionAlgo) EnumDescriptor() ([]byte, []int) { return file_badgerpb4_proto_rawDescGZIP(), []int{0} } +type KV_Kind int32 + +const ( + KV_KEY KV_Kind = 0 + KV_DATA_KEY KV_Kind = 1 + KV_FILE KV_Kind = 2 +) + +// Enum value maps for KV_Kind. +var ( + KV_Kind_name = map[int32]string{ + 0: "KEY", + 1: "DATA_KEY", + 2: "FILE", + } + KV_Kind_value = map[string]int32{ + "KEY": 0, + "DATA_KEY": 1, + "FILE": 2, + } +) + +func (x KV_Kind) Enum() *KV_Kind { + p := new(KV_Kind) + *p = x + return p +} + +func (x KV_Kind) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KV_Kind) Descriptor() protoreflect.EnumDescriptor { + return file_badgerpb4_proto_enumTypes[1].Descriptor() +} + +func (KV_Kind) Type() protoreflect.EnumType { + return &file_badgerpb4_proto_enumTypes[1] +} + +func (x KV_Kind) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KV_Kind.Descriptor instead. 
+func (KV_Kind) EnumDescriptor() ([]byte, []int) { + return file_badgerpb4_proto_rawDescGZIP(), []int{0, 0} +} + type ManifestChange_Operation int32 const ( @@ -110,11 +159,11 @@ func (x ManifestChange_Operation) String() string { } func (ManifestChange_Operation) Descriptor() protoreflect.EnumDescriptor { - return file_badgerpb4_proto_enumTypes[1].Descriptor() + return file_badgerpb4_proto_enumTypes[2].Descriptor() } func (ManifestChange_Operation) Type() protoreflect.EnumType { - return &file_badgerpb4_proto_enumTypes[1] + return &file_badgerpb4_proto_enumTypes[2] } func (x ManifestChange_Operation) Number() protoreflect.EnumNumber { @@ -156,11 +205,11 @@ func (x Checksum_Algorithm) String() string { } func (Checksum_Algorithm) Descriptor() protoreflect.EnumDescriptor { - return file_badgerpb4_proto_enumTypes[2].Descriptor() + return file_badgerpb4_proto_enumTypes[3].Descriptor() } func (Checksum_Algorithm) Type() protoreflect.EnumType { - return &file_badgerpb4_proto_enumTypes[2] + return &file_badgerpb4_proto_enumTypes[3] } func (x Checksum_Algorithm) Number() protoreflect.EnumNumber { @@ -186,7 +235,8 @@ type KV struct { // Stream id is used to identify which stream the KV came from. StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` // Stream done is used to indicate end of stream. - StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + Kind KV_Kind `protobuf:"varint,12,opt,name=kind,proto3,enum=badgerpb4.KV_Kind" json:"kind,omitempty"` } func (x *KV) Reset() { @@ -277,6 +327,13 @@ func (x *KV) GetStreamDone() bool { return false } +func (x *KV) GetKind() KV_Kind { + if x != nil { + return x.Kind + } + return KV_KEY +} + type KVList struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -653,7 +710,7 @@ var File_badgerpb4_proto protoreflect.FileDescriptor var file_badgerpb4_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x09, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x22, 0xd4, 0x01, 0x0a, + 0x6f, 0x12, 0x09, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x22, 0xa5, 0x02, 0x0a, 0x02, 0x4b, 0x56, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x75, @@ -667,55 +724,60 @@ var file_badgerpb4_proto_rawDesc = []byte{ 0x69, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x64, 0x6f, 0x6e, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, - 0x6f, 0x6e, 0x65, 0x22, 0x44, 0x0a, 0x06, 0x4b, 0x56, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x02, 0x6b, 0x76, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x62, 0x61, 0x64, 0x67, - 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4b, 0x56, 0x52, 0x02, 0x6b, 0x76, 0x12, 0x1b, 0x0a, 0x09, - 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x08, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x52, 0x65, 0x66, 0x22, 0x48, 0x0a, 0x11, 0x4d, 0x61, 0x6e, - 0x69, 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 
0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, 0x33, - 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x19, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4d, 0x61, 0x6e, 0x69, - 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, - 0x67, 0x65, 0x73, 0x22, 0x8d, 0x02, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x02, 0x49, 0x64, 0x12, 0x33, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x23, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4d, - 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x4f, 0x70, - 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x4c, - 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x4c, 0x65, 0x76, 0x65, - 0x6c, 0x12, 0x15, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x6e, 0x63, 0x72, - 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x6c, 0x67, 0x6f, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x19, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x45, 0x6e, - 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x52, 0x0e, 0x65, 0x6e, - 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x12, 0x20, 0x0a, 0x0b, - 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x23, - 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x43, - 0x52, 0x45, 0x41, 0x54, 0x45, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, 0x54, - 0x45, 0x10, 0x01, 0x22, 0x76, 0x0a, 0x08, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x12, - 0x31, 0x0a, 0x04, 0x61, 0x6c, 0x67, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, - 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, - 0x75, 0x6d, 0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x52, 0x04, 0x61, 0x6c, - 0x67, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x03, 0x73, 0x75, 0x6d, 0x22, 0x25, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, - 0x6d, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x52, 0x43, 0x33, 0x32, 0x43, 0x10, 0x00, 0x12, 0x0c, 0x0a, - 0x08, 0x58, 0x58, 0x48, 0x61, 0x73, 0x68, 0x36, 0x34, 0x10, 0x01, 0x22, 0x63, 0x0a, 0x07, 0x44, - 0x61, 0x74, 0x61, 0x4b, 0x65, 0x79, 0x12, 0x15, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x12, 0x0a, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x76, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, - 0x76, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, - 0x22, 0x42, 0x0a, 0x05, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x72, 0x65, - 0x66, 0x69, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 
0x72, 0x65, 0x66, 0x69, - 0x78, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, - 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x42, - 0x79, 0x74, 0x65, 0x73, 0x2a, 0x19, 0x0a, 0x0e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x12, 0x07, 0x0a, 0x03, 0x61, 0x65, 0x73, 0x10, 0x00, 0x42, - 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x67, - 0x72, 0x61, 0x70, 0x68, 0x2d, 0x69, 0x6f, 0x2f, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x2f, 0x76, - 0x34, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x65, 0x12, 0x26, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x12, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4b, 0x56, + 0x2e, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x27, 0x0a, 0x04, 0x4b, + 0x69, 0x6e, 0x64, 0x12, 0x07, 0x0a, 0x03, 0x4b, 0x45, 0x59, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, + 0x44, 0x41, 0x54, 0x41, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x46, 0x49, + 0x4c, 0x45, 0x10, 0x02, 0x22, 0x44, 0x0a, 0x06, 0x4b, 0x56, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1d, + 0x0a, 0x02, 0x6b, 0x76, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x62, 0x61, 0x64, + 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4b, 0x56, 0x52, 0x02, 0x6b, 0x76, 0x12, 0x1b, 0x0a, + 0x09, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x08, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x52, 0x65, 0x66, 0x22, 0x48, 0x0a, 0x11, 0x4d, 0x61, + 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, + 0x33, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x19, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x4d, 0x61, 0x6e, + 0x69, 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x6e, 0x67, 0x65, 0x73, 0x22, 0x8d, 0x02, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, + 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x02, 0x49, 0x64, 0x12, 0x33, 0x0a, 0x02, 0x4f, 0x70, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x23, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, + 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x4f, + 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x02, 0x4f, 0x70, 0x12, 0x14, 0x0a, 0x05, + 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x4c, 0x65, 0x76, + 0x65, 0x6c, 0x12, 0x15, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x6e, 0x63, + 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x6c, 0x67, 0x6f, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x45, + 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x52, 0x0e, 0x65, + 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x12, 0x20, 0x0a, + 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, + 0x23, 0x0a, 0x09, 0x4f, 
0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, + 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, + 0x54, 0x45, 0x10, 0x01, 0x22, 0x76, 0x0a, 0x08, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, + 0x12, 0x31, 0x0a, 0x04, 0x61, 0x6c, 0x67, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, + 0x2e, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x70, 0x62, 0x34, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x73, 0x75, 0x6d, 0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x52, 0x04, 0x61, + 0x6c, 0x67, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x03, 0x73, 0x75, 0x6d, 0x22, 0x25, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, + 0x68, 0x6d, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x52, 0x43, 0x33, 0x32, 0x43, 0x10, 0x00, 0x12, 0x0c, + 0x0a, 0x08, 0x58, 0x58, 0x48, 0x61, 0x73, 0x68, 0x36, 0x34, 0x10, 0x01, 0x22, 0x63, 0x0a, 0x07, + 0x44, 0x61, 0x74, 0x61, 0x4b, 0x65, 0x79, 0x12, 0x15, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x76, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, + 0x69, 0x76, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, + 0x74, 0x22, 0x42, 0x0a, 0x05, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x72, + 0x65, 0x66, 0x69, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 0x72, 0x65, 0x66, + 0x69, 0x78, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x5f, 0x62, 0x79, 0x74, + 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, + 0x42, 0x79, 0x74, 0x65, 0x73, 0x2a, 0x19, 0x0a, 0x0e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x41, 0x6c, 0x67, 0x6f, 0x12, 0x07, 0x0a, 0x03, 0x61, 0x65, 0x73, 0x10, 0x00, + 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, + 0x67, 0x72, 0x61, 0x70, 0x68, 0x2d, 0x69, 0x6f, 0x2f, 0x62, 0x61, 0x64, 0x67, 0x65, 0x72, 0x2f, + 0x76, 0x34, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -730,31 +792,33 @@ func file_badgerpb4_proto_rawDescGZIP() []byte { return file_badgerpb4_proto_rawDescData } -var file_badgerpb4_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_badgerpb4_proto_enumTypes = make([]protoimpl.EnumInfo, 4) var file_badgerpb4_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_badgerpb4_proto_goTypes = []interface{}{ (EncryptionAlgo)(0), // 0: badgerpb4.EncryptionAlgo - (ManifestChange_Operation)(0), // 1: badgerpb4.ManifestChange.Operation - (Checksum_Algorithm)(0), // 2: badgerpb4.Checksum.Algorithm - (*KV)(nil), // 3: badgerpb4.KV - (*KVList)(nil), // 4: badgerpb4.KVList - (*ManifestChangeSet)(nil), // 5: badgerpb4.ManifestChangeSet - (*ManifestChange)(nil), // 6: badgerpb4.ManifestChange - (*Checksum)(nil), // 7: badgerpb4.Checksum - (*DataKey)(nil), // 8: badgerpb4.DataKey - (*Match)(nil), // 9: badgerpb4.Match + (KV_Kind)(0), // 1: badgerpb4.KV.Kind + (ManifestChange_Operation)(0), // 2: badgerpb4.ManifestChange.Operation + (Checksum_Algorithm)(0), // 3: badgerpb4.Checksum.Algorithm + (*KV)(nil), // 4: badgerpb4.KV + (*KVList)(nil), // 5: badgerpb4.KVList + 
(*ManifestChangeSet)(nil), // 6: badgerpb4.ManifestChangeSet + (*ManifestChange)(nil), // 7: badgerpb4.ManifestChange + (*Checksum)(nil), // 8: badgerpb4.Checksum + (*DataKey)(nil), // 9: badgerpb4.DataKey + (*Match)(nil), // 10: badgerpb4.Match } var file_badgerpb4_proto_depIdxs = []int32{ - 3, // 0: badgerpb4.KVList.kv:type_name -> badgerpb4.KV - 6, // 1: badgerpb4.ManifestChangeSet.changes:type_name -> badgerpb4.ManifestChange - 1, // 2: badgerpb4.ManifestChange.Op:type_name -> badgerpb4.ManifestChange.Operation - 0, // 3: badgerpb4.ManifestChange.encryption_algo:type_name -> badgerpb4.EncryptionAlgo - 2, // 4: badgerpb4.Checksum.algo:type_name -> badgerpb4.Checksum.Algorithm - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 1, // 0: badgerpb4.KV.kind:type_name -> badgerpb4.KV.Kind + 4, // 1: badgerpb4.KVList.kv:type_name -> badgerpb4.KV + 7, // 2: badgerpb4.ManifestChangeSet.changes:type_name -> badgerpb4.ManifestChange + 2, // 3: badgerpb4.ManifestChange.Op:type_name -> badgerpb4.ManifestChange.Operation + 0, // 4: badgerpb4.ManifestChange.encryption_algo:type_name -> badgerpb4.EncryptionAlgo + 3, // 5: badgerpb4.Checksum.algo:type_name -> badgerpb4.Checksum.Algorithm + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_badgerpb4_proto_init() } @@ -853,7 +917,7 @@ func file_badgerpb4_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_badgerpb4_proto_rawDesc, - NumEnums: 3, + NumEnums: 4, NumMessages: 7, NumExtensions: 0, NumServices: 0, diff --git a/pb/badgerpb4.proto b/pb/badgerpb4.proto index 62bfc90b6..04171bbf8 100644 --- a/pb/badgerpb4.proto +++ b/pb/badgerpb4.proto @@ -33,6 +33,13 @@ message KV { uint32 stream_id = 10; // Stream done is used to indicate end of stream. bool stream_done = 11; + + enum Kind { + KEY = 0; + DATA_KEY = 1; + FILE = 2; + } + Kind kind = 12; } message KVList { diff --git a/stream.go b/stream.go index 934c8abcf..f2ce4bd21 100644 --- a/stream.go +++ b/stream.go @@ -25,9 +25,11 @@ import ( "time" humanize "github.com/dustin/go-humanize" + "github.com/pkg/errors" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4/pb" + "github.com/dgraph-io/badger/v4/table" "github.com/dgraph-io/badger/v4/y" "github.com/dgraph-io/ristretto/v2/z" ) @@ -92,7 +94,9 @@ type Stream struct { Send func(buf *z.Buffer) error // Read data above the sinceTs. All keys with version =< sinceTs will be ignored. - SinceTs uint64 + SinceTs uint64 + // FullCopy should be set to true only when encryption mode is same for sender and receiver. + FullCopy bool readTs uint64 db *DB rangeCh chan keyRange @@ -117,9 +121,6 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() - if item.IsDeletedOrExpired() { - break - } if !bytes.Equal(key, item.Key()) { // Break out on the first encounter with another key. 
 			break
 		}
@@ -137,6 +138,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
 		}
 		kv.Version = item.Version()
 		kv.ExpiresAt = item.ExpiresAt()
+		// As we do a full copy, the only meta bit we need to transmit is whether the key is deleted.
+		kv.Meta = []byte{item.meta & bitDelete}
 		kv.UserMeta = a.Copy([]byte{item.UserMeta()})
 
 		list.Kv = append(list.Kv, kv)
@@ -147,6 +150,12 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
 		if item.DiscardEarlierVersions() {
 			break
 		}
+		if item.IsDeletedOrExpired() {
+			// We do a FullCopy in stream. It might happen that tables from L6 contain K(version=1),
+			// while the table at L4 that was not copied contains K(version=2) with a delete mark.
+			// Hence, we need to send the deleted or expired item too.
+			break
+		}
 	}
 	return list, nil
 }
@@ -173,18 +182,10 @@ func (st *Stream) produceRanges(ctx context.Context) {
 }
 
 // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
-func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
+func (st *Stream) produceKVs(ctx context.Context, itr *Iterator) error {
 	st.numProducers.Add(1)
 	defer st.numProducers.Add(-1)
 
-	var txn *Txn
-	if st.readTs > 0 {
-		txn = st.db.NewTransactionAt(st.readTs, false)
-	} else {
-		txn = st.db.NewTransaction(false)
-	}
-	defer txn.Discard()
-
 	// produceKVs is running iterate serially. So, we can define the outList here.
 	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
 	defer func() {
@@ -194,15 +195,6 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
 	}()
 
 	iterate := func(kr keyRange) error {
-		iterOpts := DefaultIteratorOptions
-		iterOpts.AllVersions = true
-		iterOpts.Prefix = st.Prefix
-		iterOpts.PrefetchValues = false
-		iterOpts.SinceTs = st.SinceTs
-		itr := txn.NewIterator(iterOpts)
-		itr.ThreadId = threadId
-		defer itr.Close()
-
 		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
 		defer itr.Alloc.Release()
 
@@ -386,6 +378,77 @@ outer:
 	return nil
 }
 
+func (st *Stream) copyTablesOver(ctx context.Context, tableMatrix [][]*table.Table) error {
+	// TODO: See if making this concurrent would be helpful. Most likely it won't.
+	// But, if it does work, then most likely <3 goroutines might be sufficient.
+	infof := st.db.opt.Infof
+	// Make a copy of the manifest so that we don't have a race condition.
+	manifest := st.db.manifest.manifest.clone()
+	dataKeys := make(map[uint64]struct{})
+	// Iterate in reverse order so that the receiver gets the bottommost level first.
+	for i := len(tableMatrix) - 1; i >= 0; i-- {
+		level := i
+		tables := tableMatrix[i]
+		for _, t := range tables {
+			// This table can be picked for copying directly.
+			out := z.NewBuffer(int(t.Size())+1024, "Stream.Table")
+			if dk := t.DataKey(); dk != nil {
+				y.AssertTrue(dk.KeyId != 0)
+				// If we have a legit data key, send it over so the table can be decrypted. The same
+				// data key could have been used to encrypt many tables. Avoid sending it
+				// repeatedly.
+				if _, sent := dataKeys[dk.KeyId]; !sent {
+					infof("Sending data key with ID: %d\n", dk.KeyId)
+					val, err := proto.Marshal(dk)
+					y.Check(err)
+
+					// This would go to key registry in destination.
+					kv := &pb.KV{
+						Value: val,
+						Kind:  pb.KV_DATA_KEY,
+					}
+					KVToBuffer(kv, out)
+					dataKeys[dk.KeyId] = struct{}{}
+				}
+			}
+
+			infof("Sending table ID: %d at level: %d. Size: %s\n",
+				t.ID(), level, humanize.IBytes(uint64(t.Size())))
+			tableManifest := manifest.Tables[t.ID()]
+
+			change := &pb.ManifestChange{
+				Op:    pb.ManifestChange_CREATE,
+				Level: uint32(level),
+				KeyId: tableManifest.KeyID,
+				// Hard coding it, since we're supporting only AES for now.
+				EncryptionAlgo: pb.EncryptionAlgo_aes,
+				Compression:    uint32(tableManifest.Compression),
+			}
+			buf, err := proto.Marshal(change)
+			y.Check(err)
+
+			// We send the table along with level to the destination, so they'd know where to
+			// place the tables. We'd send all the tables first, before we start streaming. So, the
+			// destination DB would write streamed keys one level above.
+			kv := &pb.KV{
+				// Key can be used for MANIFEST.
+				Key:   buf,
+				Value: t.Data,
+				Kind:  pb.KV_FILE,
+			}
+			KVToBuffer(kv, out)
+
+			select {
+			case st.kvChan <- out:
+			case <-ctx.Done():
+				_ = out.Release()
+				return ctx.Err()
+			}
+		}
+	}
+	return nil
+}
+
 // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
 // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
 // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
@@ -393,6 +456,11 @@ outer:
 // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
 // return that error. Orchestrate can be called multiple times, but in serial order.
 func (st *Stream) Orchestrate(ctx context.Context) error {
+	if st.FullCopy {
+		if !st.db.opt.managedTxns || st.SinceTs != 0 || st.ChooseKey != nil && st.KeyToList != nil {
+			panic("Got invalid stream options when doing full copy")
+		}
+	}
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.
@@ -406,7 +474,127 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
 		st.KeyToList = st.ToList
 	}
 
+	// Pick up key-values from kvChan and send to stream.
+	kvErr := make(chan error, 1)
+	go func() {
+		// Picks up KV lists from kvChan, and sends them to Output.
+		err := st.streamKVs(ctx)
+		if err != nil {
+			cancel() // Stop all the go routines.
+		}
+		kvErr <- err
+	}()
+
+	// Pick all relevant tables from levels. We'd use this to copy them over,
+	// or generate iterators from them.
+	memTables, decr := st.db.getMemTables()
+	defer decr()
+
+	opts := DefaultIteratorOptions
+	opts.Prefix = st.Prefix
+	opts.SinceTs = st.SinceTs
+	tableMatrix := st.db.lc.getTables(&opts)
+	defer func() {
+		for _, tables := range tableMatrix {
+			for _, t := range tables {
+				_ = t.DecrRef()
+			}
+		}
+	}()
+	y.AssertTrue(len(tableMatrix) == st.db.opt.MaxLevels)
+
+	infof := st.db.opt.Infof
+	copyTables := func() error {
+		// Figure out which tables we can copy. Only choose from the last 2 levels.
+		// Say the last level has data of size 100. Given a 10x level multiplier and
+		// assuming the tree is balanced, the second last level would have 10, and the
+		// third last level would have 1. The third last level would only have 1%
+		// of the data of the last level. It's OK for us to stop there and just
+		// stream it, instead of trying to copy over those tables too. When we
+		// copy over tables to Level i, we can't stream any data to level i, i+1,
+		// and so on. The stream has to create tables at level i-1, so there can be
+		// overlap between the tables at i-1 and i.
+
+		// Let's pick the tables which can be fully copied over from the last level.
+		threshold := len(tableMatrix) - 2
+		toCopy := make([][]*table.Table, len(tableMatrix))
+		var numCopy, numStream int
+		for lev, tables := range tableMatrix {
+			// We only copy tables from the two bottommost levels. Levels above them are streamed.
+			if lev < threshold {
+				numStream += len(tables)
+				continue
+			}
+			var rem []*table.Table
+			cp := tables[:0]
+			for _, t := range tables {
+				// We can only copy over those tables that satisfy the following conditions:
+				// - All the keys have version no greater than st.readTs
+				// - st.Prefix fully covers the table
+				if t.MaxVersion() > st.readTs || !t.CoveredByPrefix(st.Prefix) {
+					rem = append(rem, t)
+					continue
+				}
+				cp = append(cp, t)
+			}
+			toCopy[lev] = cp       // Pick tables to copy.
+			tableMatrix[lev] = rem // Keep remaining for streaming.
+			numCopy += len(cp)
+			numStream += len(rem)
+		}
+		infof("Num tables to copy: %d. Num to stream: %d\n", numCopy, numStream)
+
+		return st.copyTablesOver(ctx, toCopy)
+	}
+
+	if st.FullCopy {
+		// As of now, we don't handle a non-zero SinceTs.
+		if err := copyTables(); err != nil {
+			return errors.Wrap(err, "while copying tables")
+		}
+	}
+
+	var txn *Txn
+	if st.readTs > 0 {
+		txn = st.db.NewTransactionAt(st.readTs, false)
+	} else {
+		txn = st.db.NewTransaction(false)
+	}
+	defer txn.Discard()
+
+	newIterator := func(threadId int) *Iterator {
+		var itrs []y.Iterator
+		for _, mt := range memTables {
+			itrs = append(itrs, mt.sl.NewUniIterator(false))
+		}
+		if tables := tableMatrix[0]; len(tables) > 0 {
+			itrs = append(itrs, iteratorsReversed(tables, 0)...)
+		}
+		for _, tables := range tableMatrix[1:] {
+			if len(tables) == 0 {
+				continue
+			}
+			itrs = append(itrs, table.NewConcatIterator(tables, 0))
+		}
+
+		opt := DefaultIteratorOptions
+		opt.AllVersions = true
+		opt.Prefix = st.Prefix
+		opt.PrefetchValues = false
+		opt.SinceTs = st.SinceTs
+
+		res := &Iterator{
+			txn:      txn,
+			iitr:     table.NewMergeIterator(itrs, false),
+			opt:      opt,
+			readTs:   txn.readTs,
+			ThreadId: threadId,
+		}
+		return res
+	}
+
 	// Picks up ranges from Badger, and sends them to rangeCh.
+	// Just for simplicity, we'd consider all the tables for range production.
 	go st.produceRanges(ctx)
 
 	errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
@@ -417,7 +605,9 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
 		go func(threadId int) {
 			defer wg.Done()
 			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
-			if err := st.produceKVs(ctx, threadId); err != nil {
+			itr := newIterator(threadId)
+			defer itr.Close()
+			if err := st.produceKVs(ctx, itr); err != nil {
 				select {
 				case errCh <- err:
 				default:
@@ -426,16 +616,6 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
 		}(i)
 	}
 
-	// Pick up key-values from kvChan and send to stream.
-	kvErr := make(chan error, 1)
-	go func() {
-		// Picks up KV lists from kvChan, and sends them to Output.
-		err := st.streamKVs(ctx)
-		if err != nil {
-			cancel() // Stop all the go routines.
-		}
-		kvErr <- err
-	}()
 	wg.Wait()        // Wait for produceKVs to be over.
 	close(st.kvChan) // Now we can close kvChan.
 	defer func() {
diff --git a/stream_writer.go b/stream_writer.go
index 0059c1eae..a1a0b4a92 100644
--- a/stream_writer.go
+++ b/stream_writer.go
@@ -43,13 +43,18 @@ import (
 // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
 // DBs.
 type StreamWriter struct {
-	writeLock  sync.Mutex
-	db         *DB
-	done       func()
-	throttle   *y.Throttle
-	maxVersion uint64
-	writers    map[uint32]*sortedWriter
-	prevLevel  int
+	writeLock       sync.Mutex
+	db              *DB
+	done            func()
+	throttle        *y.Throttle
+	maxVersion      uint64
+	writers         map[uint32]*sortedWriter
+	prevLevel       int
+	senderPrevLevel int
+	keyId           map[uint64]*pb.DataKey // maps the reader's keyId to the data key.
+	// Writer might receive tables first, and then receive keys. If true, that means we have
+	// started processing keys.
+	processingKeys bool
 }
 
 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -63,6 +68,7 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 		// concurrent streams being processed.
 		throttle: y.NewThrottle(16),
 		writers:  make(map[uint32]*sortedWriter),
+		keyId:    make(map[uint64]*pb.DataKey),
 	}
 }
 
@@ -165,8 +171,64 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 		if _, ok := closedStreams[kv.StreamId]; ok {
 			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
 		}
+
+		switch kv.Kind {
+		case pb.KV_DATA_KEY:
+			sw.writeLock.Lock()
+			defer sw.writeLock.Unlock()
+			y.AssertTrue(len(sw.db.opt.EncryptionKey) > 0)
+			var dk pb.DataKey
+			if err := proto.Unmarshal(kv.Value, &dk); err != nil {
+				return errors.Wrapf(err, "unmarshal failed %s", kv.Value)
+			}
+			readerId := dk.KeyId
+			if _, ok := sw.keyId[readerId]; !ok {
+				// Insert the data key to the key registry if not already inserted.
+				id, err := sw.db.registry.AddKey(&dk)
+				if err != nil {
+					return errors.Wrap(err, "failed to write data key")
+				}
+				dk.KeyId = id
+				sw.keyId[readerId] = &dk
+			}
+			return nil
+		case pb.KV_FILE:
+			sw.writeLock.Lock()
+			defer sw.writeLock.Unlock()
+			// All tables should be received before any of the keys.
+			if sw.processingKeys {
+				return errors.New("Received pb.KV_FILE after pb.KV_KEY")
+			}
+			var change pb.ManifestChange
+			if err := proto.Unmarshal(kv.Key, &change); err != nil {
+				return errors.Wrap(err, "unable to unmarshal manifest change")
+			}
+			level := int(change.Level)
+			if sw.senderPrevLevel == 0 {
+				// We received the first file, set the sender's and receiver's max levels.
+				sw.senderPrevLevel = level
+				sw.prevLevel = len(sw.db.lc.levels) - 1
+			}
+
+			// This is based on the assumption that the tables from the last
+			// level will be sent first and then the second last level tables.
+			// As long as change.Level (the sender's level) is the same as
+			// senderPrevLevel, we know we're processing a last level table.
+			// The last level for this DB can be 8 while the DB that's sending
+			// this could have the last level at 7.
+			if sw.senderPrevLevel != level {
+				// If the previous level and the current level are different, we
+				// must be processing a table from the next last level.
+				sw.senderPrevLevel = level
+				sw.prevLevel--
+			}
+			dk := sw.keyId[change.KeyId]
+			return sw.db.lc.AddTable(&kv, sw.prevLevel, dk, &change)
+		case pb.KV_KEY:
+			// Pass. The following code will handle the keys.
+ } sw.writeLock.Lock() + sw.processingKeys = true if sw.maxVersion < kv.Version { sw.maxVersion = kv.Version } @@ -185,6 +247,7 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if len(kv.UserMeta) > 0 { userMeta = kv.UserMeta[0] } + e := &Entry{ Key: y.KeyWithTs(kv.Key, kv.Version), Value: y.Copy(kv.Value), diff --git a/table/table.go b/table/table.go index 1f90a73e5..a53067d2d 100644 --- a/table/table.go +++ b/table/table.go @@ -145,10 +145,13 @@ func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSi // KeyCount is the total number of keys in this table. func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount } -// OnDiskSize returns the total size of key-values stored in this table (including the -// disk space occupied on the value log). +// OnDiskSize returns the total size of key-values stored in this table +// (including the disk space occupied on the value log). func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize } +// DataKey returns the encryption key +func (t *Table) DataKey() *pb.DataKey { return t.opt.DataKey } + // CompressionType returns the compression algorithm used for block compression. func (t *Table) CompressionType() options.CompressionType { return t.opt.Compression @@ -255,7 +258,21 @@ func (b *Block) verifyCheckSum() error { func CreateTable(fname string, builder *Builder) (*Table, error) { bd := builder.Done() - mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size) + mf, err := newFile(fname, bd.Size) + if err != nil { + return nil, err + } + + written := bd.Copy(mf.Data) + y.AssertTrue(written == len(mf.Data)) + if err := z.Msync(mf.Data); err != nil { + return nil, y.Wrapf(err, "while calling msync on %s", fname) + } + return OpenTable(mf, *builder.opts) +} + +func newFile(fname string, sz int) (*z.MmapFile, error) { + mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, sz) if err == z.NewFile { // Expected. } else if err != nil { @@ -263,13 +280,22 @@ func CreateTable(fname string, builder *Builder) (*Table, error) { } else { return nil, errors.Errorf("file already exists: %s", fname) } + return mf, nil +} - written := bd.Copy(mf.Data) +func CreateTableFromBuffer(fname string, buf []byte, opts Options) (*Table, error) { + mf, err := newFile(fname, len(buf)) + if err != nil { + return nil, err + } + + // We cannot use the buf directly here because it is not mmapped. + written := copy(mf.Data, buf) y.AssertTrue(written == len(mf.Data)) if err := z.Msync(mf.Data); err != nil { return nil, y.Wrapf(err, "while calling msync on %s", fname) } - return OpenTable(mf, *builder.opts) + return OpenTable(mf, opts) } // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function @@ -692,6 +718,12 @@ func (t *Table) DoesNotHave(hash uint32) bool { return !mayContain } +// CoveredByPrefix returns true if all the keys in the table are prefixed by the given prefix. +func (t *Table) CoveredByPrefix(prefix []byte) bool { + return bytes.HasPrefix(y.ParseKey(t.Biggest()), prefix) && + bytes.HasPrefix(y.ParseKey(t.Smallest()), prefix) +} + // readTableIndex reads table index from the sst and returns its pb format. func (t *Table) readTableIndex() (*fb.TableIndex, error) { data := t.readNoFail(t.indexStart, t.indexLen) diff --git a/test.sh b/test.sh index 53b5477aa..e6ade0ed9 100755 --- a/test.sh +++ b/test.sh @@ -99,7 +99,6 @@ write_coverage() { cat cover_tmp.out >> cover.out && rm cover_tmp.out fi fi - } # parallel tests currently not working
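
Reviewer note: for context, a minimal, hypothetical usage sketch of the full-copy path this patch enables. The paths here are made up; `OpenManaged`, `DefaultOptions`, and `StreamDB` are existing Badger APIs, and `StreamDB` now sets `stream.FullCopy = true` internally per the db.go hunk above.

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	// FullCopy requires managed transactions on the source DB; Orchestrate
	// panics otherwise (see the check added in stream.go).
	src, err := badger.OpenManaged(badger.DefaultOptions("/tmp/src-db"))
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	// Tables from the two bottommost levels that are fully covered by the
	// stream prefix and below the read timestamp are copied over verbatim
	// (along with their data keys and manifest changes); everything else is
	// streamed key by key as before.
	if err := src.StreamDB(badger.DefaultOptions("/tmp/dst-db")); err != nil {
		log.Fatal(err)
	}
}
```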