diff --git a/db.go b/db.go index a94c3bfbe..d8d57860d 100644 --- a/db.go +++ b/db.go @@ -1996,6 +1996,7 @@ func (db *DB) StreamDB(outOptions Options) error { // Stream contents of DB to the output DB. stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir) + stream.FullCopy = true stream.Send = func(buf *z.Buffer) error { return writer.Write(buf) diff --git a/iterator.go b/iterator.go index c56259881..d23517af4 100644 --- a/iterator.go +++ b/iterator.go @@ -366,17 +366,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool { // that the tables are sorted in the right order. func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table { filterTables := func(tables []*table.Table) []*table.Table { - if opt.SinceTs > 0 { - tmp := tables[:0] - for _, t := range tables { - if t.MaxVersion() < opt.SinceTs { - continue - } - tmp = append(tmp, t) + if opt.SinceTs == 0 { + return tables + } + out := tables[:0] + for _, t := range tables { + if t.MaxVersion() < opt.SinceTs { + continue } - tables = tmp + out = append(out, t) } - return tables + return out } if len(opt.Prefix) == 0 { @@ -489,7 +489,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator { for i := 0; i < len(tables); i++ { iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse)) } - iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references. + iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references. res := &Iterator{ txn: txn, iitr: table.NewMergeIterator(iters, opt.Reverse), diff --git a/key_registry.go b/key_registry.go index a1be0435d..1568e2705 100644 --- a/key_registry.go +++ b/key_registry.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/dgraph-io/badger/v4/pb" "github.com/dgraph-io/badger/v4/y" ) @@ -264,7 +266,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error { // Write all the datakeys to the buf. for _, k := range reg.dataKeys { // Writing the datakey to the given buffer. - if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil { + if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil { return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry") } } @@ -338,8 +340,7 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) { defer kr.Unlock() // Key might have generated by another go routine. So, // checking once again. - key, valid = validKey() - if valid { + if key, valid := validKey(); valid { return key, nil } k := make([]byte, len(kr.opt.EncryptionKey)) @@ -347,35 +348,50 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) { if err != nil { return nil, err } - _, err = rand.Read(k) - if err != nil { + + if _, err := rand.Read(k); err != nil { return nil, err } // Otherwise Increment the KeyID and generate new datakey. kr.nextKeyID++ - dk := &pb.DataKey{ + dk := pb.DataKey{ KeyId: kr.nextKeyID, Data: k, CreatedAt: time.Now().Unix(), Iv: iv, } + kr.lastCreated = dk.CreatedAt + kr.dataKeys[kr.nextKeyID] = &dk // Don't store the datakey on file if badger is running in InMemory mode. - if !kr.opt.InMemory { - // Store the datekey. 
- buf := &bytes.Buffer{} - if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil { - return nil, err - } - // Persist the datakey to the disk - if _, err = kr.fp.Write(buf.Bytes()); err != nil { - return nil, err - } + if kr.opt.InMemory { + return &dk, nil + } - // storeDatakey encrypts the datakey So, placing un-encrypted key in the memory. - dk.Data = k - kr.lastCreated = dk.CreatedAt - kr.dataKeys[kr.nextKeyID] = dk - return dk, nil + // Store the datekey. + if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil { + return nil, err + } + return &dk, nil +} + +func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) { + // If we don't have a encryption key, we cannot store the datakey. + if len(kr.opt.EncryptionKey) == 0 { + return 0, errors.New("No encryption key found. Cannot add data key") + } + + if _, ok := kr.dataKeys[dk.KeyId]; !ok { + // If KeyId does not exists already, then use the next available KeyId to store data key. + kr.nextKeyID++ + dk.KeyId = kr.nextKeyID + } + kr.dataKeys[dk.KeyId] = &dk + + if kr.opt.InMemory { + return dk.KeyId, nil + } + // Store the datakey. + return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk) } // Close closes the key registry. @@ -387,7 +403,8 @@ func (kr *KeyRegistry) Close() error { } // storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset. -func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error { +// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field. +func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error { // xor will encrypt the IV and xor with the given data. // It'll used for both encryption and decryption. xor := func() error { @@ -395,30 +412,21 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error { return nil } var err error - k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv) + key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv) return err } // In memory datakey will be plain text so encrypting before storing to the disk. - var err error - if err = xor(); err != nil { + if err := xor(); err != nil { return y.Wrapf(err, "Error while encrypting datakey in storeDataKey") } - var data []byte - if data, err = k.Marshal(); err != nil { - err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey") - var err2 error - // decrypting the datakey back. - if err2 = xor(); err2 != nil { - return y.Wrapf(err, - y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error()) - } - return err + data, err := key.Marshal() + if err != nil { + return y.Wrapf(err, "Error while marshaling datakey in storeDataKey") } var lenCrcBuf [8]byte binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data))) binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable)) - y.Check2(buf.Write(lenCrcBuf[:])) - y.Check2(buf.Write(data)) - // Decrypting the datakey back since we're using the pointer. - return xor() + y.Check2(w.Write(lenCrcBuf[:])) + y.Check2(w.Write(data)) + return nil } diff --git a/level_handler.go b/level_handler.go index 31673f15b..6a8e95084 100644 --- a/level_handler.go +++ b/level_handler.go @@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { return maxVs, decr() } -// appendIterators appends iterators to an array of iterators, for merging. +// iterators returns an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. 
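storeDataKey now writes straight to an io.Writer using a fixed framing: a 4-byte big-endian payload length, a 4-byte Castagnoli CRC of that payload, then the marshaled DataKey (whose Data field has already been XOR-encrypted with the storage key). A minimal read-side sketch of that framing, using encoding/binary, hash/crc32, io, github.com/pkg/errors and the existing pb/y packages; readDataKeyRecord is a hypothetical helper, not part of this patch:

    // readDataKeyRecord is a hypothetical inverse of storeDataKey. The returned
    // dk.Data is still encrypted with the storage key at this point.
    func readDataKeyRecord(r io.Reader) (*pb.DataKey, error) {
        var hdr [8]byte
        if _, err := io.ReadFull(r, hdr[:]); err != nil {
            return nil, err
        }
        sz := binary.BigEndian.Uint32(hdr[0:4])
        want := binary.BigEndian.Uint32(hdr[4:8])
        data := make([]byte, sz)
        if _, err := io.ReadFull(r, data); err != nil {
            return nil, err
        }
        if crc32.Checksum(data, y.CastagnoliCrcTable) != want {
            return nil, errors.New("data key checksum mismatch")
        }
        var dk pb.DataKey
        if err := dk.Unmarshal(data); err != nil {
            return nil, err
        }
        return &dk, nil
    }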
-func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator { +func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator { s.RLock() defer s.RUnlock() @@ -324,14 +324,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) out = append(out, t) } } - return appendIteratorsReversed(iters, out, topt) + return iteratorsReversed(out, topt) } tables := opt.pickTables(s.tables) if len(tables) == 0 { - return iters + return nil } - return append(iters, table.NewConcatIterator(tables, topt)) + return []y.Iterator{table.NewConcatIterator(tables, topt)} +} + +func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table { + if opt.Reverse { + panic("Invalid option for getTables") + } + + s.RLock() + defer s.RUnlock() + + if s.level == 0 { + var out []*table.Table + for _, t := range s.tables { + if opt.pickTable(t) { + t.IncrRef() + out = append(out, t) + } + } + return out + } + + tables := opt.pickTables(s.tables) + for _, t := range tables { + t.IncrRef() + } + return tables } type levelHandlerRLocked struct{} diff --git a/levels.go b/levels.go index 9b26038de..d69324c01 100644 --- a/levels.go +++ b/levels.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" otrace "go.opencensus.io/trace" + "github.com/dgraph-io/badger/v4/options" "github.com/dgraph-io/badger/v4/pb" "github.com/dgraph-io/badger/v4/table" "github.com/dgraph-io/badger/v4/y" @@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables( var iters []y.Iterator switch { case lev == 0: - iters = appendIteratorsReversed(iters, topTables, table.NOCACHE) + iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...) case len(topTables) > 0: y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)} @@ -1608,7 +1609,8 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) return maxVs, nil } -func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator { +func iteratorsReversed(th []*table.Table, opt int) []y.Iterator { + out := make([]y.Iterator, 0, len(th)) for i := len(th) - 1; i >= 0; i-- { // This will increment the reference of the table handler. out = append(out, th[i].NewIterator(opt)) @@ -1616,16 +1618,25 @@ func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.I return out } -// appendIterators appends iterators to an array of iterators, for merging. +// getTables return tables from all levels. It would call IncrRef on all returned tables. +func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table { + res := make([][]*table.Table, 0, len(s.levels)) + for _, level := range s.levels { + res = append(res, level.getTables(opt)) + } + return res +} + +// iterators returns an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. -func (s *levelsController) appendIterators( - iters []y.Iterator, opt *IteratorOptions) []y.Iterator { +func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator { // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing // data when there's a compaction. + itrs := make([]y.Iterator, 0, len(s.levels)) for _, level := range s.levels { - iters = level.appendIterators(iters, opt) + itrs = append(itrs, level.iterators(opt)...) } - return iters + return itrs } // TableInfo represents the information about a table. 
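Both levelHandler.iterators and the new getTables helpers hand back references that the caller now owns. A minimal sketch of that contract, mirroring how Stream.Orchestrate consumes levelsController.getTables later in this diff (db and prefix are assumed caller-side names):

    opts := DefaultIteratorOptions
    opts.Prefix = prefix // caller-supplied prefix, assumed for illustration
    tableMatrix := db.lc.getTables(&opts) // IncrRef has been called on every returned table
    defer func() {
        for _, tables := range tableMatrix {
            for _, t := range tables {
                _ = t.DecrRef() // release the reference once the tables are no longer needed
            }
        }
    }()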
@@ -1752,3 +1763,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string { sort.Strings(splits) return splits } + +// AddTable builds the table from the KV.value options passed through the KV.Key. +func (lc *levelsController) AddTable( + kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error { + // TODO: Encryption / Decryption might be required for the table, if the sender and receiver + // don't have same encryption mode. See if inplace encryption/decryption can be done. + // Tables are sent in the sorted order, so no need to sort them here. + encrypted := len(lc.kv.opt.EncryptionKey) > 0 + y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted)) + // The keyId is zero if there is no encryption. + opts := buildTableOptions(lc.kv) + opts.Compression = options.CompressionType(change.Compression) + opts.DataKey = dk + + fileID := lc.reserveFileID() + fname := table.NewFilename(fileID, lc.kv.opt.Dir) + + // kv.Value is owned by the z.buffer. Ensure that we copy this buffer. + var tbl *table.Table + var err error + if lc.kv.opt.InMemory { + if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil { + return errors.Wrap(err, "while creating in-memory table from buffer") + } + } else { + if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil { + return errors.Wrap(err, "while creating table from buffer") + } + } + + lc.levels[lev].addTable(tbl) + // Release the ref held by OpenTable. addTable would add a reference. + _ = tbl.DecrRef() + + change.Id = fileID + change.Level = uint32(lev) + if dk != nil { + change.KeyId = dk.KeyId + } + // We use the same data KeyId. So, change.KeyId remains the same. + y.AssertTrue(change.Op == pb.ManifestChange_CREATE) + return lc.kv.manifest.addChanges([]*pb.ManifestChange{change}) +} diff --git a/levels_test.go b/levels_test.go index 67633ed4f..108214616 100644 --- a/levels_test.go +++ b/levels_test.go @@ -18,6 +18,7 @@ package badger import ( "fmt" + "io/ioutil" "math" "math/rand" "os" @@ -40,7 +41,15 @@ func createAndOpen(db *DB, td []keyValVersion, level int) { BloomFalsePositive: db.opt.BloomFalsePositive, ChkMode: options.NoVerification, } - b := table.NewTableBuilder(opts) + createAndOpenWithOptions(db, td, level, &opts) +} + +func createAndOpenWithOptions(db *DB, td []keyValVersion, level int, opts *table.Options) { + if opts == nil { + bopts := buildTableOptions(db) + opts = &bopts + } + b := table.NewTableBuilder(*opts) defer b.Close() // Add all keys and versions to the table. 
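AddTable is driven by the stream writer (see the stream_writer.go hunk later in this diff): the manifest change arrives marshaled in kv.Key, the raw table bytes in kv.Value, and the data key, if any, is the one previously registered through KeyRegistry.AddKey. A condensed sketch of that call path, with targetLevel standing in for the receiver's mapped level (sw.prevLevel in the patch):

    var change pb.ManifestChange
    if err := proto.Unmarshal(kv.Key, &change); err != nil {
        return errors.Wrap(err, "unable to unmarshal manifest change")
    }
    dk := sw.keyId[change.KeyId] // nil when the stream is not encrypted
    if err := sw.db.lc.AddTable(&kv, targetLevel, dk, &change); err != nil {
        return errors.Wrap(err, "while adding streamed table")
    }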
@@ -49,13 +58,21 @@ func createAndOpen(db *DB, td []keyValVersion, level int) { val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta} b.Add(key, val, 0) } - fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir) - tab, err := table.CreateTable(fname, b) + fileID := db.lc.reserveFileID() + var tab *table.Table + var err error + if db.opt.InMemory { + data := b.Finish() + tab, err = table.OpenInMemoryTable(data, fileID, opts) + } else { + fname := table.NewFilename(fileID, db.opt.Dir) + tab, err = table.CreateTable(fname, b) + } if err != nil { panic(err) } if err := db.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(tab.ID(), level, 0, tab.CompressionType()), + newCreateChange(tab.ID(), level, tab.KeyID(), tab.CompressionType()), }); err != nil { panic(err) } @@ -1302,3 +1319,85 @@ func TestStaleDataCleanup(t *testing.T) { }) } + +func TestStreamWithFullCopy(t *testing.T) { + dbopts := DefaultOptions("") + dbopts.managedTxns = true + dbopts.MaxLevels = 7 + dbopts.NumVersionsToKeep = math.MaxInt32 + + encKey := make([]byte, 24) + _, err := rand.Read(encKey) + require.NoError(t, err) + + test := func(db *DB, outOpts Options) { + l4 := []keyValVersion{{"a", "1", 3, bitDelete}, {"d", "4", 3, 0}} + l5 := []keyValVersion{{"b", "2", 2, 0}} + l6 := []keyValVersion{{"a", "1", 2, 0}, {"c", "3", 1, 0}} + createAndOpenWithOptions(db, l4, 4, nil) + createAndOpenWithOptions(db, l5, 5, nil) + createAndOpenWithOptions(db, l6, 6, nil) + + if !outOpts.InMemory { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + outOpts.Dir = dir + outOpts.ValueDir = dir + } + + require.NoError(t, db.StreamDB(outOpts)) + out, err := Open(outOpts) + require.NoError(t, err) + defer func() { + require.NoError(t, out.Close()) + }() + err = out.View(func(txn *Txn) error { + // Key "a" should not be there because we deleted it at higher version. + _, err := txn.Get([]byte("a")) + require.Error(t, err) + require.Equal(t, err, ErrKeyNotFound) + _, err = txn.Get([]byte("b")) + require.NoError(t, err) + _, err = txn.Get([]byte("c")) + require.NoError(t, err) + _, err = txn.Get([]byte("d")) + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + } + t.Run("without encryption", func(t *testing.T) { + opts := dbopts + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test(db, opts) + }) + }) + t.Run("with encryption", func(t *testing.T) { + opts := dbopts + opts.IndexCacheSize = 1 << 20 + opts.BlockCacheSize = 1 << 20 + // Set it to zero so that we have more than one data keys. + opts.EncryptionKey = encKey + opts.EncryptionKeyRotationDuration = 0 + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test(db, opts) + require.Greater(t, len(db.registry.dataKeys), 1) + }) + }) + t.Run("stream from in-memory to persistent", func(t *testing.T) { + opts := dbopts + opts.IndexCacheSize = 1 << 20 + opts.BlockCacheSize = 1 << 20 + opts.InMemory = true + // Set it to zero so that we have more than one data keys. 
+ opts.EncryptionKey = encKey + opts.EncryptionKeyRotationDuration = 0 + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + outOpts := opts + outOpts.InMemory = false + test(db, outOpts) + require.Greater(t, len(db.registry.dataKeys), 1) + }) + }) +} diff --git a/manifest.go b/manifest.go index e681ae0a0..e04aa7f7e 100644 --- a/manifest.go +++ b/manifest.go @@ -128,7 +128,7 @@ func (m *Manifest) clone() Manifest { func openOrCreateManifestFile(opt Options) ( ret *manifestFile, result Manifest, err error) { if opt.InMemory { - return &manifestFile{inMemory: true}, Manifest{}, nil + return &manifestFile{inMemory: true, manifest: createManifest()}, Manifest{}, nil } return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion, manifestDeletionsRewriteThreshold) @@ -206,21 +206,20 @@ func (mf *manifestFile) close() error { // this depends on the filesystem -- some might append garbage data if a system crash happens at // the wrong time.) func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { - if mf.inMemory { - return nil - } changes := pb.ManifestChangeSet{Changes: changesParam} buf, err := proto.Marshal(&changes) if err != nil { return err } - // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() defer mf.appendLock.Unlock() if err := applyChangeSet(&mf.manifest, &changes); err != nil { return err } + if mf.inMemory { + return nil + } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { diff --git a/pb/badgerpb4.pb.go b/pb/badgerpb4.pb.go index 521f99293..ac94cfc4c 100644 --- a/pb/badgerpb4.pb.go +++ b/pb/badgerpb4.pb.go @@ -44,6 +44,34 @@ func (EncryptionAlgo) EnumDescriptor() ([]byte, []int) { return fileDescriptor_452c1d780baa15ef, []int{0} } +type KV_Kind int32 + +const ( + KV_KEY KV_Kind = 0 + KV_DATA_KEY KV_Kind = 1 + KV_FILE KV_Kind = 2 +) + +var KV_Kind_name = map[int32]string{ + 0: "KEY", + 1: "DATA_KEY", + 2: "FILE", +} + +var KV_Kind_value = map[string]int32{ + "KEY": 0, + "DATA_KEY": 1, + "FILE": 2, +} + +func (x KV_Kind) String() string { + return proto.EnumName(KV_Kind_name, int32(x)) +} + +func (KV_Kind) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_452c1d780baa15ef, []int{0, 0} +} + type ManifestChange_Operation int32 const ( @@ -104,7 +132,8 @@ type KV struct { // Stream id is used to identify which stream the KV came from. StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` // Stream done is used to indicate end of stream. - StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + Kind KV_Kind `protobuf:"varint,12,opt,name=kind,proto3,enum=badgerpb4.KV_Kind" json:"kind,omitempty"` } func (m *KV) Reset() { *m = KV{} } @@ -196,6 +225,13 @@ func (m *KV) GetStreamDone() bool { return false } +func (m *KV) GetKind() KV_Kind { + if m != nil { + return m.Kind + } + return KV_KEY +} + type KVList struct { Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` // alloc_ref used internally for memory management. 
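The manifest.go change above is what makes a full copy from an in-memory source possible: addChanges now applies the change set to the in-memory Manifest and only skips the on-disk rewrite, so a later clone actually knows about every live table. A small sketch of what copyTablesOver relies on, assuming t is some live table (illustrative only):

    // With the fix, the cloned manifest carries an entry per table even when
    // the DB was opened with InMemory: true, so the sender can attach the
    // KeyID and Compression to each streamed file.
    m := db.manifest.manifest.clone()
    if tm, ok := m.Tables[t.ID()]; ok {
        _ = tm.KeyID       // forwarded in the ManifestChange for this table
        _ = tm.Compression // likewise
    }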
@@ -552,6 +588,7 @@ func (m *Match) GetIgnoreBytes() string { func init() { proto.RegisterEnum("badgerpb4.EncryptionAlgo", EncryptionAlgo_name, EncryptionAlgo_value) + proto.RegisterEnum("badgerpb4.KV_Kind", KV_Kind_name, KV_Kind_value) proto.RegisterEnum("badgerpb4.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) proto.RegisterEnum("badgerpb4.Checksum_Algorithm", Checksum_Algorithm_name, Checksum_Algorithm_value) proto.RegisterType((*KV)(nil), "badgerpb4.KV") @@ -566,48 +603,52 @@ func init() { func init() { proto.RegisterFile("badgerpb4.proto", fileDescriptor_452c1d780baa15ef) } var fileDescriptor_452c1d780baa15ef = []byte{ - // 653 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0x4f, 0x6b, 0xdb, 0x4e, - 0x10, 0xf5, 0xca, 0xf2, 0xbf, 0x71, 0xe2, 0xf8, 0xb7, 0xfc, 0x5a, 0x14, 0x4a, 0x5c, 0x47, 0xa1, - 0x60, 0x0a, 0xb5, 0x69, 0x1c, 0x7a, 0xe9, 0xc9, 0xff, 0x20, 0xc6, 0x09, 0x81, 0x6d, 0x08, 0xa1, - 0x17, 0xb3, 0x96, 0xc6, 0xb6, 0xb0, 0x2d, 0x89, 0xd5, 0x5a, 0xc4, 0x1f, 0xa2, 0xd0, 0x8f, 0xd5, - 0x63, 0x0e, 0x3d, 0xf4, 0x58, 0x92, 0x2f, 0x52, 0x76, 0xa5, 0xb8, 0xf6, 0xa1, 0xb7, 0x99, 0x37, - 0xa3, 0x79, 0xa3, 0xf7, 0x46, 0x82, 0xa3, 0x09, 0x77, 0x67, 0x28, 0xc2, 0xc9, 0x45, 0x33, 0x14, - 0x81, 0x0c, 0x68, 0x69, 0x0b, 0xd8, 0x3f, 0x09, 0x18, 0xa3, 0x3b, 0x5a, 0x85, 0xec, 0x02, 0x37, - 0x16, 0xa9, 0x93, 0xc6, 0x01, 0x53, 0x21, 0xfd, 0x1f, 0x72, 0x31, 0x5f, 0xae, 0xd1, 0x32, 0x34, - 0x96, 0x24, 0xf4, 0x0d, 0x94, 0xd6, 0x11, 0x8a, 0xf1, 0x0a, 0x25, 0xb7, 0xb2, 0xba, 0x52, 0x54, - 0xc0, 0x35, 0x4a, 0x4e, 0x2d, 0x28, 0xc4, 0x28, 0x22, 0x2f, 0xf0, 0x2d, 0xb3, 0x4e, 0x1a, 0x26, - 0x7b, 0x49, 0xe9, 0x09, 0x00, 0x3e, 0x84, 0x9e, 0xc0, 0x68, 0xcc, 0xa5, 0x95, 0xd3, 0xc5, 0x52, - 0x8a, 0x74, 0x24, 0xa5, 0x60, 0xea, 0x81, 0x79, 0x3d, 0x50, 0xc7, 0x8a, 0x29, 0x92, 0x02, 0xf9, - 0x6a, 0xec, 0xb9, 0x16, 0xd4, 0x49, 0xe3, 0x90, 0x15, 0x13, 0x60, 0xe8, 0xd2, 0xb7, 0x50, 0x4e, - 0x8b, 0x6e, 0xe0, 0xa3, 0x55, 0xae, 0x93, 0x46, 0x91, 0x41, 0x02, 0xf5, 0x03, 0x1f, 0xed, 0x3e, - 0xe4, 0x47, 0x77, 0x57, 0x5e, 0x24, 0xe9, 0x09, 0x18, 0x8b, 0xd8, 0x22, 0xf5, 0x6c, 0xa3, 0x7c, - 0x7e, 0xd8, 0xfc, 0xab, 0xc4, 0xe8, 0x8e, 0x19, 0x8b, 0x58, 0xd1, 0xf0, 0xe5, 0x32, 0x70, 0xc6, - 0x02, 0xa7, 0x9a, 0xc6, 0x64, 0x45, 0x0d, 0x30, 0x9c, 0xda, 0x97, 0xf0, 0xdf, 0x35, 0xf7, 0xbd, - 0x29, 0x46, 0xb2, 0x37, 0xe7, 0xfe, 0x0c, 0xbf, 0xa0, 0xa4, 0x6d, 0x28, 0x38, 0x3a, 0x89, 0xd2, - 0xa9, 0xc7, 0x3b, 0x53, 0xf7, 0xdb, 0xd9, 0x4b, 0xa7, 0xfd, 0xcd, 0x80, 0xca, 0x7e, 0x8d, 0x56, - 0xc0, 0x18, 0xba, 0x5a, 0x71, 0x93, 0x19, 0x43, 0x97, 0xb6, 0xc1, 0xb8, 0x09, 0xb5, 0xda, 0x95, - 0xf3, 0xb3, 0x7f, 0x8e, 0x6c, 0xde, 0x84, 0x28, 0xb8, 0xf4, 0x02, 0x9f, 0x19, 0x37, 0xa1, 0x72, - 0xe9, 0x0a, 0x63, 0x5c, 0x6a, 0x2f, 0x0e, 0x59, 0x92, 0xd0, 0x57, 0x90, 0x5f, 0xe0, 0x46, 0x09, - 0x97, 0xf8, 0x90, 0x5b, 0xe0, 0x66, 0xe8, 0xd2, 0x2e, 0x1c, 0xa1, 0xef, 0x88, 0x4d, 0xa8, 0x1e, - 0x1f, 0xf3, 0xe5, 0x2c, 0xd0, 0x56, 0x54, 0xf6, 0xde, 0x60, 0xb0, 0xed, 0xe8, 0x2c, 0x67, 0x01, - 0xab, 0xe0, 0x5e, 0x4e, 0xeb, 0x50, 0x76, 0x82, 0x55, 0x28, 0x30, 0xd2, 0x3e, 0xe7, 0x35, 0xed, - 0x2e, 0x64, 0x9f, 0x41, 0x69, 0xbb, 0x23, 0x05, 0xc8, 0xf7, 0xd8, 0xa0, 0x73, 0x3b, 0xa8, 0x66, - 0x54, 0xdc, 0x1f, 0x5c, 0x0d, 0x6e, 0x07, 0x55, 0x62, 0xc7, 0x50, 0xec, 0xcd, 0xd1, 0x59, 0x44, - 0xeb, 0x15, 0xfd, 0x08, 0xa6, 0xde, 0x85, 0xe8, 0x5d, 0x4e, 0x76, 0x76, 0x79, 0x69, 0x69, 0x2a, - 0x6a, 0xe1, 0xc9, 0xf9, 0x8a, 0xe9, 0x56, 0x75, 0xae, 0xd1, 0x7a, 0xa5, 0xc5, 0x32, 
0x99, 0x0a, - 0xed, 0x77, 0x50, 0xda, 0x36, 0x25, 0xac, 0xbd, 0xf6, 0x79, 0xaf, 0x9a, 0xa1, 0x07, 0x50, 0xbc, - 0xbf, 0xbf, 0xe4, 0xd1, 0xfc, 0xd3, 0x45, 0x95, 0xd8, 0x0e, 0x14, 0xfa, 0x5c, 0xf2, 0x11, 0x6e, - 0x76, 0x44, 0x22, 0xbb, 0x22, 0x51, 0x30, 0x5d, 0x2e, 0x79, 0x7a, 0xf6, 0x3a, 0x56, 0x56, 0x79, - 0x71, 0x7a, 0xee, 0x86, 0x17, 0xab, 0x73, 0x76, 0x04, 0x72, 0x89, 0xae, 0x3a, 0x67, 0xa5, 0x71, - 0x96, 0x95, 0x52, 0xa4, 0x23, 0xed, 0x2e, 0xe4, 0xae, 0xb9, 0x74, 0xe6, 0xf4, 0x35, 0xe4, 0x43, - 0x81, 0x53, 0xef, 0x21, 0xfd, 0xb0, 0xd2, 0x8c, 0x9e, 0xc2, 0x81, 0x37, 0xf3, 0x03, 0x81, 0xe3, - 0xc9, 0x46, 0x62, 0xa4, 0xb9, 0x4a, 0xac, 0x9c, 0x60, 0x5d, 0x05, 0xbd, 0x3f, 0x86, 0xca, 0xbe, - 0x13, 0xb4, 0x00, 0x59, 0x8e, 0x51, 0x35, 0xd3, 0xfd, 0xfc, 0xe3, 0xa9, 0x46, 0x1e, 0x9f, 0x6a, - 0xe4, 0xf7, 0x53, 0x8d, 0x7c, 0x7f, 0xae, 0x65, 0x1e, 0x9f, 0x6b, 0x99, 0x5f, 0xcf, 0xb5, 0xcc, - 0xd7, 0xd3, 0x99, 0x27, 0xe7, 0xeb, 0x49, 0xd3, 0x09, 0x56, 0x2d, 0x77, 0x26, 0x78, 0x38, 0xff, - 0xe0, 0x05, 0xad, 0x44, 0xcf, 0x56, 0x7c, 0xd1, 0x0a, 0x27, 0x93, 0xbc, 0xfe, 0x03, 0xb4, 0xff, - 0x04, 0x00, 0x00, 0xff, 0xff, 0xec, 0x26, 0x3b, 0x76, 0x14, 0x04, 0x00, 0x00, + // 705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0xcd, 0x6e, 0xe2, 0x48, + 0x10, 0xc6, 0xc6, 0xfc, 0x15, 0x84, 0xb0, 0xad, 0xdd, 0x95, 0xa3, 0x55, 0x58, 0xe2, 0x68, 0x77, + 0xd1, 0x4a, 0x0b, 0x5a, 0x88, 0xf6, 0xb2, 0x27, 0x7e, 0x3c, 0x0a, 0x82, 0x28, 0x52, 0x4f, 0x14, + 0x65, 0xe6, 0x82, 0x1a, 0xbb, 0x00, 0x0b, 0xb0, 0xad, 0x76, 0x63, 0x85, 0x87, 0x18, 0x69, 0x5e, + 0x62, 0xde, 0x65, 0x8e, 0x39, 0xce, 0x71, 0x94, 0xbc, 0xc8, 0xa8, 0xdb, 0x0e, 0x03, 0x87, 0xb9, + 0xd5, 0xf7, 0x55, 0xb9, 0xaa, 0x5c, 0x5f, 0x55, 0xc3, 0xe9, 0x8c, 0xb9, 0x0b, 0xe4, 0xe1, 0xec, + 0xaa, 0x15, 0xf2, 0x40, 0x04, 0xa4, 0xb4, 0x27, 0xac, 0x4f, 0x3a, 0xe8, 0xe3, 0x7b, 0x52, 0x83, + 0xec, 0x0a, 0x77, 0xa6, 0xd6, 0xd0, 0x9a, 0x15, 0x2a, 0x4d, 0xf2, 0x33, 0xe4, 0x62, 0xb6, 0xde, + 0xa2, 0xa9, 0x2b, 0x2e, 0x01, 0xe4, 0x37, 0x28, 0x6d, 0x23, 0xe4, 0xd3, 0x0d, 0x0a, 0x66, 0x66, + 0x95, 0xa7, 0x28, 0x89, 0x1b, 0x14, 0x8c, 0x98, 0x50, 0x88, 0x91, 0x47, 0x5e, 0xe0, 0x9b, 0x46, + 0x43, 0x6b, 0x1a, 0xf4, 0x15, 0x92, 0x73, 0x00, 0x7c, 0x0c, 0x3d, 0x8e, 0xd1, 0x94, 0x09, 0x33, + 0xa7, 0x9c, 0xa5, 0x94, 0xe9, 0x09, 0x42, 0xc0, 0x50, 0x09, 0xf3, 0x2a, 0xa1, 0xb2, 0x65, 0xa5, + 0x48, 0x70, 0x64, 0x9b, 0xa9, 0xe7, 0x9a, 0xd0, 0xd0, 0x9a, 0x27, 0xb4, 0x98, 0x10, 0x23, 0x97, + 0xfc, 0x0e, 0xe5, 0xd4, 0xe9, 0x06, 0x3e, 0x9a, 0xe5, 0x86, 0xd6, 0x2c, 0x52, 0x48, 0xa8, 0x61, + 0xe0, 0x23, 0xf9, 0x13, 0x8c, 0x95, 0xe7, 0xbb, 0x66, 0xa5, 0xa1, 0x35, 0xab, 0x1d, 0xd2, 0xfa, + 0x3e, 0x81, 0xf1, 0x7d, 0x6b, 0xec, 0xf9, 0x2e, 0x55, 0x7e, 0xeb, 0x2f, 0x30, 0x24, 0x22, 0x05, + 0xc8, 0x8e, 0xed, 0x77, 0xb5, 0x0c, 0xa9, 0x40, 0x71, 0xd8, 0xbb, 0xeb, 0x4d, 0x25, 0xd2, 0x48, + 0x11, 0x8c, 0x37, 0xa3, 0x89, 0x5d, 0xd3, 0xad, 0x21, 0xe4, 0xc7, 0xf7, 0x13, 0x2f, 0x12, 0xe4, + 0x1c, 0xf4, 0x55, 0x6c, 0x6a, 0x8d, 0x6c, 0xb3, 0xdc, 0x39, 0x39, 0x4a, 0x4c, 0xf5, 0x55, 0x2c, + 0xfb, 0x66, 0xeb, 0x75, 0xe0, 0x4c, 0x39, 0xce, 0x55, 0xdf, 0x06, 0x2d, 0x2a, 0x82, 0xe2, 0xdc, + 0xba, 0x86, 0x9f, 0x6e, 0x98, 0xef, 0xcd, 0x31, 0x12, 0x83, 0x25, 0xf3, 0x17, 0xf8, 0x16, 0x05, + 0xe9, 0x42, 0xc1, 0x51, 0x20, 0x4a, 0xb3, 0x9e, 0x1d, 0x64, 0x3d, 0x0e, 0xa7, 0xaf, 0x91, 0xd6, + 0x07, 0x1d, 0xaa, 0xc7, 0x3e, 0x52, 0x05, 0x7d, 0xe4, 0x2a, 0x09, 0x0d, 0xaa, 0x8f, 0x5c, 0xd2, + 0x05, 0xfd, 0x36, 0x54, 0xf2, 0x55, 0x3b, 0x97, 0x3f, 0x4c, 0xd9, 0xba, 0x0d, 
0x91, 0x33, 0xe1, + 0x05, 0x3e, 0xd5, 0x6f, 0x43, 0x29, 0xfb, 0x04, 0x63, 0x5c, 0x2b, 0x71, 0x4f, 0x68, 0x02, 0xc8, + 0x2f, 0x90, 0x5f, 0xe1, 0x4e, 0x2a, 0x91, 0x08, 0x9b, 0x5b, 0xe1, 0x6e, 0xe4, 0x92, 0x3e, 0x9c, + 0xa2, 0xef, 0xf0, 0x5d, 0x28, 0x3f, 0x9f, 0xb2, 0xf5, 0x22, 0x50, 0xda, 0x56, 0x8f, 0xfe, 0xc0, + 0xde, 0x47, 0xf4, 0xd6, 0x8b, 0x80, 0x56, 0xf1, 0x08, 0x93, 0x06, 0x94, 0x9d, 0x60, 0x13, 0x72, + 0x8c, 0xd4, 0xe2, 0xe4, 0x55, 0xd9, 0x43, 0xca, 0xba, 0x84, 0xd2, 0xbe, 0x47, 0x02, 0x90, 0x1f, + 0x50, 0xbb, 0x77, 0x67, 0xd7, 0x32, 0xd2, 0x1e, 0xda, 0x13, 0xfb, 0xce, 0xae, 0x69, 0x56, 0x0c, + 0xc5, 0xc1, 0x12, 0x9d, 0x55, 0xb4, 0xdd, 0x90, 0x7f, 0xc1, 0x50, 0xbd, 0x68, 0xaa, 0x97, 0xf3, + 0x83, 0x5e, 0x5e, 0x43, 0x5a, 0xb2, 0x34, 0xf7, 0xc4, 0x72, 0x43, 0x55, 0xa8, 0xdc, 0xff, 0x68, + 0xbb, 0x51, 0xc3, 0x32, 0xa8, 0x34, 0xad, 0x3f, 0xa0, 0xb4, 0x0f, 0x4a, 0xaa, 0x0e, 0xba, 0x9d, + 0x41, 0xb2, 0x21, 0x0f, 0x0f, 0xd7, 0x2c, 0x5a, 0xfe, 0x77, 0x55, 0xd3, 0x2c, 0x07, 0x0a, 0x43, + 0x26, 0xd8, 0x18, 0x77, 0x07, 0x43, 0xd2, 0x0e, 0x87, 0x44, 0xc0, 0x70, 0x99, 0x60, 0xe9, 0x1d, + 0x29, 0x5b, 0x4a, 0xe5, 0xc5, 0xe9, 0xfd, 0xe8, 0x5e, 0x2c, 0xef, 0xc3, 0xe1, 0xc8, 0x04, 0xba, + 0xf2, 0x3e, 0xe4, 0x8c, 0xb3, 0xb4, 0x94, 0x32, 0x3d, 0x61, 0xf5, 0x21, 0x77, 0xc3, 0x84, 0xb3, + 0x24, 0xbf, 0x42, 0x3e, 0xe4, 0x38, 0xf7, 0x1e, 0xd3, 0x4b, 0x4d, 0x11, 0xb9, 0x80, 0x8a, 0xb7, + 0xf0, 0x03, 0x8e, 0xd3, 0xd9, 0x4e, 0x60, 0xa4, 0x6a, 0x95, 0x68, 0x39, 0xe1, 0xfa, 0x92, 0xfa, + 0xfb, 0x0c, 0xaa, 0xc7, 0x4a, 0xc8, 0x9d, 0x67, 0x18, 0xd5, 0x32, 0xfd, 0xff, 0x3f, 0x3f, 0xd7, + 0xb5, 0xa7, 0xe7, 0xba, 0xf6, 0xf5, 0xb9, 0xae, 0x7d, 0x7c, 0xa9, 0x67, 0x9e, 0x5e, 0xea, 0x99, + 0x2f, 0x2f, 0xf5, 0xcc, 0xfb, 0x8b, 0x85, 0x27, 0x96, 0xdb, 0x59, 0xcb, 0x09, 0x36, 0x6d, 0x77, + 0xc1, 0x59, 0xb8, 0xfc, 0xc7, 0x0b, 0xda, 0xc9, 0x3c, 0xdb, 0xf1, 0x55, 0x3b, 0x9c, 0xcd, 0xf2, + 0xea, 0x49, 0xe9, 0x7e, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x16, 0xf2, 0x9d, 0x4d, 0x65, 0x04, 0x00, + 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -630,6 +671,11 @@ func (m *KV) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Kind != 0 { + i = encodeVarintBadgerpb4(dAtA, i, uint64(m.Kind)) + i-- + dAtA[i] = 0x60 + } if m.StreamDone { i-- if m.StreamDone { @@ -980,6 +1026,9 @@ func (m *KV) Size() (n int) { if m.StreamDone { n += 2 } + if m.Kind != 0 { + n += 1 + sovBadgerpb4(uint64(m.Kind)) + } return n } @@ -1346,6 +1395,25 @@ func (m *KV) Unmarshal(dAtA []byte) error { } } m.StreamDone = bool(v != 0) + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Kind", wireType) + } + m.Kind = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBadgerpb4 + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Kind |= KV_Kind(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBadgerpb4(dAtA[iNdEx:]) diff --git a/pb/badgerpb4.proto b/pb/badgerpb4.proto index 079c1cfee..8e6702574 100644 --- a/pb/badgerpb4.proto +++ b/pb/badgerpb4.proto @@ -33,6 +33,13 @@ message KV { uint32 stream_id = 10; // Stream done is used to indicate end of stream. 
bool stream_done = 11; + + enum Kind { + KEY = 0; + DATA_KEY = 1; + FILE = 2; + } + Kind kind = 12; } message KVList { diff --git a/stream.go b/stream.go index 21c5e9926..4c17581b3 100644 --- a/stream.go +++ b/stream.go @@ -25,8 +25,10 @@ import ( "time" humanize "github.com/dustin/go-humanize" + "github.com/pkg/errors" "github.com/dgraph-io/badger/v4/pb" + "github.com/dgraph-io/badger/v4/table" "github.com/dgraph-io/badger/v4/y" "github.com/dgraph-io/ristretto/z" ) @@ -83,7 +85,9 @@ type Stream struct { Send func(buf *z.Buffer) error // Read data above the sinceTs. All keys with version =< sinceTs will be ignored. - SinceTs uint64 + SinceTs uint64 + // FullCopy should be set to true only when encryption mode is same for sender and receiver. + FullCopy bool readTs uint64 db *DB rangeCh chan keyRange @@ -108,9 +112,6 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() - if item.IsDeletedOrExpired() { - break - } if !bytes.Equal(key, item.Key()) { // Break out on the first encounter with another key. break @@ -128,6 +129,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { } kv.Version = item.Version() kv.ExpiresAt = item.ExpiresAt() + // As we do full copy, we need to transmit only if it is a delete key or not. + kv.Meta = []byte{item.meta & bitDelete} kv.UserMeta = a.Copy([]byte{item.UserMeta()}) list.Kv = append(list.Kv, kv) @@ -138,6 +141,12 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { if item.DiscardEarlierVersions() { break } + if item.IsDeletedOrExpired() { + // We do a FullCopy in stream. It might happen that tables from L6 contain K(version=1), + // while the table at L4 that was not copied contains K(version=2) with delete mark. + // Hence, we need to send the deleted or expired item too. + break + } } return list, nil } @@ -164,18 +173,10 @@ func (st *Stream) produceRanges(ctx context.Context) { } // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. -func (st *Stream) produceKVs(ctx context.Context, threadId int) error { +func (st *Stream) produceKVs(ctx context.Context, itr *Iterator) error { st.numProducers.Add(1) defer st.numProducers.Add(-1) - var txn *Txn - if st.readTs > 0 { - txn = st.db.NewTransactionAt(st.readTs, false) - } else { - txn = st.db.NewTransaction(false) - } - defer txn.Discard() - // produceKVs is running iterate serially. So, we can define the outList here. outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs") defer func() { @@ -185,15 +186,6 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { }() iterate := func(kr keyRange) error { - iterOpts := DefaultIteratorOptions - iterOpts.AllVersions = true - iterOpts.Prefix = st.Prefix - iterOpts.PrefetchValues = false - iterOpts.SinceTs = st.SinceTs - itr := txn.NewIterator(iterOpts) - itr.ThreadId = threadId - defer itr.Close() - itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate") defer itr.Alloc.Release() @@ -377,6 +369,77 @@ outer: return nil } +func (st *Stream) copyTablesOver(ctx context.Context, tableMatrix [][]*table.Table) error { + // TODO: See if making this concurrent would be helpful. Most likely it won't. + // But, if it does work, then most like <3 goroutines might be sufficient. + infof := st.db.opt.Infof + // Make a copy of the manifest so that we don't have race condition. 
+ manifest := st.db.manifest.manifest.clone() + dataKeys := make(map[uint64]struct{}) + // Iterate in reverse order so that the receiver gets the bottommost level first. + for i := len(tableMatrix) - 1; i >= 0; i-- { + level := i + tables := tableMatrix[i] + for _, t := range tables { + // This table can be picked for copying directly. + out := z.NewBuffer(int(t.Size())+1024, "Stream.Table") + if dk := t.DataKey(); dk != nil { + y.AssertTrue(dk.KeyId != 0) + // If we have a legit data key, send it over so the table can be decrypted. The same + // data key could have been used to encrypt many tables. Avoid sending it + // repeatedly. + if _, sent := dataKeys[dk.KeyId]; !sent { + infof("Sending data key with ID: %d\n", dk.KeyId) + val, err := dk.Marshal() + y.Check(err) + + // This would go to key registry in destination. + kv := &pb.KV{ + Value: val, + Kind: pb.KV_DATA_KEY, + } + KVToBuffer(kv, out) + dataKeys[dk.KeyId] = struct{}{} + } + } + + infof("Sending table ID: %d at level: %d. Size: %s\n", + t.ID(), level, humanize.IBytes(uint64(t.Size()))) + tableManifest := manifest.Tables[t.ID()] + change := pb.ManifestChange{ + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + KeyId: tableManifest.KeyID, + // Hard coding it, since we're supporting only AES for now. + EncryptionAlgo: pb.EncryptionAlgo_aes, + Compression: uint32(tableManifest.Compression), + } + + buf, err := change.Marshal() + y.Check(err) + + // We send the table along with level to the destination, so they'd know where to + // place the tables. We'd send all the tables first, before we start streaming. So, the + // destination DB would write streamed keys one level above. + kv := &pb.KV{ + // Key can be used for MANIFEST. + Key: buf, + Value: t.Data, + Kind: pb.KV_FILE, + } + KVToBuffer(kv, out) + + select { + case st.kvChan <- out: + case <-ctx.Done(): + _ = out.Release() + return ctx.Err() + } + } + } + return nil +} + // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also @@ -384,6 +447,11 @@ outer: // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { + if st.FullCopy { + if !st.db.opt.managedTxns || st.SinceTs != 0 || st.ChooseKey != nil && st.KeyToList != nil { + panic("Got invalid stream options when doing full copy") + } + } ctx, cancel := context.WithCancel(ctx) defer cancel() st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. @@ -397,7 +465,127 @@ func (st *Stream) Orchestrate(ctx context.Context) error { st.KeyToList = st.ToList } + // Pick up key-values from kvChan and send to stream. + kvErr := make(chan error, 1) + go func() { + // Picks up KV lists from kvChan, and sends them to Output. + err := st.streamKVs(ctx) + if err != nil { + cancel() // Stop all the go routines. + } + kvErr <- err + }() + + // Pick all relevant tables from levels. We'd use this to copy them over, + // or generate iterators from them. 
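copyTablesOver puts three kinds of KV on the wire, and the receiving StreamWriter (stream_writer.go below) keys its behavior off kv.Kind. A compressed view of that dispatch, assuming the usual decode loop over pb.KV values in Write:

    switch kv.Kind {
    case pb.KV_DATA_KEY:
        // kv.Value is a marshaled pb.DataKey; register it in the key registry,
        // remapping the sender's key ID to a locally issued one.
    case pb.KV_FILE:
        // kv.Key is a marshaled pb.ManifestChange and kv.Value is the raw
        // SSTable; both go to levelsController.AddTable at the mapped level.
    case pb.KV_KEY:
        // Ordinary key/value pair; handled by the existing sorted-writer path.
    }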
+ memTables, decr := st.db.getMemTables() + defer decr() + + opts := DefaultIteratorOptions + opts.Prefix = st.Prefix + opts.SinceTs = st.SinceTs + tableMatrix := st.db.lc.getTables(&opts) + defer func() { + for _, tables := range tableMatrix { + for _, t := range tables { + _ = t.DecrRef() + } + } + }() + y.AssertTrue(len(tableMatrix) == st.db.opt.MaxLevels) + + infof := st.db.opt.Infof + copyTables := func() error { + // Figure out which tables we can copy. Only choose from the last 2 levels. + // Say last level has data of size 100. Given a 10x level multiplier and + // assuming the tree is balanced, second last level would have 10, and the + // third last level would have 1. The third last level would only have 1% + // of the data of the last level. It's OK for us to stop there and just + // stream it, instead of trying to copy over those tables too. When we + // copy over tables to Level i, we can't stream any data to level i, i+1, + // and so on. The stream has to create tables at level i-1, so there can be + // overlap between the tables at i-1 and i. + + // Let's pick the tables which can be fully copied over from last level. + threshold := len(tableMatrix) - 2 + toCopy := make([][]*table.Table, len(tableMatrix)) + var numCopy, numStream int + for lev, tables := range tableMatrix { + // We stream only the data in the two bottommost levels. + if lev < threshold { + numStream += len(tables) + continue + } + var rem []*table.Table + cp := tables[:0] + for _, t := range tables { + // We can only copy over those tables that satisfy following conditions: + // - All the keys have version less than st.readTs + // - st.Prefix fully covers the table + if t.MaxVersion() > st.readTs || !t.CoveredByPrefix(st.Prefix) { + rem = append(rem, t) + continue + } + cp = append(cp, t) + } + toCopy[lev] = cp // Pick tables to copy. + tableMatrix[lev] = rem // Keep remaining for streaming. + numCopy += len(cp) + numStream += len(rem) + } + infof("Num tables to copy: %d. Num to stream: %d\n", numCopy, numStream) + + return st.copyTablesOver(ctx, toCopy) + } + + if st.FullCopy { + // As of now, we don't handle the non-zero SinceTs. + if err := copyTables(); err != nil { + return errors.Wrap(err, "while copying tables") + } + } + + var txn *Txn + if st.readTs > 0 { + txn = st.db.NewTransactionAt(st.readTs, false) + } else { + txn = st.db.NewTransaction(false) + } + defer txn.Discard() + + newIterator := func(threadId int) *Iterator { + var itrs []y.Iterator + for _, mt := range memTables { + itrs = append(itrs, mt.sl.NewUniIterator(false)) + } + if tables := tableMatrix[0]; len(tables) > 0 { + itrs = append(itrs, iteratorsReversed(tables, 0)...) + } + for _, tables := range tableMatrix[1:] { + if len(tables) == 0 { + continue + } + itrs = append(itrs, table.NewConcatIterator(tables, 0)) + } + + opt := DefaultIteratorOptions + opt.AllVersions = true + opt.Prefix = st.Prefix + opt.PrefetchValues = false + opt.SinceTs = st.SinceTs + + res := &Iterator{ + txn: txn, + iitr: table.NewMergeIterator(itrs, false), + opt: opt, + readTs: txn.readTs, + ThreadId: threadId, + } + return res + } + // Picks up ranges from Badger, and sends them to rangeCh. + // Just for simplicity, we'd consider all the tables for range production. go st.produceRanges(ctx) errCh := make(chan error, st.NumGo) // Stores error by consumeKeys. @@ -408,7 +596,9 @@ func (st *Stream) Orchestrate(ctx context.Context) error { go func(threadId int) { defer wg.Done() // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan. 
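The copy/stream split above is easy to pin down with numbers: with the 10x level multiplier assumed in the comment and a balanced tree, the two bottommost levels hold roughly (100 + 10) / 111 ≈ 99% of the data, so restricting the raw copy to those levels gives up very little. Per table, eligibility then reduces to the following check (a restatement of the loop above, using names from the patch):

    // A table can be shipped as-is only if the snapshot covers all of its
    // versions and the requested prefix covers all of its keys.
    canCopy := t.MaxVersion() <= st.readTs && t.CoveredByPrefix(st.Prefix)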
- if err := st.produceKVs(ctx, threadId); err != nil { + itr := newIterator(threadId) + defer itr.Close() + if err := st.produceKVs(ctx, itr); err != nil { select { case errCh <- err: default: @@ -417,16 +607,6 @@ func (st *Stream) Orchestrate(ctx context.Context) error { }(i) } - // Pick up key-values from kvChan and send to stream. - kvErr := make(chan error, 1) - go func() { - // Picks up KV lists from kvChan, and sends them to Output. - err := st.streamKVs(ctx) - if err != nil { - cancel() // Stop all the go routines. - } - kvErr <- err - }() wg.Wait() // Wait for produceKVs to be over. close(st.kvChan) // Now we can close kvChan. defer func() { diff --git a/stream_writer.go b/stream_writer.go index 9dc9e8fd0..406c9dffa 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -22,6 +22,7 @@ import ( "sync" humanize "github.com/dustin/go-humanize" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" "github.com/dgraph-io/badger/v4/pb" @@ -42,13 +43,18 @@ import ( // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new // DBs. type StreamWriter struct { - writeLock sync.Mutex - db *DB - done func() - throttle *y.Throttle - maxVersion uint64 - writers map[uint32]*sortedWriter - prevLevel int + writeLock sync.Mutex + db *DB + done func() + throttle *y.Throttle + maxVersion uint64 + writers map[uint32]*sortedWriter + prevLevel int + senderPrevLevel int + keyId map[uint64]*pb.DataKey // map stores reader's keyId to data key. + // Writer might receive tables first, and then receive keys. If true, that means we have + // started processing keys. + processingKeys bool } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -62,6 +68,7 @@ func (db *DB) NewStreamWriter() *StreamWriter { // concurrent streams being processed. throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), + keyId: make(map[uint64]*pb.DataKey), } } @@ -161,8 +168,64 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if _, ok := closedStreams[kv.StreamId]; ok { panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId)) } + switch kv.Kind { + case pb.KV_DATA_KEY: + sw.writeLock.Lock() + defer sw.writeLock.Unlock() + y.AssertTrue(len(sw.db.opt.EncryptionKey) > 0) + var dk pb.DataKey + if err := proto.Unmarshal(kv.Value, &dk); err != nil { + return errors.Wrapf(err, "unmarshal failed %s", kv.Value) + } + readerId := dk.KeyId + if _, ok := sw.keyId[readerId]; !ok { + // Insert the data key to the key registry if not already inserted. + id, err := sw.db.registry.AddKey(dk) + if err != nil { + return errors.Wrap(err, "failed to write data key") + } + dk.KeyId = id + sw.keyId[readerId] = &dk + } + return nil + case pb.KV_FILE: + sw.writeLock.Lock() + defer sw.writeLock.Unlock() + // All tables should be recieved before any of the keys. + if sw.processingKeys { + return errors.New("Received pb.KV_FILE after pb.KV_KEY") + } + var change pb.ManifestChange + if err := proto.Unmarshal(kv.Key, &change); err != nil { + return errors.Wrap(err, "unable to unmarshal manifest change") + } + level := int(change.Level) + if sw.senderPrevLevel == 0 { + // We received the first file, set the sender's and receiver's max levels. + sw.senderPrevLevel = level + sw.prevLevel = len(sw.db.lc.levels) - 1 + } + + // This is based on the assumption that the tables from the last + // level will be sent first and then the second last level tables. 
+ // As long as the kv.Version (which stores the level) is same as + // the prevLevel, we know we're processing a last level table. + // The last level for this DB can be 8 while the DB that's sending + // this could have the last level at 7. + if sw.senderPrevLevel != level { + // If the previous level and the current level is different, we + // must be processing a table from the next last level. + sw.senderPrevLevel = level + sw.prevLevel-- + } + dk := sw.keyId[change.KeyId] + return sw.db.lc.AddTable(&kv, sw.prevLevel, dk, &change) + case pb.KV_KEY: + // Pass. The following code will handle the keys. + } sw.writeLock.Lock() + sw.processingKeys = true if sw.maxVersion < kv.Version { sw.maxVersion = kv.Version } @@ -181,6 +244,7 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if len(kv.UserMeta) > 0 { userMeta = kv.UserMeta[0] } + e := &Entry{ Key: y.KeyWithTs(kv.Key, kv.Version), Value: y.Copy(kv.Value), diff --git a/table/table.go b/table/table.go index 0bbc91089..06c33347a 100644 --- a/table/table.go +++ b/table/table.go @@ -144,10 +144,13 @@ func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSi // KeyCount is the total number of keys in this table. func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount } -// OnDiskSize returns the total size of key-values stored in this table (including the -// disk space occupied on the value log). +// OnDiskSize returns the total size of key-values stored in this table +// (including the disk space occupied on the value log). func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize } +// DataKey returns the encryption key +func (t *Table) DataKey() *pb.DataKey { return t.opt.DataKey } + // CompressionType returns the compression algorithm used for block compression. func (t *Table) CompressionType() options.CompressionType { return t.opt.Compression @@ -256,7 +259,21 @@ func (b *block) verifyCheckSum() error { func CreateTable(fname string, builder *Builder) (*Table, error) { bd := builder.Done() - mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size) + mf, err := newFile(fname, bd.Size) + if err != nil { + return nil, err + } + + written := bd.Copy(mf.Data) + y.AssertTrue(written == len(mf.Data)) + if err := z.Msync(mf.Data); err != nil { + return nil, y.Wrapf(err, "while calling msync on %s", fname) + } + return OpenTable(mf, *builder.opts) +} + +func newFile(fname string, sz int) (*z.MmapFile, error) { + mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, sz) if err == z.NewFile { // Expected. } else if err != nil { @@ -264,13 +281,22 @@ func CreateTable(fname string, builder *Builder) (*Table, error) { } else { return nil, errors.Errorf("file already exists: %s", fname) } + return mf, nil +} - written := bd.Copy(mf.Data) +func CreateTableFromBuffer(fname string, buf []byte, opts Options) (*Table, error) { + mf, err := newFile(fname, len(buf)) + if err != nil { + return nil, err + } + + // We cannot use the buf directly here because it is not mmapped. + written := copy(mf.Data, buf) y.AssertTrue(written == len(mf.Data)) if err := z.Msync(mf.Data); err != nil { return nil, y.Wrapf(err, "while calling msync on %s", fname) } - return OpenTable(mf, *builder.opts) + return OpenTable(mf, opts) } // OpenTable assumes file has only one table and opens it. 
Takes ownership of fd upon function @@ -693,6 +719,12 @@ func (t *Table) DoesNotHave(hash uint32) bool { return !mayContain } +// CoveredByPrefix returns true if all the keys in the table are prefixed by the given prefix. +func (t *Table) CoveredByPrefix(prefix []byte) bool { + return bytes.HasPrefix(y.ParseKey(t.Biggest()), prefix) && + bytes.HasPrefix(y.ParseKey(t.Smallest()), prefix) +} + // readTableIndex reads table index from the sst and returns its pb format. func (t *Table) readTableIndex() (*fb.TableIndex, error) { data := t.readNoFail(t.indexStart, t.indexLen) diff --git a/test.sh b/test.sh index 53b5477aa..e6ade0ed9 100755 --- a/test.sh +++ b/test.sh @@ -99,7 +99,6 @@ write_coverage() { cat cover_tmp.out >> cover.out && rm cover_tmp.out fi fi - } # parallel tests currently not working
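One subtlety worth a worked example is the level mapping in stream_writer.go above, since sender and receiver may be configured with different MaxLevels. Assume, for illustration, a sender whose deepest level is 6 and a receiver opened with one more level, so its deepest level is 7. The first pb.KV_FILE arrives from sender level 6: senderPrevLevel is latched to 6 and prevLevel to 7, so that table (and every further level-6 table) lands at receiver level 7. When the first level-5 table arrives, senderPrevLevel changes, prevLevel drops to 6, and the level-5 tables land there. Because the streamed keys that follow are written one level above the copied tables, every pb.KV_FILE must arrive before the first pb.KV_KEY, which is exactly what the processingKeys guard enforces.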