diff --git a/pkg/parquetquery/iters.go b/pkg/parquetquery/iters.go index 23d22a21a0f..086763f8de1 100644 --- a/pkg/parquetquery/iters.go +++ b/pkg/parquetquery/iters.go @@ -508,10 +508,12 @@ type SyncIterator struct { currRowGroupMax RowNumber currChunk *ColumnChunkHelper currPage pq.Page + currPageMin RowNumber currPageMax RowNumber currValues pq.ValueReader currBuf []pq.Value currBufN int + currPageN int } var _ Iterator = (*SyncIterator)(nil) @@ -583,6 +585,8 @@ func (c *SyncIterator) SeekTo(to RowNumber, definitionLevel int) (*IteratorResul return nil, nil } + c.seekWithinPage(to, definitionLevel) + // The row group and page have been selected to where this value is possibly // located. Now scan through the page and look for it. for { @@ -710,6 +714,72 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo return false, nil } +// seekWithinPage decides if it should reslice the current page to jump directly to the desired row number +// or allow the iterator to call Next() until it finds the desired row number. it uses the magicThreshold +// as its balance point. if the number of Next()s to skip is less than the magicThreshold, it will not reslice +func (c *SyncIterator) seekWithinPage(to RowNumber, definitionLevel int) { + rowSkipRelative := int(to[0] - c.curr[0]) + const magicThreshold = 1000 + shouldSkip := false + + if definitionLevel == 0 { + // if definition level is 0 there is always a 1:1 ratio between Next()s and rows. it's only deeper + // levels of nesting we have to manually count + shouldSkip = rowSkipRelative > magicThreshold + } else { + // this is a nested iterator, let's count the Next()s required to get to the desired row number + // and decide if we should skip or not + replvls := c.currPage.RepetitionLevels() + nextsRequired := 0 + + for i := c.currPageN; i < len(replvls); i++ { + nextsRequired++ + + if nextsRequired > magicThreshold { + shouldSkip = true + break + } + + if replvls[i] == 0 { // 0 rep lvl indicates a new row + rowSkipRelative-- // decrement the number of rows we need to skip + if rowSkipRelative <= 0 { + // if we hit here we skipped all rows and did not exceed the magic threshold, so we're leaving shouldSkip false + break + } + } + } + } + + if !shouldSkip { + return + } + + // skips are calculated off the start of the page + rowSkip := to[0] - c.currPageMin[0] + if rowSkip < 1 { + return + } + if rowSkip > c.currPage.NumRows() { + return + } + + // reslice the page to jump directly to the desired row number + pg := c.currPage.Slice(rowSkip-1, c.currPage.NumRows()) + + // remove all detail below the row number + c.curr = TruncateRowNumber(0, to) + c.curr = c.curr.Preceding() + + // reset buffers and other vars + pq.Release(c.currPage) + c.currPage = pg + c.currPageMin = c.curr + c.currValues = pg.Values() + c.currPageN = 0 + syncIteratorPoolPut(c.currBuf) + c.currBuf = nil +} + // next is the core functionality of this iterator and returns the next matching result. This // may involve inspecting multiple row groups, pages, and values until a match is found. When // we run out of things to inspect, it returns nil. The reason this method is distinct from @@ -778,6 +848,7 @@ func (c *SyncIterator) next() (RowNumber, *pq.Value, error) { // even if the value is filtered out next. c.curr.Next(v.RepetitionLevel(), v.DefinitionLevel()) c.currBufN++ + c.currPageN++ if c.filter != nil && !c.filter.KeepValue(*v) { continue @@ -808,7 +879,9 @@ func (c *SyncIterator) setPage(pg pq.Page) { // Reset value buffers c.currValues = nil c.currPageMax = EmptyRowNumber() + c.currPageMin = EmptyRowNumber() c.currBufN = 0 + c.currPageN = 0 // If we don't immediately have a new incoming page // then return the buffer to the pool. @@ -822,6 +895,7 @@ func (c *SyncIterator) setPage(pg pq.Page) { rn := c.curr rn.Skip(pg.NumRows() + 1) // Exclusive upper bound, points at the first rownumber in the next page c.currPage = pg + c.currPageMin = c.curr c.currPageMax = rn c.currValues = pg.Values() } diff --git a/tempodb/encoding/vparquet3/block_traceql_test.go b/tempodb/encoding/vparquet3/block_traceql_test.go index c4bc3034d38..9d578173ee6 100644 --- a/tempodb/encoding/vparquet3/block_traceql_test.go +++ b/tempodb/encoding/vparquet3/block_traceql_test.go @@ -584,7 +584,7 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) { ctx := context.TODO() tenantID := "1" - blockID := uuid.MustParse("000d37d0-1e66-4f4e-bbd4-f85c1deb6e5e") + blockID := uuid.MustParse("00000c2f-8133-4a60-a62a-7748bd146938") // blockID := uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5") r, _, _, err := local.New(&local.Config{