From 9b5ee6ebdd9a004e12f167ed37b89035897e06dc Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Mon, 17 Jul 2023 16:23:49 +0800 Subject: [PATCH 1/4] Add stacktrace rewriter --- pkg/phlaredb/compact.go | 423 ++++++++++++++++++++++------ pkg/phlaredb/schemas/v1/profiles.go | 18 +- 2 files changed, 350 insertions(+), 91 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 7d1f57370..077d1a184 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -27,7 +27,37 @@ type BlockReader interface { Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - // todo symbdb + Symbols() SymbolsResolver +} + +// TODO(kolesnikovae): Refactor to symdb. + +// ProfileSymbols represents symbolic information associated with a profile. +type ProfileSymbols struct { + StacktracePartition uint64 + StacktraceIDs []uint32 + + Stacktraces []*schemav1.Stacktrace + Locations []*schemav1.InMemoryLocation + Mappings []*schemav1.InMemoryMapping + Functions []*schemav1.InMemoryFunction + Strings []string +} + +type SymbolsResolver interface { + Stacktraces(iter.Iterator[uint32]) iter.Iterator[*schemav1.Stacktrace] + Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] + Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] + Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] + Strings(iter.Iterator[uint32]) iter.Iterator[string] +} + +type SymbolsAppender interface { + AppendStacktrace(*schemav1.Stacktrace) uint32 + AppendLocation(*schemav1.InMemoryLocation) uint32 + AppendMapping(*schemav1.InMemoryMapping) uint32 + AppendFunction(*schemav1.InMemoryFunction) uint32 + AppendString(string) uint32 } func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { @@ -50,16 +80,18 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er return block.Meta{}, err } profileWriter := newProfileWriter(profileFile) - - // todo new symbdb + symw, err := newSymbolsWriter(dst) + if err != nil { + return block.Meta{}, err + } rowsIt, err := newMergeRowProfileIterator(src) if err != nil { return block.Meta{}, err } seriesRewriter := newSeriesRewriter(rowsIt, indexw) - symbolsRewriter := newSymbolsRewriter(seriesRewriter) - reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter)) + symRewriter := newSymbolsRewriter(seriesRewriter, src, symw) + reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symRewriter)) total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) if err != nil { @@ -77,7 +109,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er // todo: block meta files. meta.Stats.NumProfiles = total meta.Stats.NumSeries = seriesRewriter.NumSeries() - meta.Stats.NumSamples = symbolsRewriter.NumSamples() + meta.Stats.NumSamples = symRewriter.NumSamples() if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { return block.Meta{}, err @@ -138,10 +170,13 @@ type profileRow struct { labels phlaremodel.Labels fp model.Fingerprint row schemav1.ProfileRow + + blockReader BlockReader } type profileRowIterator struct { profiles iter.Iterator[parquet.Row] + blockReader BlockReader index IndexReader allPostings index.Postings err error @@ -150,15 +185,16 @@ type profileRowIterator struct { chunks []index.ChunkMeta } -func newProfileRowIterator(reader parquet.RowReader, idx IndexReader) (*profileRowIterator, error) { +func newProfileRowIterator(reader parquet.RowReader, s BlockReader) (*profileRowIterator, error) { k, v := index.AllPostingsKey() - allPostings, err := idx.Postings(k, nil, v) + allPostings, err := s.Index().Postings(k, nil, v) if err != nil { return nil, err } return &profileRowIterator{ profiles: phlareparquet.NewBufferedRowReaderIterator(reader, 1024), - index: idx, + blockReader: s, + index: s.Index(), allPostings: allPostings, currentRow: profileRow{ seriesRef: math.MaxUint32, @@ -175,6 +211,7 @@ func (p *profileRowIterator) Next() bool { if !p.profiles.Next() { return false } + p.currentRow.blockReader = p.blockReader p.currentRow.row = schemav1.ProfileRow(p.profiles.At()) seriesIndex := p.currentRow.row.SeriesIndex() p.currentRow.timeNanos = p.currentRow.row.TimeNanos() @@ -217,10 +254,7 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e for i, s := range src { // todo: may be we could merge rowgroups in parallel but that requires locking. reader := parquet.MultiRowGroup(s.Profiles()...).Rows() - it, err := newProfileRowIterator( - reader, - s.Index(), - ) + it, err := newProfileRowIterator(reader, s) if err != nil { return nil, err } @@ -252,80 +286,6 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e }, nil } -type noopStacktraceRewriter struct{} - -func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error { - copy(dst, src) - return nil -} - -type StacktraceRewriter interface { - RewriteStacktraces(src, dst []uint32) error -} - -type symbolsRewriter struct { - iter.Iterator[profileRow] - err error - - rewriter StacktraceRewriter - src, dst []uint32 - numSamples uint64 -} - -// todo remap symbols & ingest symbols -func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter { - return &symbolsRewriter{ - Iterator: it, - rewriter: noopStacktraceRewriter{}, - } -} - -func (s *symbolsRewriter) NumSamples() uint64 { - return s.numSamples -} - -func (s *symbolsRewriter) Next() bool { - if !s.Iterator.Next() { - return false - } - var err error - s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) { - s.numSamples += uint64(len(values)) - s.loadStacktracesID(values) - err = s.rewriter.RewriteStacktraces(s.src, s.dst) - if err != nil { - return - } - for i, v := range values { - values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) - } - }) - if err != nil { - s.err = err - return false - } - return true -} - -func (s *symbolsRewriter) Err() error { - if s.err != nil { - return s.err - } - return s.Iterator.Err() -} - -func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { - if cap(s.src) < len(values) { - s.src = make([]uint32, len(values)*2) - s.dst = make([]uint32, len(values)*2) - } - s.src = s.src[:len(values)] - s.dst = s.dst[:len(values)] - for i := range values { - s.src[i] = values[i].Uint32() - } -} - type seriesRewriter struct { iter.Iterator[profileRow] @@ -443,3 +403,292 @@ func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) return indexw, nil } + +type symbolsRewriter struct { + profiles iter.Iterator[profileRow] + stacktraces, dst []uint32 + err error + + rewriters map[BlockReader]*stacktraceRewriter + + numSamples uint64 +} + +func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, a SymbolsAppender) *symbolsRewriter { + sr := symbolsRewriter{ + profiles: it, + rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), + } + for _, b := range blocks { + sr.rewriters[b] = newStacktraceRewriter() + } + return &sr +} + +func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples } + +func (s *symbolsRewriter) At() profileRow { return s.profiles.At() } + +func (s *symbolsRewriter) Close() error { return s.profiles.Close() } + +func (s *symbolsRewriter) Err() error { + if s.err != nil { + return s.err + } + return s.profiles.Err() +} + +func (s *symbolsRewriter) Next() bool { + if !s.profiles.Next() { + return false + } + var err error + profile := s.profiles.At() + profile.row.ForStacktraceIDsValues(func(values []parquet.Value) { + s.loadStacktracesID(values) + r := s.rewriters[profile.blockReader] + if err = r.rewriteStacktraces(profile.row.StacktracePartitionID(), s.stacktraces); err != nil { + return + } + s.numSamples += uint64(len(values)) + for i, v := range values { + values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + } + }) + if err != nil { + s.err = err + return false + } + return true +} + +func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { + if cap(s.stacktraces) < len(values) { + s.stacktraces = make([]uint32, len(values)*2) + s.dst = make([]uint32, len(values)*2) + } + s.stacktraces = s.stacktraces[:len(values)] + s.dst = s.dst[:len(values)] + for i := range values { + s.stacktraces[i] = values[i].Uint32() + } +} + +type stacktraceRewriter struct { + partition uint64 + stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + + locations *lookupTable[*schemav1.InMemoryLocation] + mappings *lookupTable[*schemav1.InMemoryMapping] + functions *lookupTable[*schemav1.InMemoryFunction] + strings *lookupTable[string] +} + +func newStacktraceRewriter() *stacktraceRewriter { + // TODO(kolesnikovae): + return new(stacktraceRewriter) +} + +const ( + marker = 1 << 31 + markedMask = math.MaxUint32 >> 1 +) + +type lookupTable[T any] struct { + // Index is source ID, and the value is the destination ID. + // If destination ID is not known, the element is index to 'unresolved' (marked). + resolved []uint32 + // Source IDs. + unresolved []uint32 + values []T +} + +func newLookupTable[T any](size int) *lookupTable[T] { + var t lookupTable[T] + t.init(size) + return &t +} + +func (t *lookupTable[T]) init(size int) { + if cap(t.resolved) < size { + t.resolved = make([]uint32, size) + return + } + t.resolved = t.resolved[:size] + for i := range t.resolved { + t.resolved[i] = 0 + } +} + +func (t *lookupTable[T]) reset() { t.unresolved = t.unresolved[:0] } + +func (t *lookupTable[T]) tryLookup(x uint32) uint32 { + if v := t.resolved[x]; v != 0 { + return v - 1 + } + v := uint32(len(t.unresolved)) | marker + t.unresolved = append(t.unresolved, x) + return v +} + +func (t *lookupTable[T]) storeResolved(i, v uint32) { t.resolved[i] = v + 1 } + +func (t *lookupTable[T]) lookupUnresolved(x uint32) uint32 { + if x&marker == 0 { + // Already resolved. + return x + } + return t.unresolved[x&markedMask] +} + +func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + t.values = make([]T, len(t.resolved)) + return &lookupTableIterator[T]{ + values: t.values, + } +} + +// TODO(kolesnikovae): +type lookupTableIterator[T any] struct { + cur uint32 + values []T +} + +func (t *lookupTableIterator[T]) set(v T) { t.values[t.cur] = v } + +func (r *stacktraceRewriter) symbolsResolver() SymbolsResolver { + // TODO(kolesnikovae): + return nil +} + +func (r *stacktraceRewriter) symbolsAppender() SymbolsAppender { + // TODO(kolesnikovae): + return nil +} + +func (r *stacktraceRewriter) reset(partition uint64) { + r.partition = partition + r.stacktraces[partition].reset() + r.locations.reset() + r.mappings.reset() + r.functions.reset() + r.strings.reset() +} + +func (r *stacktraceRewriter) hasUnresolved() bool { + return len(r.stacktraces[r.partition].unresolved)+ + len(r.locations.unresolved)+ + len(r.mappings.unresolved)+ + len(r.functions.unresolved)+ + len(r.strings.unresolved) > 0 +} + +func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) error { + r.reset(partition) + r.populateUnresolved(stacktraces) + if r.hasUnresolved() { + r.append(stacktraces) + } + return nil +} + +func (r *stacktraceRewriter) populateUnresolved(stacktraces []uint32) { + // Filter out all stack traces that have been already resolved. + src := r.stacktraces[r.partition] + for i, v := range stacktraces { + stacktraces[i] = src.tryLookup(v) + } + if len(src.unresolved) == 0 { + return + } + + // Resolve locations for new stack traces. + var stacktrace *schemav1.Stacktrace + unresolvedStacktraces := src.iter() + p := r.symbolsResolver() + for i := p.Stacktraces(unresolvedStacktraces); i.Next(); stacktrace = i.At() { + for i, loc := range stacktrace.LocationIDs { + stacktrace.LocationIDs[i] = uint64(r.locations.tryLookup(uint32(loc))) + } + unresolvedStacktraces.set(stacktrace) + } + + // Resolve functions and mappings for new locations. + var location *schemav1.InMemoryLocation + unresolvedLocs := r.locations.iter() + for i := p.Locations(unresolvedLocs); i.Next(); location = i.At() { + location.MappingId = r.mappings.tryLookup(location.MappingId) + for j, line := range location.Line { + location.Line[j].FunctionId = r.functions.tryLookup(line.FunctionId) + } + unresolvedLocs.set(location) + } + + // Resolve strings. + var mapping *schemav1.InMemoryMapping + unresolvedMappings := r.mappings.iter() + for i := p.Mappings(unresolvedMappings); i.Next(); mapping = i.At() { + mapping.BuildId = r.strings.tryLookup(mapping.BuildId) + mapping.Filename = r.strings.tryLookup(mapping.Filename) + unresolvedMappings.set(mapping) + } + var function *schemav1.InMemoryFunction + unresolvedFunctions := r.functions.iter() + for i := p.Functions(unresolvedFunctions); i.Next(); function = i.At() { + function.Name = r.strings.tryLookup(function.Name) + function.Filename = r.strings.tryLookup(function.Filename) + function.SystemName = r.strings.tryLookup(function.SystemName) + unresolvedFunctions.set(function) + } + var str string + unresolvedStrings := r.strings.iter() + for i := p.Strings(unresolvedStrings); i.Next(); str = i.At() { + unresolvedStrings.set(str) + } +} + +func (r *stacktraceRewriter) append(stacktraces []uint32) { + a := r.symbolsAppender() + for _, str := range r.strings.values { + r.functions.storeResolved(0, a.AppendString(str)) + } + + for _, function := range r.functions.values { + function.Name = r.strings.lookupUnresolved(function.Name) + function.Filename = r.strings.lookupUnresolved(function.Filename) + function.SystemName = r.strings.lookupUnresolved(function.SystemName) + r.functions.storeResolved(0, a.AppendFunction(function)) + } + + for _, mapping := range r.mappings.values { + mapping.BuildId = r.strings.lookupUnresolved(mapping.BuildId) + mapping.Filename = r.strings.lookupUnresolved(mapping.Filename) + r.mappings.storeResolved(0, a.AppendMapping(mapping)) + } + + for _, location := range r.locations.values { + location.MappingId = r.mappings.lookupUnresolved(location.MappingId) + for j, line := range location.Line { + location.Line[j].FunctionId = r.functions.lookupUnresolved(line.FunctionId) + } + r.locations.storeResolved(0, a.AppendLocation(location)) + } + + src := r.stacktraces[r.partition] + for _, stacktrace := range src.values { + for j, v := range stacktrace.LocationIDs { + stacktrace.LocationIDs[j] = uint64(r.locations.lookupUnresolved(uint32(v))) + } + src.storeResolved(0, a.AppendStacktrace(stacktrace)) + } + for i, v := range stacktraces { + stacktraces[i] = src.lookupUnresolved(v) + } +} + +type symbolsWriter struct { + // TODO(kolesnikovae): + SymbolsAppender +} + +func newSymbolsWriter(dst string) (*symbolsWriter, error) { return &symbolsWriter{}, nil } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index c3806ced9..a39381d58 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -42,10 +42,11 @@ var ( phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))), }) - maxProfileRow parquet.Row - seriesIndexColIndex int - stacktraceIDColIndex int - timeNanoColIndex int + maxProfileRow parquet.Row + seriesIndexColIndex int + stacktraceIDColIndex int + timeNanoColIndex int + stacktracePartitionColIndex int ) func init() { @@ -68,6 +69,11 @@ func init() { panic(fmt.Errorf("StacktraceID column not found")) } stacktraceIDColIndex = stacktraceIDCol.ColumnIndex + stacktracePartitionCol, ok := profilesSchema.Lookup("StacktracePartition") + if !ok { + panic(fmt.Errorf("StacktracePartition column not found")) + } + stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex } type Sample struct { @@ -471,6 +477,10 @@ func (p ProfileRow) SeriesIndex() uint32 { return p[seriesIndexColIndex].Uint32() } +func (p ProfileRow) StacktracePartitionID() uint64 { + return p[stacktracePartitionColIndex].Uint64() +} + func (p ProfileRow) TimeNanos() int64 { var ts int64 for i := len(p) - 1; i >= 0; i-- { From fa98adeb232f25d8cd0f620eb247b452d33ca0df Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Mon, 17 Jul 2023 20:28:55 +0800 Subject: [PATCH 2/4] Fixes --- pkg/phlaredb/compact.go | 408 ++++++++++++++++++++++++++-------------- 1 file changed, 262 insertions(+), 146 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 077d1a184..8d2513005 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -5,6 +5,7 @@ import ( "math" "os" "path/filepath" + "sort" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -27,21 +28,13 @@ type BlockReader interface { Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - Symbols() SymbolsResolver + SymbolsReader } // TODO(kolesnikovae): Refactor to symdb. -// ProfileSymbols represents symbolic information associated with a profile. -type ProfileSymbols struct { - StacktracePartition uint64 - StacktraceIDs []uint32 - - Stacktraces []*schemav1.Stacktrace - Locations []*schemav1.InMemoryLocation - Mappings []*schemav1.InMemoryMapping - Functions []*schemav1.InMemoryFunction - Strings []string +type SymbolsReader interface { + SymbolsResolver(partition uint64) (SymbolsResolver, error) } type SymbolsResolver interface { @@ -50,6 +43,19 @@ type SymbolsResolver interface { Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] Strings(iter.Iterator[uint32]) iter.Iterator[string] + WriteStats(*SymbolStats) +} + +type SymbolStats struct { + StacktracesTotal int + LocationsTotal int + MappingsTotal int + FunctionsTotal int + StringsTotal int +} + +type SymbolsWriter interface { + SymbolsAppender(partition uint64) (SymbolsAppender, error) } type SymbolsAppender interface { @@ -58,6 +64,7 @@ type SymbolsAppender interface { AppendMapping(*schemav1.InMemoryMapping) uint32 AppendFunction(*schemav1.InMemoryFunction) uint32 AppendString(string) uint32 + Flush() error } func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { @@ -405,22 +412,21 @@ func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) } type symbolsRewriter struct { - profiles iter.Iterator[profileRow] - stacktraces, dst []uint32 - err error - - rewriters map[BlockReader]*stacktraceRewriter + profiles iter.Iterator[profileRow] + rewriters map[BlockReader]*stacktraceRewriter + stacktraces []uint32 + err error numSamples uint64 } -func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, a SymbolsAppender) *symbolsRewriter { +func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w SymbolsWriter) *symbolsRewriter { sr := symbolsRewriter{ profiles: it, rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), } - for _, b := range blocks { - sr.rewriters[b] = newStacktraceRewriter() + for _, r := range blocks { + sr.rewriters[r] = newStacktraceRewriter(r, w) } return &sr } @@ -452,7 +458,7 @@ func (s *symbolsRewriter) Next() bool { } s.numSamples += uint64(len(values)) for i, v := range values { - values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) } }) if err != nil { @@ -465,114 +471,71 @@ func (s *symbolsRewriter) Next() bool { func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { if cap(s.stacktraces) < len(values) { s.stacktraces = make([]uint32, len(values)*2) - s.dst = make([]uint32, len(values)*2) } s.stacktraces = s.stacktraces[:len(values)] - s.dst = s.dst[:len(values)] for i := range values { s.stacktraces[i] = values[i].Uint32() } } type stacktraceRewriter struct { - partition uint64 - stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + reader SymbolsReader + writer SymbolsWriter + // Stack trace identifiers are only valid within the partition. + stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + // Objects below have global addressing. locations *lookupTable[*schemav1.InMemoryLocation] mappings *lookupTable[*schemav1.InMemoryMapping] functions *lookupTable[*schemav1.InMemoryFunction] strings *lookupTable[string] -} - -func newStacktraceRewriter() *stacktraceRewriter { - // TODO(kolesnikovae): - return new(stacktraceRewriter) -} - -const ( - marker = 1 << 31 - markedMask = math.MaxUint32 >> 1 -) -type lookupTable[T any] struct { - // Index is source ID, and the value is the destination ID. - // If destination ID is not known, the element is index to 'unresolved' (marked). - resolved []uint32 - // Source IDs. - unresolved []uint32 - values []T + partition uint64 + resolver SymbolsResolver + appender SymbolsAppender + stats SymbolStats } -func newLookupTable[T any](size int) *lookupTable[T] { - var t lookupTable[T] - t.init(size) - return &t +func newStacktraceRewriter(r SymbolsReader, w SymbolsWriter) *stacktraceRewriter { + return &stacktraceRewriter{ + reader: r, + writer: w, + } } -func (t *lookupTable[T]) init(size int) { - if cap(t.resolved) < size { - t.resolved = make([]uint32, size) - return +func (r *stacktraceRewriter) init(partition uint64) (err error) { + r.partition = partition + if r.appender, err = r.writer.SymbolsAppender(partition); err != nil { + return err } - t.resolved = t.resolved[:size] - for i := range t.resolved { - t.resolved[i] = 0 + if r.resolver, err = r.reader.SymbolsResolver(partition); err != nil { + return err } -} - -func (t *lookupTable[T]) reset() { t.unresolved = t.unresolved[:0] } + r.resolver.WriteStats(&r.stats) -func (t *lookupTable[T]) tryLookup(x uint32) uint32 { - if v := t.resolved[x]; v != 0 { - return v - 1 + // Only stacktraces are yet partitioned. + if r.stacktraces == nil { + r.stacktraces = make(map[uint64]*lookupTable[*schemav1.Stacktrace]) } - v := uint32(len(t.unresolved)) | marker - t.unresolved = append(t.unresolved, x) - return v -} - -func (t *lookupTable[T]) storeResolved(i, v uint32) { t.resolved[i] = v + 1 } - -func (t *lookupTable[T]) lookupUnresolved(x uint32) uint32 { - if x&marker == 0 { - // Already resolved. - return x + p, ok := r.stacktraces[partition] + if !ok { + p = newLookupTable[*schemav1.Stacktrace](r.stats.StacktracesTotal) + r.stacktraces[partition] = p } - return t.unresolved[x&markedMask] -} + p.reset() -func (t *lookupTable[T]) iter() *lookupTableIterator[T] { - t.values = make([]T, len(t.resolved)) - return &lookupTableIterator[T]{ - values: t.values, + if r.locations == nil { + r.locations = newLookupTable[*schemav1.InMemoryLocation](r.stats.LocationsTotal) + r.mappings = newLookupTable[*schemav1.InMemoryMapping](r.stats.MappingsTotal) + r.functions = newLookupTable[*schemav1.InMemoryFunction](r.stats.FunctionsTotal) + r.strings = newLookupTable[string](r.stats.StringsTotal) + return nil } -} - -// TODO(kolesnikovae): -type lookupTableIterator[T any] struct { - cur uint32 - values []T -} - -func (t *lookupTableIterator[T]) set(v T) { t.values[t.cur] = v } - -func (r *stacktraceRewriter) symbolsResolver() SymbolsResolver { - // TODO(kolesnikovae): - return nil -} - -func (r *stacktraceRewriter) symbolsAppender() SymbolsAppender { - // TODO(kolesnikovae): - return nil -} - -func (r *stacktraceRewriter) reset(partition uint64) { - r.partition = partition - r.stacktraces[partition].reset() r.locations.reset() r.mappings.reset() r.functions.reset() r.strings.reset() + return nil } func (r *stacktraceRewriter) hasUnresolved() bool { @@ -583,112 +546,265 @@ func (r *stacktraceRewriter) hasUnresolved() bool { len(r.strings.unresolved) > 0 } -func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) error { - r.reset(partition) - r.populateUnresolved(stacktraces) +func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) (err error) { + if err = r.init(partition); err != nil { + return err + } + if err = r.populateUnresolved(stacktraces); err != nil { + return err + } if r.hasUnresolved() { - r.append(stacktraces) + if err = r.appendRewrite(stacktraces); err != nil { + return err + } } return nil } -func (r *stacktraceRewriter) populateUnresolved(stacktraces []uint32) { +func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { // Filter out all stack traces that have been already resolved. src := r.stacktraces[r.partition] - for i, v := range stacktraces { - stacktraces[i] = src.tryLookup(v) + for i, v := range stacktraceIDs { + stacktraceIDs[i] = src.tryLookup(v) } if len(src.unresolved) == 0 { - return + return nil } // Resolve locations for new stack traces. var stacktrace *schemav1.Stacktrace unresolvedStacktraces := src.iter() - p := r.symbolsResolver() - for i := p.Stacktraces(unresolvedStacktraces); i.Next(); stacktrace = i.At() { - for i, loc := range stacktrace.LocationIDs { - stacktrace.LocationIDs[i] = uint64(r.locations.tryLookup(uint32(loc))) + stacktraces := r.resolver.Stacktraces(unresolvedStacktraces) + for ; stacktraces.Err() == nil && stacktraces.Next(); stacktrace = stacktraces.At() { + for j, loc := range stacktrace.LocationIDs { + stacktrace.LocationIDs[j] = uint64(r.locations.tryLookup(uint32(loc))) } - unresolvedStacktraces.set(stacktrace) + // TODO(kolesnikovae): Copy. + unresolvedStacktraces.setValue(stacktrace) + } + if err := stacktraces.Err(); err != nil { + return err } // Resolve functions and mappings for new locations. var location *schemav1.InMemoryLocation unresolvedLocs := r.locations.iter() - for i := p.Locations(unresolvedLocs); i.Next(); location = i.At() { + locations := r.resolver.Locations(unresolvedLocs) + for ; locations.Err() == nil && locations.Next(); location = locations.At() { location.MappingId = r.mappings.tryLookup(location.MappingId) for j, line := range location.Line { location.Line[j].FunctionId = r.functions.tryLookup(line.FunctionId) } - unresolvedLocs.set(location) + unresolvedLocs.setValue(location) + } + if err := locations.Err(); err != nil { + return err } // Resolve strings. var mapping *schemav1.InMemoryMapping unresolvedMappings := r.mappings.iter() - for i := p.Mappings(unresolvedMappings); i.Next(); mapping = i.At() { + mappings := r.resolver.Mappings(unresolvedMappings) + for ; mappings.Err() == nil && mappings.Next(); mapping = mappings.At() { mapping.BuildId = r.strings.tryLookup(mapping.BuildId) mapping.Filename = r.strings.tryLookup(mapping.Filename) - unresolvedMappings.set(mapping) + unresolvedMappings.setValue(mapping) + } + if err := mappings.Err(); err != nil { + return err } + var function *schemav1.InMemoryFunction unresolvedFunctions := r.functions.iter() - for i := p.Functions(unresolvedFunctions); i.Next(); function = i.At() { + functions := r.resolver.Functions(unresolvedFunctions) + for ; functions.Err() == nil && functions.Next(); function = functions.At() { function.Name = r.strings.tryLookup(function.Name) function.Filename = r.strings.tryLookup(function.Filename) function.SystemName = r.strings.tryLookup(function.SystemName) - unresolvedFunctions.set(function) + unresolvedFunctions.setValue(function) + } + if err := functions.Err(); err != nil { + return err } + var str string unresolvedStrings := r.strings.iter() - for i := p.Strings(unresolvedStrings); i.Next(); str = i.At() { - unresolvedStrings.set(str) + strings := r.resolver.Strings(unresolvedStrings) + for ; strings.Err() == nil && strings.Next(); str = strings.At() { + unresolvedStrings.setValue(str) } + return strings.Err() } -func (r *stacktraceRewriter) append(stacktraces []uint32) { - a := r.symbolsAppender() - for _, str := range r.strings.values { - r.functions.storeResolved(0, a.AppendString(str)) +func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { + for _, v := range r.strings.unresolved { + r.strings.storeResolved(v.uid, r.appender.AppendString(v.val)) } - for _, function := range r.functions.values { - function.Name = r.strings.lookupUnresolved(function.Name) - function.Filename = r.strings.lookupUnresolved(function.Filename) - function.SystemName = r.strings.lookupUnresolved(function.SystemName) - r.functions.storeResolved(0, a.AppendFunction(function)) + for _, v := range r.functions.unresolved { + function := v.val + function.Name = r.strings.lookupResolved(function.Name) + function.Filename = r.strings.lookupResolved(function.Filename) + function.SystemName = r.strings.lookupResolved(function.SystemName) + r.functions.storeResolved(v.uid, r.appender.AppendFunction(function)) } - for _, mapping := range r.mappings.values { - mapping.BuildId = r.strings.lookupUnresolved(mapping.BuildId) - mapping.Filename = r.strings.lookupUnresolved(mapping.Filename) - r.mappings.storeResolved(0, a.AppendMapping(mapping)) + for _, v := range r.mappings.unresolved { + mapping := v.val + mapping.BuildId = r.strings.lookupResolved(mapping.BuildId) + mapping.Filename = r.strings.lookupResolved(mapping.Filename) + r.mappings.storeResolved(v.uid, r.appender.AppendMapping(mapping)) } - for _, location := range r.locations.values { - location.MappingId = r.mappings.lookupUnresolved(location.MappingId) + for _, v := range r.locations.unresolved { + location := v.val + location.MappingId = r.mappings.lookupResolved(location.MappingId) for j, line := range location.Line { - location.Line[j].FunctionId = r.functions.lookupUnresolved(line.FunctionId) + location.Line[j].FunctionId = r.functions.lookupResolved(line.FunctionId) } - r.locations.storeResolved(0, a.AppendLocation(location)) + r.locations.storeResolved(v.uid, r.appender.AppendLocation(location)) } src := r.stacktraces[r.partition] - for _, stacktrace := range src.values { - for j, v := range stacktrace.LocationIDs { - stacktrace.LocationIDs[j] = uint64(r.locations.lookupUnresolved(uint32(v))) + for _, v := range src.unresolved { + stacktrace := v.val + for j, lid := range stacktrace.LocationIDs { + stacktrace.LocationIDs[j] = uint64(r.locations.lookupResolved(uint32(lid))) } - src.storeResolved(0, a.AppendStacktrace(stacktrace)) + src.storeResolved(v.uid, r.appender.AppendStacktrace(stacktrace)) } for i, v := range stacktraces { - stacktraces[i] = src.lookupUnresolved(v) + stacktraces[i] = src.lookupResolved(v) + } + + return r.appender.Flush() +} + +const ( + marker = 1 << 31 + markedMask = math.MaxUint32 >> 1 +) + +type lookupTable[T any] struct { + // Index is source ID, and the value is the destination ID. + // If destination ID is not known, the element is index to 'unresolved' (marked). + resolved []uint32 + unresolved []lookupTableValue[T] + refs []lookupTableRef +} + +type lookupTableValue[T any] struct { + rid uint32 // Index to resolved. + uid uint32 // Original index (unresolved). + val T +} + +type lookupTableRef struct{ rid, uid uint32 } + +func newLookupTable[T any](size int) *lookupTable[T] { + var t lookupTable[T] + t.init(size) + return &t +} + +func (t *lookupTable[T]) init(size int) { + if cap(t.resolved) < size { + t.resolved = make([]uint32, size) + return + } + t.resolved = t.resolved[:size] + for i := range t.resolved { + t.resolved[i] = 0 } } -type symbolsWriter struct { - // TODO(kolesnikovae): - SymbolsAppender +func (t *lookupTable[T]) reset() { + t.unresolved = t.unresolved[:0] + t.refs = t.refs[:0] } -func newSymbolsWriter(dst string) (*symbolsWriter, error) { return &symbolsWriter{}, nil } +// tryLookup looks up the value at x in resolved. +// If x is has not been resolved yet, the x is memorized +// for future resolve, and returned values is the marked +// index to unresolved. +func (t *lookupTable[T]) tryLookup(x uint32) uint32 { + if v := t.resolved[x]; v != 0 { + if v&marker > 0 { + return v // Already marked for resolve. + } + return v - 1 // Already resolved. + } + u := uint32(len(t.unresolved)) + t.unresolved = append(t.unresolved, lookupTableValue[T]{ + rid: x, + uid: u, + }) + u |= marker + t.resolved[x] = u + return u +} + +func (t *lookupTable[T]) storeResolved(uid, v uint32) { + t.resolved[t.unresolved[uid].rid] = v + 1 +} + +func (t *lookupTable[T]) lookupResolved(x uint32) uint32 { + if x&marker > 0 { + return t.resolved[t.unresolved[x&markedMask].rid] - 1 + } + return x // Already resolved. +} + +func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + if cap(t.refs) < len(t.unresolved) { + t.refs = make([]lookupTableRef, len(t.unresolved)) + } else { + t.refs = t.refs[:len(t.unresolved)] + } + for i, v := range t.unresolved { + t.refs[i] = lookupTableRef{ + rid: v.rid, + uid: v.uid, + } + } + sort.Slice(t.refs, func(i, j int) bool { + return t.refs[i].rid < t.refs[j].rid + }) + return &lookupTableIterator[T]{table: t} +} + +type lookupTableIterator[T any] struct { + table *lookupTable[T] + cur uint32 +} + +func (t *lookupTableIterator[T]) Next() bool { + return t.cur < uint32(len(t.table.refs)) +} + +func (t *lookupTableIterator[T]) At() uint32 { + x := t.table.refs[t.cur].rid + t.cur++ + return x +} + +func (t *lookupTableIterator[T]) setValue(v T) { + uid := t.table.refs[t.cur].uid + t.table.unresolved[uid].val = v +} + +func (t *lookupTableIterator[T]) Close() error { return nil } + +func (t *lookupTableIterator[T]) Err() error { return nil } + +// TODO(kolesnikovae): + +type symbolsWriter struct{} + +func newSymbolsWriter(dst string) (*symbolsWriter, error) { + return &symbolsWriter{}, nil +} + +func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) { + return nil, nil +} From c046c3508284ecd35380f9800aeee82a8a541cfc Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Mon, 17 Jul 2023 21:06:28 +0800 Subject: [PATCH 3/4] Add lookup table test --- pkg/phlaredb/block_querier.go | 5 ++ pkg/phlaredb/compact.go | 58 +++++++++--------- pkg/phlaredb/compact_test.go | 107 +++++++++++++++++++++++++++++++++- 3 files changed, 138 insertions(+), 32 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index e431f14f5..0ee0b98a3 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -435,6 +435,11 @@ func (b *singleBlockQuerier) Index() IndexReader { return b.index } +func (b *singleBlockQuerier) Symbols() SymbolsReader { + // TODO(kolesnikovae) + return nil +} + func (b *singleBlockQuerier) Meta() block.Meta { if b.meta == nil { return block.Meta{} diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 8d2513005..4694d57c1 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -28,7 +28,7 @@ type BlockReader interface { Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - SymbolsReader + Symbols() SymbolsReader } // TODO(kolesnikovae): Refactor to symdb. @@ -426,7 +426,7 @@ func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w Sy rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), } for _, r := range blocks { - sr.rewriters[r] = newStacktraceRewriter(r, w) + sr.rewriters[r] = newStacktraceRewriter(r.Symbols(), w) } return &sr } @@ -458,6 +458,7 @@ func (s *symbolsRewriter) Next() bool { } s.numSamples += uint64(len(values)) for i, v := range values { + // FIXME: the original order is not preserved, which will affect encoding. values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) } }) @@ -638,7 +639,7 @@ func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { for _, v := range r.strings.unresolved { - r.strings.storeResolved(v.uid, r.appender.AppendString(v.val)) + r.strings.storeResolved(v.rid, r.appender.AppendString(v.val)) } for _, v := range r.functions.unresolved { @@ -646,14 +647,14 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { function.Name = r.strings.lookupResolved(function.Name) function.Filename = r.strings.lookupResolved(function.Filename) function.SystemName = r.strings.lookupResolved(function.SystemName) - r.functions.storeResolved(v.uid, r.appender.AppendFunction(function)) + r.functions.storeResolved(v.rid, r.appender.AppendFunction(function)) } for _, v := range r.mappings.unresolved { mapping := v.val mapping.BuildId = r.strings.lookupResolved(mapping.BuildId) mapping.Filename = r.strings.lookupResolved(mapping.Filename) - r.mappings.storeResolved(v.uid, r.appender.AppendMapping(mapping)) + r.mappings.storeResolved(v.rid, r.appender.AppendMapping(mapping)) } for _, v := range r.locations.unresolved { @@ -662,7 +663,7 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { for j, line := range location.Line { location.Line[j].FunctionId = r.functions.lookupResolved(line.FunctionId) } - r.locations.storeResolved(v.uid, r.appender.AppendLocation(location)) + r.locations.storeResolved(v.rid, r.appender.AppendLocation(location)) } src := r.stacktraces[r.partition] @@ -671,7 +672,7 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { for j, lid := range stacktrace.LocationIDs { stacktrace.LocationIDs[j] = uint64(r.locations.lookupResolved(uint32(lid))) } - src.storeResolved(v.uid, r.appender.AppendStacktrace(stacktrace)) + src.storeResolved(v.rid, r.appender.AppendStacktrace(stacktrace)) } for i, v := range stacktraces { stacktraces[i] = src.lookupResolved(v) @@ -682,7 +683,7 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { const ( marker = 1 << 31 - markedMask = math.MaxUint32 >> 1 + markerMask = math.MaxUint32 >> 1 ) type lookupTable[T any] struct { @@ -695,11 +696,13 @@ type lookupTable[T any] struct { type lookupTableValue[T any] struct { rid uint32 // Index to resolved. - uid uint32 // Original index (unresolved). val T } -type lookupTableRef struct{ rid, uid uint32 } +type lookupTableRef struct { + rid uint32 // Index to resolved. + uid uint32 // Original index (unresolved). +} func newLookupTable[T any](size int) *lookupTable[T] { var t lookupTable[T] @@ -734,45 +737,42 @@ func (t *lookupTable[T]) tryLookup(x uint32) uint32 { } return v - 1 // Already resolved. } - u := uint32(len(t.unresolved)) - t.unresolved = append(t.unresolved, lookupTableValue[T]{ - rid: x, - uid: u, - }) - u |= marker + t.unresolved = append(t.unresolved, lookupTableValue[T]{rid: x}) + u := uint32(len(t.unresolved)) | marker t.resolved[x] = u return u } -func (t *lookupTable[T]) storeResolved(uid, v uint32) { - t.resolved[t.unresolved[uid].rid] = v + 1 -} +func (t *lookupTable[T]) storeResolved(rid, v uint32) { t.resolved[rid] = v + 1 } func (t *lookupTable[T]) lookupResolved(x uint32) uint32 { if x&marker > 0 { - return t.resolved[t.unresolved[x&markedMask].rid] - 1 + return t.resolved[t.unresolved[x&markerMask-1].rid] - 1 } return x // Already resolved. } func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + t.initRefs() + sort.Sort(t) + return &lookupTableIterator[T]{table: t} +} + +func (t *lookupTable[T]) initRefs() { if cap(t.refs) < len(t.unresolved) { t.refs = make([]lookupTableRef, len(t.unresolved)) } else { t.refs = t.refs[:len(t.unresolved)] } for i, v := range t.unresolved { - t.refs[i] = lookupTableRef{ - rid: v.rid, - uid: v.uid, - } + t.refs[i] = lookupTableRef{rid: v.rid, uid: uint32(i)} } - sort.Slice(t.refs, func(i, j int) bool { - return t.refs[i].rid < t.refs[j].rid - }) - return &lookupTableIterator[T]{table: t} } +func (t *lookupTable[T]) Len() int { return len(t.refs) } +func (t *lookupTable[T]) Less(i, j int) bool { return t.refs[i].rid < t.refs[j].rid } +func (t *lookupTable[T]) Swap(i, j int) { t.refs[i], t.refs[j] = t.refs[j], t.refs[i] } + type lookupTableIterator[T any] struct { table *lookupTable[T] cur uint32 @@ -789,7 +789,7 @@ func (t *lookupTableIterator[T]) At() uint32 { } func (t *lookupTableIterator[T]) setValue(v T) { - uid := t.table.refs[t.cur].uid + uid := t.table.refs[t.cur-1].uid t.table.unresolved[uid].val = v } diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index 213ddac24..2d277411a 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -3,13 +3,12 @@ package phlaredb import ( "context" "net/http" + _ "net/http/pprof" "sort" "sync" "testing" "time" - _ "net/http/pprof" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" @@ -132,6 +131,15 @@ func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst st "numSamples", new.Stats.NumSamples) } +type blockReaderMock struct { + BlockReader + idxr IndexReader +} + +func (m *blockReaderMock) Index() IndexReader { + return m.idxr +} + func TestProfileRowIterator(t *testing.T) { filePath := t.TempDir() + "/index.tsdb" idxw, err := index.NewWriter(context.Background(), filePath) @@ -158,7 +166,7 @@ func TestProfileRowIterator(t *testing.T) { {SeriesIndex: 1, TimeNanos: 2}, {SeriesIndex: 2, TimeNanos: 3}, }, - ), idxr) + ), &blockReaderMock{idxr: idxr}) require.NoError(t, err) assert.True(t, it.Next()) @@ -191,3 +199,96 @@ func addSeries(t *testing.T, idxw *index.Writer, idx int, labels phlaremodel.Lab t.Helper() require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)})) } + +func Test_lookupTable(t *testing.T) { + // Given the source data set. + // Copy arbitrary subsets of those items to dst. + var dst []string + src := []string{ + "zero", + "one", + "two", + "three", + "four", + "five", + "six", + "seven", + } + + type testCase struct { + description string + input []uint32 + expected []string + } + + testCases := []testCase{ + { + description: "empty table", + input: []uint32{5, 0, 3, 1, 2, 2, 4}, + expected: []string{"five", "zero", "three", "one", "two", "two", "four"}, + }, + { + description: "no new values", + input: []uint32{2, 1, 2, 3}, + expected: []string{"two", "one", "two", "three"}, + }, + { + description: "new value mixed", + input: []uint32{2, 1, 6, 2, 3}, + expected: []string{"two", "one", "six", "two", "three"}, + }, + } + + // Try to lookup values in src lazily. + // Table size must be greater or equal + // to the source data set. + l := newLookupTable[string](10) + + populate := func(t *testing.T, x []uint32) { + for i, v := range x { + x[i] = l.tryLookup(v) + } + // Resolve unknown yet values. + // Mind the order and deduplication. + p := -1 + for it := l.iter(); it.Err() == nil && it.Next(); { + m := int(it.At()) + if m <= p { + t.Fatal("iterator order invalid") + } + p = m + it.setValue(src[m]) + } + } + + resolveAppend := func() { + // Populate dst with the newly resolved values. + // Note that order in dst does not have to match src. + for _, n := range l.unresolved { + l.storeResolved(n.rid, uint32(len(dst))) + dst = append(dst, n.val) + } + } + + resolve := func(x []uint32) []string { + // Lookup resolved values. + var resolved []string + for _, v := range x { + resolved = append(resolved, dst[l.lookupResolved(v)]) + } + return resolved + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.description, func(t *testing.T) { + l.reset() + populate(t, tc.input) + resolveAppend() + assert.Equal(t, tc.expected, resolve(tc.input)) + }) + } + + assert.Len(t, dst, 7) + assert.NotContains(t, dst, "seven") +} From f3e67f00d5f733343a52073711da7d755bd40a7e Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 18 Jul 2023 17:19:51 +0800 Subject: [PATCH 4/4] Symbols reader integration --- pkg/iter/iter.go | 14 ++ pkg/phlaredb/block_querier.go | 22 ++-- pkg/phlaredb/block_symbols_appender.go | 19 +++ pkg/phlaredb/block_symbols_reader.go | 92 +++++++++++++ pkg/phlaredb/compact.go | 176 ++++++++++++++----------- pkg/phlaredb/sample_merge.go | 16 +-- pkg/phlaredb/symdb/interfaces.go | 1 + 7 files changed, 238 insertions(+), 102 deletions(-) create mode 100644 pkg/phlaredb/block_symbols_appender.go create mode 100644 pkg/phlaredb/block_symbols_reader.go diff --git a/pkg/iter/iter.go b/pkg/iter/iter.go index 47fbe6f4e..34a16f7db 100644 --- a/pkg/iter/iter.go +++ b/pkg/iter/iter.go @@ -101,6 +101,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] { } } +type slicePositionIterator[T constraints.Integer, M any] struct { + i Iterator[T] + s []M +} + +func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] { + return slicePositionIterator[T, M]{s: s, i: i} +} + +func (i slicePositionIterator[T, M]) Next() bool { return i.i.Next() } +func (i slicePositionIterator[T, M]) At() M { return i.s[i.i.At()] } +func (i slicePositionIterator[T, M]) Err() error { return i.i.Err() } +func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() } + type sliceSeekIterator[A constraints.Ordered] struct { *sliceIterator[A] } diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 0ee0b98a3..ad0c9ab75 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -305,7 +305,7 @@ type singleBlockQuerier struct { type StacktraceDB interface { Open(ctx context.Context) error Close() error - Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error + Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error } type stacktraceResolverV1 struct { @@ -321,14 +321,17 @@ func (r *stacktraceResolverV1) Close() error { return r.stacktraces.Close() } -func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { +func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs)) defer stacktraces.Close() - + t := make([]int32, 0, 64) for stacktraces.Next() { + t = t[:0] s := stacktraces.At() - locs.addFromParquet(int64(s.Row), s.Values) - + for i, v := range s.Values { + t[i] = v.Int32() + } + locs.InsertStacktrace(s.Row, t) } return stacktraces.Err() } @@ -351,19 +354,14 @@ func (r *stacktraceResolverV2) Close() error { return nil } -func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { +func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { mr, ok := r.reader.MappingReader(mapping) if !ok { return nil } resolver := mr.StacktraceResolver() defer resolver.Release() - - return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn( - func(stacktraceID uint32, locations []int32) { - locs.add(int64(stacktraceID), locations) - }, - ), stacktraceIDs) + return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs) } func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier { diff --git a/pkg/phlaredb/block_symbols_appender.go b/pkg/phlaredb/block_symbols_appender.go new file mode 100644 index 000000000..b39adb9e2 --- /dev/null +++ b/pkg/phlaredb/block_symbols_appender.go @@ -0,0 +1,19 @@ +package phlaredb + +import schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" + +// TODO(kolesnikovae): Refactor to symdb. + +type SymbolsWriter interface { + SymbolsAppender(partition uint64) (SymbolsAppender, error) +} + +type SymbolsAppender interface { + AppendStacktrace([]int32) uint32 + AppendLocation(*schemav1.InMemoryLocation) uint32 + AppendMapping(*schemav1.InMemoryMapping) uint32 + AppendFunction(*schemav1.InMemoryFunction) uint32 + AppendString(string) uint32 + + Flush() error +} diff --git a/pkg/phlaredb/block_symbols_reader.go b/pkg/phlaredb/block_symbols_reader.go new file mode 100644 index 000000000..6c6df7a52 --- /dev/null +++ b/pkg/phlaredb/block_symbols_reader.go @@ -0,0 +1,92 @@ +package phlaredb + +import ( + "context" + + "github.com/grafana/phlare/pkg/iter" + schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" + "github.com/grafana/phlare/pkg/phlaredb/symdb" +) + +// TODO(kolesnikovae): Refactor to symdb. + +type SymbolsReader interface { + SymbolsResolver(partition uint64) (SymbolsResolver, error) +} + +type SymbolsResolver interface { + ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error + + Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] + Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] + Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] + Strings(iter.Iterator[uint32]) iter.Iterator[string] + + WriteStats(*SymbolStats) +} + +type SymbolStats struct { + StacktracesTotal int + LocationsTotal int + MappingsTotal int + FunctionsTotal int + StringsTotal int +} + +type inMemorySymbolsReader struct { + partitions map[uint64]*inMemorySymbolsResolver + + // TODO(kolesnikovae): Split into partitions. + strings inMemoryparquetReader[string, *schemav1.StringPersister] + functions inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister] + locations inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister] + mappings inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister] + stacktraces StacktraceDB +} + +func (r *inMemorySymbolsReader) Symbols(partition uint64) SymbolsResolver { + p, ok := r.partitions[partition] + if !ok { + p = &inMemorySymbolsResolver{ + partition: 0, + ctx: nil, + reader: nil, + } + r.partitions[partition] = p + } + return p +} + +type inMemorySymbolsResolver struct { + partition uint64 + ctx context.Context + reader *inMemorySymbolsReader +} + +func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error { + return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces) +} + +func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] { + return iter.NewSliceIndexIterator(s.reader.locations.cache, i) +} + +func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] { + return iter.NewSliceIndexIterator(s.reader.mappings.cache, i) +} + +func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] { + return iter.NewSliceIndexIterator(s.reader.functions.cache, i) +} + +func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] { + return iter.NewSliceIndexIterator(s.reader.strings.cache, i) +} + +func (s inMemorySymbolsResolver) WriteStats(stats *SymbolStats) { + stats.StacktracesTotal = 0 // TODO + stats.LocationsTotal = int(s.reader.locations.NumRows()) + stats.MappingsTotal = int(s.reader.mappings.NumRows()) + stats.FunctionsTotal = int(s.reader.functions.NumRows()) + stats.StringsTotal = int(s.reader.strings.NumRows()) +} diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 4694d57c1..bae2fadc7 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -31,42 +31,6 @@ type BlockReader interface { Symbols() SymbolsReader } -// TODO(kolesnikovae): Refactor to symdb. - -type SymbolsReader interface { - SymbolsResolver(partition uint64) (SymbolsResolver, error) -} - -type SymbolsResolver interface { - Stacktraces(iter.Iterator[uint32]) iter.Iterator[*schemav1.Stacktrace] - Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] - Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] - Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] - Strings(iter.Iterator[uint32]) iter.Iterator[string] - WriteStats(*SymbolStats) -} - -type SymbolStats struct { - StacktracesTotal int - LocationsTotal int - MappingsTotal int - FunctionsTotal int - StringsTotal int -} - -type SymbolsWriter interface { - SymbolsAppender(partition uint64) (SymbolsAppender, error) -} - -type SymbolsAppender interface { - AppendStacktrace(*schemav1.Stacktrace) uint32 - AppendLocation(*schemav1.InMemoryLocation) uint32 - AppendMapping(*schemav1.InMemoryMapping) uint32 - AppendFunction(*schemav1.InMemoryFunction) uint32 - AppendString(string) uint32 - Flush() error -} - func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { if len(src) <= 1 { return block.Meta{}, errors.New("not enough blocks to compact") @@ -484,7 +448,9 @@ type stacktraceRewriter struct { writer SymbolsWriter // Stack trace identifiers are only valid within the partition. - stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + stacktraces map[uint64]*lookupTable[[]int32] + inserter *stacktraceInserter + // Objects below have global addressing. locations *lookupTable[*schemav1.InMemoryLocation] mappings *lookupTable[*schemav1.InMemoryMapping] @@ -516,11 +482,12 @@ func (r *stacktraceRewriter) init(partition uint64) (err error) { // Only stacktraces are yet partitioned. if r.stacktraces == nil { - r.stacktraces = make(map[uint64]*lookupTable[*schemav1.Stacktrace]) + r.stacktraces = make(map[uint64]*lookupTable[[]int32]) + r.inserter = new(stacktraceInserter) } p, ok := r.stacktraces[partition] if !ok { - p = newLookupTable[*schemav1.Stacktrace](r.stats.StacktracesTotal) + p = newLookupTable[[]int32](r.stats.StacktracesTotal) r.stacktraces[partition] = p } p.reset() @@ -536,6 +503,13 @@ func (r *stacktraceRewriter) init(partition uint64) (err error) { r.mappings.reset() r.functions.reset() r.strings.reset() + + r.inserter = &stacktraceInserter{ + slt: p, + llt: r.locations, + s: r.inserter.s, + } + return nil } @@ -563,35 +537,20 @@ func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces [] } func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { - // Filter out all stack traces that have been already resolved. - src := r.stacktraces[r.partition] - for i, v := range stacktraceIDs { - stacktraceIDs[i] = src.tryLookup(v) + // Filter out all stack traces that have been already + // resolved and populate locations lookup table. + if err := r.resolveStacktraces(stacktraceIDs); err != nil { + return err } - if len(src.unresolved) == 0 { + if len(r.locations.unresolved) == 0 { return nil } - // Resolve locations for new stack traces. - var stacktrace *schemav1.Stacktrace - unresolvedStacktraces := src.iter() - stacktraces := r.resolver.Stacktraces(unresolvedStacktraces) - for ; stacktraces.Err() == nil && stacktraces.Next(); stacktrace = stacktraces.At() { - for j, loc := range stacktrace.LocationIDs { - stacktrace.LocationIDs[j] = uint64(r.locations.tryLookup(uint32(loc))) - } - // TODO(kolesnikovae): Copy. - unresolvedStacktraces.setValue(stacktrace) - } - if err := stacktraces.Err(); err != nil { - return err - } - // Resolve functions and mappings for new locations. - var location *schemav1.InMemoryLocation unresolvedLocs := r.locations.iter() locations := r.resolver.Locations(unresolvedLocs) - for ; locations.Err() == nil && locations.Next(); location = locations.At() { + for locations.Err() == nil && locations.Next() { + location := locations.At() location.MappingId = r.mappings.tryLookup(location.MappingId) for j, line := range location.Line { location.Line[j].FunctionId = r.functions.tryLookup(line.FunctionId) @@ -603,10 +562,10 @@ func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { } // Resolve strings. - var mapping *schemav1.InMemoryMapping unresolvedMappings := r.mappings.iter() mappings := r.resolver.Mappings(unresolvedMappings) - for ; mappings.Err() == nil && mappings.Next(); mapping = mappings.At() { + for mappings.Err() == nil && mappings.Next() { + mapping := mappings.At() mapping.BuildId = r.strings.tryLookup(mapping.BuildId) mapping.Filename = r.strings.tryLookup(mapping.Filename) unresolvedMappings.setValue(mapping) @@ -615,10 +574,10 @@ func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { return err } - var function *schemav1.InMemoryFunction unresolvedFunctions := r.functions.iter() functions := r.resolver.Functions(unresolvedFunctions) - for ; functions.Err() == nil && functions.Next(); function = functions.At() { + for functions.Err() == nil && functions.Next() { + function := functions.At() function.Name = r.strings.tryLookup(function.Name) function.Filename = r.strings.tryLookup(function.Filename) function.SystemName = r.strings.tryLookup(function.SystemName) @@ -628,11 +587,10 @@ func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { return err } - var str string unresolvedStrings := r.strings.iter() strings := r.resolver.Strings(unresolvedStrings) - for ; strings.Err() == nil && strings.Next(); str = strings.At() { - unresolvedStrings.setValue(str) + for strings.Err() == nil && strings.Next() { + unresolvedStrings.setValue(strings.At()) } return strings.Err() } @@ -669,8 +627,8 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { src := r.stacktraces[r.partition] for _, v := range src.unresolved { stacktrace := v.val - for j, lid := range stacktrace.LocationIDs { - stacktrace.LocationIDs[j] = uint64(r.locations.lookupResolved(uint32(lid))) + for j, lid := range stacktrace { + stacktrace[j] = int32(r.locations.lookupResolved(uint32(lid))) } src.storeResolved(v.rid, r.appender.AppendStacktrace(stacktrace)) } @@ -681,6 +639,48 @@ func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { return r.appender.Flush() } +func (r *stacktraceRewriter) resolveStacktraces(stacktraceIDs []uint32) error { + stacktraces := r.stacktraces[r.partition] + for i, v := range stacktraceIDs { + stacktraceIDs[i] = stacktraces.tryLookup(v) + } + if len(stacktraces.unresolved) == 0 { + return nil + } + + // Gather and sort references to unresolved stacks. + stacktraces.initRefs() + sort.Sort(stacktraces) + grow(r.inserter.s, len(stacktraces.refs)) + for j, u := range stacktraces.refs { + r.inserter.s[j] = u.rid + } + + return r.resolver.ResolveStacktraces(context.TODO(), r.inserter, r.inserter.s) +} + +type stacktraceInserter struct { + slt *lookupTable[[]int32] + llt *lookupTable[*schemav1.InMemoryLocation] + s []uint32 + c int +} + +func (i *stacktraceInserter) InsertStacktrace(stacktrace uint32, locations []int32) { + // Resolve locations for new stack traces. + for j, loc := range locations { + locations[j] = int32(i.llt.tryLookup(uint32(loc))) + } + // Update the unresolved value. + v := i.slt.referenceAt(i.c) + if v.rid != stacktrace { + panic("unexpected stack trace") + } + grow(v.val, len(locations)) + copy(v.val, locations) + i.c++ +} + const ( marker = 1 << 31 markerMask = math.MaxUint32 >> 1 @@ -737,16 +737,33 @@ func (t *lookupTable[T]) tryLookup(x uint32) uint32 { } return v - 1 // Already resolved. } - t.unresolved = append(t.unresolved, lookupTableValue[T]{rid: x}) - u := uint32(len(t.unresolved)) | marker + u := t.newUnresolvedValue(x) | marker t.resolved[x] = u return u } +func (t *lookupTable[T]) newUnresolvedValue(rid uint32) uint32 { + x := len(t.unresolved) + if x < cap(t.unresolved) { + // Try to reuse previously allocated value. + x++ + t.unresolved = t.unresolved[:x] + t.unresolved[x].rid = rid + } else { + t.unresolved = append(t.unresolved, lookupTableValue[T]{rid: rid}) + } + return uint32(x) +} + +func (t *lookupTable[T]) referenceAt(x int) *lookupTableValue[T] { + u := t.refs[x].uid + return &t.unresolved[u] +} + func (t *lookupTable[T]) storeResolved(rid, v uint32) { t.resolved[rid] = v + 1 } func (t *lookupTable[T]) lookupResolved(x uint32) uint32 { - if x&marker > 0 { + if x&marker > 0 { // TODO: why? return t.resolved[t.unresolved[x&markerMask-1].rid] - 1 } return x // Already resolved. @@ -759,11 +776,7 @@ func (t *lookupTable[T]) iter() *lookupTableIterator[T] { } func (t *lookupTable[T]) initRefs() { - if cap(t.refs) < len(t.unresolved) { - t.refs = make([]lookupTableRef, len(t.unresolved)) - } else { - t.refs = t.refs[:len(t.unresolved)] - } + grow(t.refs, len(t.unresolved)) for i, v := range t.unresolved { t.refs[i] = lookupTableRef{rid: v.rid, uid: uint32(i)} } @@ -797,6 +810,13 @@ func (t *lookupTableIterator[T]) Close() error { return nil } func (t *lookupTableIterator[T]) Err() error { return nil } +func grow[T any](s []T, n int) []T { + if cap(s) < n { + return make([]T, n, 2*n) + } + return s[:n] +} + // TODO(kolesnikovae): type symbolsWriter struct{} diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index 14065e8a3..6d2389f25 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -56,20 +56,12 @@ func newLocationsIdsByStacktraceID(size int) locationsIdsByStacktraceID { } } -func (l locationsIdsByStacktraceID) addFromParquet(stacktraceID int64, locs []parquet.Value) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) - for i, locationID := range locs { - locID := locationID.Uint64() - l.ids[int64(locID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = int32(locID) - } -} - -func (l locationsIdsByStacktraceID) add(stacktraceID int64, locs []int32) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) +func (l locationsIdsByStacktraceID) InsertStacktrace(stacktraceID uint32, locs []int32) { + s := make([]int32, len(locs)) + l.byStacktraceID[int64(stacktraceID)] = s for i, locationID := range locs { l.ids[int64(locationID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = locationID + s[i] = locationID } } diff --git a/pkg/phlaredb/symdb/interfaces.go b/pkg/phlaredb/symdb/interfaces.go index fe7c03b20..cd07358d1 100644 --- a/pkg/phlaredb/symdb/interfaces.go +++ b/pkg/phlaredb/symdb/interfaces.go @@ -10,6 +10,7 @@ import ( // collection. https://github.com/google/pprof/blob/main/proto/README.md // // In the package, Mapping represents all the version of a binary. +// TODO(kolesnikovae): Rename mapping to Partition type MappingWriter interface { // StacktraceAppender provides exclusive write access