Skip to content

Commit

Permalink
ptree: index copying (#23)
Browse files Browse the repository at this point in the history
Adds index copying to `ptree.Copy` function.  Copying indexes makes `Copy` `O(log(n))` where `n` is the number of nodes in the tree.
  • Loading branch information
brendoncarroll authored Dec 28, 2021
1 parent 2d2ed69 commit ee2974c
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 153 deletions.
11 changes: 11 additions & 0 deletions pkg/gotkv/kvstreams/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func ForEach(ctx context.Context, it Iterator, fn func(ent Entry) error) error {
return nil
}

func Collect(ctx context.Context, it Iterator) ([]Entry, error) {
var ents []Entry
if err := ForEach(ctx, it, func(ent Entry) error {
ents = append(ents, ent.Clone())
return nil
}); err != nil {
return nil, err
}
return ents, nil
}

// A span of keys [Start, End)
// If you want to include a specific end key, use the KeyAfter function.
// nil is interpretted as no bound, not as a 0 length key. This behaviour is only releveant for End.
Expand Down
44 changes: 25 additions & 19 deletions pkg/gotkv/ptree/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,43 @@ func NewBuilder(s cadata.Store, op *gdat.Operator, avgSize, maxSize int, seed *[

func (b *Builder) makeWriter(i int) *StreamWriter {
return NewStreamWriter(b.s, b.op, b.avgSize, b.maxSize, b.seed, func(idx Index) error {
switch {
case b.isDone && i == len(b.levels)-1:
if b.isDone && i == len(b.levels)-1 {
b.root = &Root{
Ref: idx.Ref,
First: append([]byte{}, idx.First...),
Depth: uint8(i),
}
return nil
case i == len(b.levels)-1:
b.levels = append(b.levels, b.makeWriter(i+1))
fallthrough
default:
return b.levels[i+1].Append(b.ctx, Entry{
Key: idx.First,
Value: gdat.MarshalRef(idx.Ref),
})
}
return b.getWriter(i+1).Append(b.ctx, Entry{
Key: idx.First,
Value: gdat.MarshalRef(idx.Ref),
})
})
}

func (b *Builder) getWriter(level int) *StreamWriter {
for len(b.levels) <= level {
i := len(b.levels)
b.levels = append(b.levels, b.makeWriter(i))
}
return b.levels[level]
}

func (b *Builder) Put(ctx context.Context, key, value []byte) error {
return b.put(ctx, 0, key, value)
}

func (b *Builder) put(ctx context.Context, level int, key, value []byte) error {
b.ctx = ctx
defer func() { b.ctx = nil }()

if b.isDone {
return errors.Errorf("builder is closed")
}
err := b.levels[0].Append(ctx, Entry{
if b.syncLevel() < level {
return errors.Errorf("cannot put at level %d", level)
}
err := b.getWriter(level).Append(ctx, Entry{
Key: key,
Value: value,
})
Expand Down Expand Up @@ -98,14 +107,11 @@ func (b *Builder) Finish(ctx context.Context) (*Root, error) {
return b.root, nil
}

func (b *Builder) SyncedBelow(depth int) bool {
if len(b.levels) <= depth {
return false
}
for i := range b.levels[:depth] {
func (b *Builder) syncLevel() int {
for i := range b.levels {
if b.levels[i].Buffered() > 0 {
return false
return i
}
}
return true
return maxTreeDepth - 1 // allow copying at any depth
}
217 changes: 125 additions & 92 deletions pkg/gotkv/ptree/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,145 +15,178 @@ type Iterator struct {
op *gdat.Operator
root Root
span Span
srs []*StreamReader
pos []byte

levels [][]Entry
pos []byte
}

func NewIterator(s cadata.Store, op *gdat.Operator, root Root, span Span) *Iterator {
it := &Iterator{
s: s,
op: op,
root: root,
span: span.Clone(),
srs: make([]*StreamReader, root.Depth+1),
s: s,
op: op,
root: root,
span: span.Clone(),
levels: make([][]Entry, root.Depth+2),
}
it.levels[root.Depth+1] = []Entry{indexToEntry(rootToIndex(root))}
it.setPos(span.Start)
return it
}

func (it *Iterator) Next(ctx context.Context, ent *Entry) error {
if err := it.initRoot(ctx); err != nil {
return err
}
if err := it.withReader(ctx, 0, func(sr *StreamReader) error {
return sr.Next(ctx, ent)
}); err != nil {
return err
}
it.setPosAfter(ent.Key)
return it.checkAfterSpan(ent)
return it.next(ctx, 0, ent)
}

func (it *Iterator) Peek(ctx context.Context, ent *Entry) error {
if err := it.initRoot(ctx); err != nil {
return err
return it.peek(ctx, 0, ent)
}

func (it *Iterator) Seek(ctx context.Context, gteq []byte) error {
it.levels[it.root.Depth+1] = []Entry{indexToEntry(rootToIndex(it.root))}
it.setPos(gteq)
for i := len(it.levels) - 1; i >= 0; i-- {
if i == 0 {
it.levels[i] = filterEntries(it.levels[i], it.getSpan())
} else {
it.levels[i] = filterIndexes(it.levels[i], it.getSpan())
}
}
return nil
}

func (it *Iterator) next(ctx context.Context, level int, ent *Entry) error {
if it.syncLevel() < level {
return errors.Errorf("cannot read from level %d, only synced to %d", level, it.syncLevel())
}
if err := it.withReader(ctx, 0, func(sr *StreamReader) error {
return sr.Peek(ctx, ent)
}); err != nil {
entries, err := it.getEntries(ctx, level)
if err != nil {
return err
}
return it.checkAfterSpan(ent)
ent2 := entries[0]
ent.Key = append(ent.Key[:0], ent2.Key...)
ent.Value = append(ent.Value[:0], ent2.Value...)
it.advanceLevel(level, true)
return nil
}

func (it *Iterator) Seek(ctx context.Context, gteq []byte) error {
it.setPos(gteq)
for i := range it.srs {
it.srs[i] = nil
func (it *Iterator) peek(ctx context.Context, level int, ent *Entry) error {
if it.syncLevel() < level {
return errors.Errorf("cannot read from level %d, only synced to %d", level, it.syncLevel())
}
return it.initRoot(ctx)
entries, err := it.getEntries(ctx, level)
if err != nil {
return err
}
ent2 := entries[0]
ent.Key = append(ent.Key[:0], ent2.Key...)
ent.Value = append(ent.Value[:0], ent2.Value...)
return nil
}

func (it *Iterator) withReader(ctx context.Context, i int, fn func(sr *StreamReader) error) error {
func (it *Iterator) getEntries(ctx context.Context, level int) ([]Entry, error) {
if level >= len(it.levels) {
return nil, kvstreams.EOS
}
if len(it.levels[level]) > 0 {
return it.levels[level], nil
}
for {
sr, err := it.getReader(ctx, i)
above, err := it.getEntries(ctx, level+1)
if err != nil {
return err
return nil, err
}
if err := fn(sr); err != nil {
if err == kvstreams.EOS {
it.srs[i] = nil
continue
}
return err
idx, err := entryToIndex(above[0])
if err != nil {
return nil, errors.Wrapf(err, "converting entry to index at level %d", level)
}
it.advanceLevel(level+1, false)
ents, err := ListEntries(ctx, it.s, it.op, idx)
if err != nil {
return nil, err
}
if level == 0 {
ents = filterEntries(ents, it.getSpan())
} else {
return nil
ents = filterIndexes(ents, it.getSpan())
}
if len(ents) > 0 {
it.levels[level] = ents
return it.levels[level], nil
}
}
}

func (it *Iterator) getReader(ctx context.Context, i int) (*StreamReader, error) {
if i >= len(it.srs) {
return nil, kvstreams.EOS
}
if it.srs[i] != nil {
return it.srs[i], nil
}
if err := it.withReader(ctx, i+1, func(srAbove *StreamReader) error {
idxs, err := readIndexes(ctx, srAbove)
if err != nil {
return err
func (it *Iterator) syncLevel() int {
// bot is the index below which all levels are synced
var bot int
for i := range it.levels {
bot = i
if len(it.levels[i]) > 0 {
break
}
it.srs[i+1] = nil
it.srs[i] = NewStreamReader(it.s, it.op, idxs)
if i == 0 {
return it.srs[i].Seek(ctx, it.pos)
} else {
return it.srs[i].SeekIndexes(ctx, it.pos)
}
// top is maximum index where the level has more than 1 entry
// top is required because indexes at the right most side of the tree cannot be copied
// since they could point to incomplete nodes.
// the iterator's span causes us to consider some otherwise complete nodes incomplete.
var top int
for i := len(it.levels) - 1; i >= 0; i-- {
top = i
if len(it.levels[i]) > 1 && (it.span.End == nil || bytes.Compare(it.levels[i][1].Key, it.span.End) < 0) {
break
}
}); err != nil {
return nil, err
}
return it.srs[i], nil
return min(bot, top)
}

func (it *Iterator) checkAfterSpan(ent *Entry) error {
if it.span.LessThan(ent.Key) {
return kvstreams.EOS
func (it *Iterator) advanceLevel(level int, updatePos bool) {
entries := it.levels[level]
it.levels[level] = entries[1:]
if !updatePos {
return
}
return nil
for i := level; i < len(it.levels); i++ {
entries := it.levels[i]
if len(entries) > 0 {
it.setPos(entries[0].Key)
return
}
}
it.pos = nil // end of the stream
}

func (it *Iterator) setPos(x []byte) {
it.pos = append(it.pos[:0], x...)
}

func (it *Iterator) setPosAfter(x []byte) {
it.setPos(x)
it.pos = append(it.pos, 0x00)
func (it *Iterator) getSpan() Span {
return Span{
Start: it.pos,
End: it.span.End,
}
}

func (it *Iterator) initRoot(ctx context.Context) error {
i := len(it.srs) - 1
if it.srs[i] != nil {
return nil
}
it.srs[i] = NewStreamReader(it.s, it.op, []Index{rootToIndex(it.root)})
if i == 0 {
return it.srs[i].Seek(ctx, it.pos)
} else {
return it.srs[i].SeekIndexes(ctx, it.pos)
func filterEntries(xs []Entry, span Span) []Entry {
ret := xs[:0]
for i := range xs {
if span.Contains(xs[i].Key) {
ret = append(ret, xs[i])
}
}
return ret
}

func readIndexes(ctx context.Context, it kvstreams.Iterator) ([]Index, error) {
var idxs []Index
if err := kvstreams.ForEach(ctx, it, func(ent Entry) error {
idx, err := entryToIndex(ent)
if err != nil {
return err
// filterIndexes removes indexes that could not point to items in span.
func filterIndexes(xs []Entry, span Span) []Entry {
ret := xs[:0]
for i := range xs {
if span.LessThan(xs[i].Key) {
continue
}
if len(idxs) > 0 {
prev := idxs[len(idxs)-1].First
next := idx.First
if bytes.Compare(prev, next) >= 0 {
return errors.Errorf("ptree: indexes out of order %q >= %q", prev, next)
}
if i+1 < len(xs) && bytes.Compare(span.Start, xs[i+1].Key) >= 0 {
continue
}
idxs = append(idxs, idx)
return nil
}); err != nil {
return nil, err
ret = append(ret, xs[i])
}
return idxs, nil
return ret
}
Loading

0 comments on commit ee2974c

Please sign in to comment.