Skip to content

Commit

Permalink
TraceQL: support mixed-type attribute querying (int/float)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndk committed Feb 6, 2025
1 parent f947882 commit 9ac7d86
Show file tree
Hide file tree
Showing 13 changed files with 1,338 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ integration/e2e/deployments/e2e_integration_test[0-9]*
/tmp
gh-token.txt
.cache
.devcontainer
testdata
k6
102 changes: 99 additions & 3 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"fmt"
"io"
"math"
"slices"
"strings"
"sync"
"sync/atomic"

"github.com/grafana/tempo/pkg/parquetquery/intern"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
pq "github.com/parquet-go/parquet-go"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -1805,10 +1808,12 @@ func (j *LeftJoinIterator) String() string {
for _, r := range j.required {
srequired += "\n\t" + util.TabOut(r)
}
soptional := "optional: "
for _, o := range j.optional {
soptional += "\n\t" + util.TabOut(o)
optional := make([]string, len(j.optional))
for i, o := range j.optional {
optional[i] = "\n\t" + util.TabOut(o)
}
slices.Sort(optional)
soptional := "optional: " + strings.Join(optional, "")
return fmt.Sprintf("LeftJoinIterator: %d: %s\n%s\n%s", j.definitionLevel, j.pred, srequired, soptional)
}

Expand Down Expand Up @@ -2121,6 +2126,97 @@ func (u *UnionIterator) Close() {
}
}

// FilterNilIterator filters out rows where the given key is nil.
type FilterNilIterator struct {
base Iterator
filterKey string
}

func NewFilterNilIterator(base Iterator, filterKey string) Iterator {
return &FilterNilIterator{
base: base,
filterKey: filterKey,
}
}

func (it *FilterNilIterator) String() string {
return fmt.Sprintf("FilterNilIterator(%q):\n\t%s", it.filterKey, util.TabOut(it.base))
}

func (it *FilterNilIterator) Close() {
it.base.Close()
}

func (it *FilterNilIterator) Next() (*IteratorResult, error) {
res, err := it.base.Next()
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}

removeNilsIfDupKey(res, it.filterKey)

return res, nil
}

func (it *FilterNilIterator) SeekTo(to RowNumber, definitionLevel int) (*IteratorResult, error) {
res, err := it.base.SeekTo(to, definitionLevel)
if err != nil || res == nil {
return res, err
}
removeNilsIfDupKey(res, it.filterKey)
return res, nil
}

func removeNilsIfDupKey(res *IteratorResult, filterKey string) {
isNilStatic := func(v interface{}) bool {
st, ok := v.(traceql.Static)
return ok && st.Type == traceql.TypeNil
}

// Check if we have a non-nil value for the filter key
haveNonNil := false
for _, entry := range res.Entries {
if entry.Key == filterKey && !entry.Value.IsNull() {
haveNonNil = true
break
}
}
if !haveNonNil {
for _, entry := range res.OtherEntries {
if entry.Key == filterKey && !isNilStatic(entry.Value) {
haveNonNil = true
break
}
}
}

// If we have a non-nil value for the filter key, remove all nil values for the filter key
if haveNonNil {
{
entries := res.Entries[:0]
for _, entry := range res.Entries {
if entry.Key != filterKey || !entry.Value.IsNull() {
entries = append(entries, entry)
}
}
res.Entries = entries
}

{
otherEntries := res.OtherEntries[:0]
for _, entry := range res.OtherEntries {
if entry.Key != filterKey || !isNilStatic(entry.Value) {
otherEntries = append(otherEntries, entry)
}
}
res.OtherEntries = otherEntries
}
}
}

type GroupPredicate interface {
fmt.Stringer

Expand Down
81 changes: 81 additions & 0 deletions tempodb/encoding/vparquet2/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,71 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet
}
}

// createIntPredicateFromFloat adapts float-based queries to integer columns.
// If the float operand has no fractional part, it's treated as an integer directly.
// Otherwise, specific shifts are applied (e.g., floor or ceil) depending on the operator,
// or conclude that equality is impossible.
//
// Example: { spanAttr > 3.5 } but 'spanAttr' is stored as int. We'll look for rows where 'spanAttr' >= 4.
func createIntPredicateFromFloat(op traceql.Operator, operands traceql.Operands) (parquetquery.Predicate, error) {
if op == traceql.OpNone {
return nil, nil
}

f := operands[0].Float()
// Check if f has a fractional part
if _, frac := math.Modf(f); frac == 0 {
// If it's an integer float, treat it purely as int
intOperands := traceql.Operands{traceql.NewStaticInt(int(f))}
return createIntPredicate(op, intOperands)
}

switch op {
case traceql.OpEqual:
// No integer can be strictly equal to a float with a fractional part
return nil, nil
case traceql.OpNotEqual:
// An integer will always differ from a float that has a fractional part
return parquetquery.NewCallbackPredicate(func() bool { return true }), nil
case traceql.OpGreater, traceql.OpGreaterEqual:
// For > 3.5 or >= 3.5, effectively we do >= 4
// For > -3.5 or >= -3.5, effectively we do >= -3
i := int(f)
if i > 0 {
i++
}
return createIntPredicate(traceql.OpGreaterEqual, traceql.Operands{traceql.NewStaticInt(i)})
case traceql.OpLess, traceql.OpLessEqual:
// For < 3.5 or <= 3.5, effectively we do <= 3
// For < -3.5 or <= -3.5, effectively we do <= -4
i := int(f)
if i < 0 {
i--
}
return createIntPredicate(traceql.OpLessEqual, traceql.Operands{traceql.NewStaticInt(i)})
default:
return nil, fmt.Errorf("unsupported operator for float to int conversion: %v", op)
}
}

// createFloatPredicateFromInt adapts integer-based queries to float columns.
// If the operand can be interpreted as an integer, it's converted to float
// and we delegate further processing to createFloatPredicate.
//
// Example: { spanAttr = 5 } but 'spanAttr' is stored as float. We'll look for rows where 'spanAttr' = 5.0.
func createFloatPredicateFromInt(op traceql.Operator, operands traceql.Operands) (parquetquery.Predicate, error) {
if op == traceql.OpNone {
return nil, nil
}

if i, ok := operands[0].Int(); ok {
floatOperands := traceql.Operands{traceql.NewStaticFloat(float64(i))}
return createFloatPredicate(op, floatOperands)
}

return nil, nil
}

func createFloatPredicate(op traceql.Operator, operands traceql.Operands) (parquetquery.Predicate, error) {
if op == traceql.OpNone {
return nil, nil
Expand Down Expand Up @@ -1846,13 +1911,29 @@ func createAttributeIterator(makeIter makeIterFn, conditions []traceql.Condition
attrStringPreds = append(attrStringPreds, pred)

case traceql.TypeInt:
// Create a predicate specifically for integer comparisons
pred, err := createIntPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, fmt.Errorf("creating attribute predicate: %w", err)
}
attrIntPreds = append(attrIntPreds, pred)

// If the operand can be interpreted as a float, create an additional predicate
if pred, err := createFloatPredicateFromInt(cond.Op, cond.Operands); err != nil {
return nil, fmt.Errorf("creating float attribute predicate from int: %w", err)
} else if pred != nil {
attrFltPreds = append(attrFltPreds, pred)
}

case traceql.TypeFloat:
// Attempt to create a predicate for integer comparisons, if applicable
if pred, err := createIntPredicateFromFloat(cond.Op, cond.Operands); err != nil {
return nil, fmt.Errorf("creating int attribute predicate from float: %w", err)
} else if pred != nil {
attrIntPreds = append(attrIntPreds, pred)
}

// Create a predicate specifically for float comparisons
pred, err := createFloatPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, fmt.Errorf("creating attribute predicate: %w", err)
Expand Down
74 changes: 73 additions & 1 deletion tempodb/encoding/vparquet2/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,39 @@ func TestBackendBlockSearchTraceQL(t *testing.T) {
parse(t, `{resource.`+LabelServiceName+` <= 124}`),
},
},
// Cross-type comparisons
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint > 122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint >= 122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint <= 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint = 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint != 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint >= 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint < 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint <= 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint > -123.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint >= -123.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint <= -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint = -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint != -123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint >= -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint < -122.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint <= -122.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag > 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag >= 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag <= 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag = 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag != 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag >= 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag <= 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag < 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag != 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag > 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag >= 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag > 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag >= 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag <= 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag < 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag != 457}`),
}

for _, req := range searchesThatMatch {
Expand Down Expand Up @@ -316,6 +349,41 @@ func TestBackendBlockSearchTraceQL(t *testing.T) {
parse(t, `{`+LabelDuration+` = 100s }`), // Match
},
},
// Cross-type comparisons
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint < 122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint = 122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint <= 122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint < 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint != 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint > 123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint >= 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint = 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossint > 123.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint < -123.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint = -122.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint = -123.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint <= -123.9}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint < -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint != -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint > -123.0}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint >= -122.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint = -122.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossnint > -122.1}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag < 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag = 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag <= 455}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag < 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag != 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag > 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag >= 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag = 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_nofrag > 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag < 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag = 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag <= 456}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag >= 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag = 457}`),
traceql.MustExtractFetchSpansRequestWithMetadata(`{.crossfloat_frag > 457}`),
}

for _, req := range searchesThatDontMatch {
Expand Down Expand Up @@ -431,7 +499,11 @@ func fullyPopulatedTestTrace(id common.ID) *Trace {
{Key: "bar", ValueInt: intPtr(123)},
{Key: "float", ValueDouble: fltPtr(456.78)},
{Key: "bool", ValueBool: boolPtr(false)},

// For cross-type comparisons
{Key: "crossint", ValueInt: intPtr(123)},
{Key: "crossnint", ValueInt: intPtr(-123)},
{Key: "crossfloat_nofrag", ValueDouble: fltPtr(456.0)},
{Key: "crossfloat_frag", ValueDouble: fltPtr(456.78)},
// Edge-cases
{Key: LabelName, Value: strPtr("Bob")}, // Conflicts with intrinsic but still looked up by .name
{Key: LabelServiceName, Value: strPtr("spanservicename")}, // Overrides resource-level dedicated column
Expand Down
Loading

0 comments on commit 9ac7d86

Please sign in to comment.