From 06c278f9d98b833ae6f49e0794a07b3a1b40d603 Mon Sep 17 00:00:00 2001
From: spq
Date: Tue, 7 Jan 2025 20:22:48 +0100
Subject: [PATCH 1/5] don't return useless id lookup that just results in a
 bad iteration order if no id filter is present

---
 internal/index/search.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/index/search.go b/internal/index/search.go
index 09b64ee..8690106 100644
--- a/internal/index/search.go
+++ b/internal/index/search.go
@@ -748,7 +748,7 @@ conditions:
 			lookups = append(lookups, func() ([]uint32, error) {
 				return []uint32{idx}, nil
 			})
-		} else {
+		} else if minIDFilter != 0 || maxIDFilter != math.MaxUint64 {
 			lookups = append(lookups, func() ([]uint32, error) {
 				lookup := []uint32(nil)
 				for id, index := range r.containedStreamIds {
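The guard added above matters because containedStreamIds is a Go map, and Go randomizes map iteration order. Without an id filter the lookup would contain every stream anyway, so it adds no information and only forces an arbitrary evaluation order onto the search. A minimal standalone sketch of that effect (illustrative only, not pkappa2 code):

	package main

	import "fmt"

	func main() {
		// with no min/max id filter every entry passes, so the lookup
		// carries no information; it merely imposes the map's randomized
		// iteration order on the later evaluation
		containedStreamIds := map[uint64]uint32{10: 0, 11: 1, 12: 2, 13: 3}
		lookup := []uint32(nil)
		for _, index := range containedStreamIds {
			lookup = append(lookup, index)
		}
		fmt.Println(lookup) // order can change from run to run
	}
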
From 50f5605cd66e8ae98002c94366c9434f414914da Mon Sep 17 00:00:00 2001
From: spq
Date: Wed, 8 Jan 2025 19:32:24 +0100
Subject: [PATCH 2/5] rework lookup code to skip evaluating streams that
 cannot reach the result list: if the sorting criterion has a lookup and the
 candidate streams can be iterated in the order described by that lookup, the
 search can be aborted as soon as the result list is full

---
 internal/index/search.go | 71 ++++++++++++++++++++++++++--------------
 1 file changed, 46 insertions(+), 25 deletions(-)

diff --git a/internal/index/search.go b/internal/index/search.go
index 8690106..90482d3 100644
--- a/internal/index/search.go
+++ b/internal/index/search.go
@@ -1103,12 +1103,9 @@ func SearchStreams(ctx context.Context, indexes []*Reader, limitIDs *bitmask.Lon
 			if err != nil {
 				return nil, false, nil, err
 			}
-			if queryPart.possible && len(queryPart.lookups) == 0 && sortingLookup != nil {
-				queryPart.lookups = append(queryPart.lookups, sortingLookup)
-			}
 			queryParts = append(queryParts, queryPart)
 		}
-		err := idx.searchStreams(ctx, &results, allResults, queryParts, groupingData, sorter, resultLimit)
+		err := idx.searchStreams(ctx, &results, allResults, queryParts, groupingData, sorter, resultLimit, sortingLookup)
 		if err != nil {
 			return nil, false, nil, err
 		}
@@ -1129,20 +1126,28 @@ func SearchStreams(ctx context.Context, indexes []*Reader, limitIDs *bitmask.Lon
 	return results.streams[skip:], results.resultDropped != 0, dataRegexes, nil
 }
 
-func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQueryResults map[string]resultData, queryParts []queryPart, grouper *grouper, sortingLess func(a, b *Stream) bool, limit uint) error {
+func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQueryResults map[string]resultData, queryParts []queryPart, grouper *grouper, sortingLess func(a, b *Stream) bool, limit uint, sortingLookup func() ([]uint32, error)) error {
 	// check if all queries use lookups, if not don't evaluate them
 	useLookups := true
-	allImpossible := true
+	possibleQueryParts := 0
 	for _, qp := range queryParts {
-		if qp.possible {
-			allImpossible = false
+		if !qp.possible {
+			continue
 		}
+		possibleQueryParts++
 		if len(qp.lookups) == 0 {
 			useLookups = false
 		}
 	}
-	if allImpossible {
+	switch possibleQueryParts {
+	case 0:
 		return nil
+	case 1:
+		if sortingLookup != nil {
+			useLookups = true
+		}
+	default:
+		sortingLookup = nil
 	}
 	// a map of index to list of sub-queries that matched this id
 	type streamIndex struct {
@@ -1157,6 +1162,13 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 				continue
 			}
 			streamIndexesOfQuery := []uint32(nil)
+			if sortingLookup != nil {
+				newStreamIndexes, err := sortingLookup()
+				if err != nil {
+					return err
+				}
+				streamIndexesOfQuery = newStreamIndexes
+			}
 			for _, l := range qp.lookups {
 				newStreamIndexes, err := l()
 				if err != nil {
@@ -1210,22 +1222,29 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 	}
 
 	// apply filters to lookup results or all streams, if no lookups could be used
-	filterAndAddToResult := func(activeQueryParts bitmask.ShortBitmask, si uint32) error {
+	filterAndAddToResult := func(activeQueryParts bitmask.ShortBitmask, si uint32) (bool, error) {
 		if err := ctx.Err(); err != nil {
-			return err
+			return false, err
 		}
+
+		// check if the sorting and limit would allow any stream
+		limitReached := result.resultDropped != 0 && limit != 0 && uint(len(result.streams)) >= limit
+		if limitReached && sortingLess == nil {
+			return true, nil
+		}
+
 		s, err := r.streamByIndex(si)
 		if err != nil {
-			return err
+			return false, err
 		}
 		ss, err := s.wrap(r, si)
 		if err != nil {
-			return err
+			return false, err
 		}
 
 		// check if the sorting and limit would allow this stream
-		if result.resultDropped != 0 && limit != 0 && uint(len(result.streams)) >= limit && (sortingLess == nil || !sortingLess(ss, result.streams[limit-1])) {
-			return nil
+		if limitReached && !sortingLess(ss, result.streams[limit-1]) {
+			return true, nil
 		}
 
 		// check if the sorting within the groupKey allow this stream
@@ -1237,7 +1256,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			if ok {
 				groupPos = pos
 				if sortingLess == nil || !sortingLess(ss, result.streams[pos]) {
-					return nil
+					return false, nil
 				}
 			}
 		}
@@ -1265,7 +1284,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			for _, f := range queryParts[qpIdx].filters {
 				matching, err := f(sc, s)
 				if err != nil {
-					return err
+					return false, err
 				}
 				if !matching {
 					continue queryPart
@@ -1275,7 +1294,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			matchingSearchContexts = append(matchingSearchContexts, sc)
 		}
 		if matchingQueryParts.IsZero() {
-			return nil
+			return false, nil
 		}
 
 		if grouper != nil && len(grouper.vars) != 0 {
@@ -1306,7 +1325,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 					groupPos = pos
 					if sortingLess == nil || !sortingLess(ss, result.streams[pos]) {
 						result.resultDropped++
-						return nil
+						return false, nil
 					}
 				}
 			}
@@ -1323,7 +1342,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			} else {
 				// we have a limit and are worse than the last
 				result.resultDropped++
-				return nil
+				return true, nil
 			}
 		}
 
@@ -1415,7 +1434,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			}
 		}
 		if len(vdv) == 0 {
-			return nil
+			return false, nil
 		}
 		sort.Slice(vdv, func(i, j int) bool {
 			a, b := &vdv[i], &vdv[j]
@@ -1450,7 +1469,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			}
 			d.uses++
 			result.variableAssociation[s.StreamID] = i
-			return nil
+			return false, nil
 		}
 		if freeSlot == len(result.variableData) {
 			result.variableData = append(result.variableData, variableDataCollection{})
@@ -1460,12 +1479,14 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			data: vdv,
 		}
 		result.variableAssociation[s.StreamID] = freeSlot
-		return nil
+		return false, nil
 	}
 	if len(streamIndexes) != 0 {
 		for _, si := range streamIndexes {
-			if err := filterAndAddToResult(si.activeQueryParts, si.si); err != nil {
+			if limitReached, err := filterAndAddToResult(si.activeQueryParts, si.si); err != nil {
 				return err
+			} else if sortingLookup != nil && limitReached {
+				break
 			}
 		}
 	} else {
@@ -1476,7 +1497,7 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 			}
 		}
 		for si, sc := 0, r.StreamCount(); si < sc; si++ {
-			if err := filterAndAddToResult(activeQueryParts, uint32(si)); err != nil {
+			if _, err := filterAndAddToResult(activeQueryParts, uint32(si)); err != nil {
 				return err
 			}
 		}
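The early exit described in this commit message rests on one invariant: if streams are visited in the order defined by the sorting criterion, then the first stream that is rejected only because the result list is already full proves that every following stream would be rejected as well. A function-level sketch of that loop shape, with invented names (sorted, matches, limit) rather than the real searchStreams signature; it compiles in any Go package:

	// collect visits candidates in sort order and stops at the first
	// proof that no remaining candidate can enter the result anymore
	func collect(sorted []uint32, limit int, matches func(uint32) bool) []uint32 {
		results := []uint32(nil)
		for _, si := range sorted {
			if limit != 0 && len(results) >= limit {
				break // every later candidate sorts worse than the results
			}
			if matches(si) {
				results = append(results, si)
			}
		}
		return results
	}
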
From 434cbac84bf7d64b165c76d7915f09894f91d356 Mon Sep 17 00:00:00 2001
From: spq
Date: Wed, 8 Jan 2025 19:42:58 +0100
Subject: [PATCH 3/5] improve read performance by getting rid of reflection

---
 internal/index/reader.go | 82 ++++++++++++++++++++++++----------------
 internal/index/search.go |  2 +-
 2 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/internal/index/reader.go b/internal/index/reader.go
index 32bc4d1..1fa128d 100644
--- a/internal/index/reader.go
+++ b/internal/index/reader.go
@@ -95,8 +95,46 @@ func (r *Reader) readAt(offset int64, d interface{}) error {
 	return err
 }
 
-func (r *Reader) readObject(section section, objectSize, index int, d interface{}) error {
-	return r.readAt(r.calculateOffset(section, objectSize, index), d)
+var isLittleEndian bool
+
+func init() {
+	isLittleEndian = binary.NativeEndian.Uint16([]byte("AB")) == binary.LittleEndian.Uint16([]byte("AB"))
+}
+
+func (r *Reader) streamByIndex(index uint32) (*stream, error) {
+	obj := stream{}
+	var err error
+	var d interface{}
+	if isLittleEndian {
+		d = (*[unsafe.Sizeof(obj)]byte)(unsafe.Pointer(&obj))
+	} else {
+		d = &obj
+	}
+	err = r.readAt(r.calculateOffset(sectionStreams, int(unsafe.Sizeof(obj)), int(index)), d)
+	return &obj, err
+}
+
+func (r *Reader) packetByIndex(index uint64) (*packet, error) {
+	obj := packet{}
+	var err error
+	var d interface{}
+	if isLittleEndian {
+		d = (*[unsafe.Sizeof(obj)]byte)(unsafe.Pointer(&obj))
+	} else {
+		d = &obj
+	}
+	err = r.readAt(r.calculateOffset(sectionPackets, int(unsafe.Sizeof(obj)), int(index)), d)
+	return &obj, err
+}
+
+func (r *Reader) readLookup(lookup section, index int) (uint32, error) {
+	streamIndex := uint32(0)
+	err := r.readAt(r.calculateOffset(lookup, 4, index), &streamIndex)
+	return streamIndex, err
+}
+
+func (r *Reader) readObjects(section section, d interface{}) error {
+	return r.readAt(r.calculateOffset(section, 0, 0), d)
 }
 
 func (r *Reader) objectCount(section section, objectSize int) int {
@@ -135,11 +173,11 @@ func NewReader(filename string) (*Reader, error) {
 
 		// read imports
 		importFilenames := make([]byte, r.header.Sections[sectionImportFilenames].size())
-		if err := r.readObject(sectionImportFilenames, 0, 0, importFilenames); err != nil {
+		if err := r.readObjects(sectionImportFilenames, importFilenames); err != nil {
 			return err
 		}
 		importEntries := make([]importEntry, r.header.Sections[sectionImports].size()/int64(unsafe.Sizeof(importEntry{})))
-		if err := r.readObject(sectionImports, 0, 0, importEntries); err != nil {
+		if err := r.readObjects(sectionImports, importEntries); err != nil {
 			return err
 		}
 		for _, ie := range importEntries {
@@ -153,15 +191,15 @@ func NewReader(filename string) (*Reader, error) {
 
 		// read hosts
 		v4hosts := make([]byte, r.header.Sections[sectionV4Hosts].size())
-		if err := r.readObject(sectionV4Hosts, 0, 0, v4hosts); err != nil {
+		if err := r.readObjects(sectionV4Hosts, v4hosts); err != nil {
 			return err
 		}
 		v6hosts := make([]byte, r.header.Sections[sectionV6Hosts].size())
-		if err := r.readObject(sectionV6Hosts, 0, 0, v6hosts); err != nil {
+		if err := r.readObjects(sectionV6Hosts, v6hosts); err != nil {
 			return err
 		}
 		hostGroups := make([]hostGroupEntry, r.header.Sections[sectionHostGroups].size()/int64(unsafe.Sizeof(hostGroupEntry{})))
-		if err := r.readObject(sectionHostGroups, 0, 0, hostGroups); err != nil {
+		if err := r.readObjects(sectionHostGroups, hostGroups); err != nil {
 			return err
 		}
 		for _, hg := range hostGroups {
@@ -241,12 +279,6 @@ func (r *Reader) PacketCount() int {
 	return r.objectCount(sectionPackets, int(unsafe.Sizeof(packet{})))
 }
 
-func (r *Reader) readLookup(lookup section, index int) (uint32, error) {
-	streamIndex := uint32(0)
-	err := r.readObject(lookup, 4, index, &streamIndex)
-	return streamIndex, err
-}
-
 func (r *Reader) minStream(lookup section) (*stream, error) {
 	i, err := r.readLookup(lookup, 0)
 	if err != nil {
@@ -275,22 +307,6 @@ func (r *Reader) StreamIDs() map[uint64]uint32 {
 	return r.containedStreamIds
 }
 
-func (r *Reader) streamByIndex(index uint32) (*stream, error) {
-	s := stream{}
-	if err := r.readObject(sectionStreams, int(unsafe.Sizeof(stream{})), int(index), &s); err != nil {
-		return nil, err
-	}
-	return &s, nil
-}
-
-func (r *Reader) packetByIndex(index uint64) (*packet, error) {
-	p := packet{}
-	if err := r.readObject(sectionPackets, int(unsafe.Sizeof(packet{})), int(index), &p); err != nil {
-		return nil, err
-	}
-	return &p, nil
-}
-
 func (s stream) wrap(r *Reader, idx uint32) (*Stream, error) {
 	return &Stream{
 		stream: s,
@@ -320,8 +336,8 @@ func (r *Reader) streamIndexByLookup(section section, f func(s *stream) (bool, e
 		if firstError != nil {
 			return false
 		}
-		streamIndex := uint32(0)
-		if err := r.readObject(section, 4, i, &streamIndex); err != nil {
+		streamIndex, err := r.readLookup(section, i)
+		if err != nil {
 			firstError = err
 			return false
 		}
@@ -343,8 +359,8 @@ func (r *Reader) streamIndexByLookup(section section, f func(s *stream) (bool, e
 	if idx >= r.StreamCount() {
 		return 0, false, nil
 	}
-	streamIndex := uint32(0)
-	if err := r.readObject(section, 4, idx, &streamIndex); err != nil {
+	streamIndex, err := r.readLookup(section, idx)
+	if err != nil {
 		return 0, false, err
 	}
 	return streamIndex, true, firstError
diff --git a/internal/index/search.go b/internal/index/search.go
index 90482d3..6a906e0 100644
--- a/internal/index/search.go
+++ b/internal/index/search.go
@@ -1080,7 +1080,7 @@ func SearchStreams(ctx context.Context, indexes []*Reader, limitIDs *bitmask.Lon
 		sortingLookup = func() ([]uint32, error) {
 			if res == nil {
 				res = make([]uint32, idx.StreamCount())
-				if err := idx.readObject(section, 0, 0, res); err != nil {
+				if err := idx.readObjects(section, res); err != nil {
 					return nil, err
 				}
 				if reverse {
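The reflection removed here is the one inside binary.Read, which walks the target struct with the reflect package on every call. Since the index file stores fixed-layout little-endian structs, a little-endian host can read the bytes straight into the struct's memory, as streamByIndex and packetByIndex now do. A condensed sketch of the pattern with an invented record type (not one of the pkappa2 types):

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
		"io"
		"unsafe"
	)

	type record struct {
		ID    uint64
		Index uint32
		Flags uint32
	}

	var isLittleEndian = binary.NativeEndian.Uint16([]byte("AB")) ==
		binary.LittleEndian.Uint16([]byte("AB"))

	func readRecord(r io.Reader) (record, error) {
		rec := record{}
		if isLittleEndian {
			// host order matches file order: fill the struct directly
			// through a byte-array view of its memory
			buf := (*[unsafe.Sizeof(rec)]byte)(unsafe.Pointer(&rec))
			_, err := io.ReadFull(r, buf[:])
			return rec, err
		}
		// portable fallback: reflection-based decode with byte swapping;
		// note that binary.Read needs a pointer, a plain struct value
		// would fail with an "invalid type" error
		err := binary.Read(r, binary.LittleEndian, &rec)
		return rec, err
	}

	func main() {
		data := make([]byte, unsafe.Sizeof(record{}))
		binary.LittleEndian.PutUint64(data[:8], 42)
		rec, _ := readRecord(bytes.NewReader(data))
		fmt.Println(rec.ID) // 42
	}
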
From b93e8d3a15840ede8aa0413f8e6b90c9efa25dab Mon Sep 17 00:00:00 2001
From: spq
Date: Thu, 9 Jan 2025 23:06:16 +0100
Subject: [PATCH 4/5] rework search to use (sorting) lookups more efficiently

- adds support for or-connected queries
- disables using the sort lookup if not useful (no limit)
- performs some searches in file order to make loading more efficient if
  no early exit is possible
---
 internal/index/search.go | 242 +++++++++++++++++++++++++--------------
 1 file changed, 137 insertions(+), 105 deletions(-)

diff --git a/internal/index/search.go b/internal/index/search.go
index 6a906e0..0abcbd2 100644
--- a/internal/index/search.go
+++ b/internal/index/search.go
@@ -1127,100 +1127,6 @@ func SearchStreams(ctx context.Context, indexes []*Reader, limitIDs *bitmask.Lon
 }
 
 func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQueryResults map[string]resultData, queryParts []queryPart, grouper *grouper, sortingLess func(a, b *Stream) bool, limit uint, sortingLookup func() ([]uint32, error)) error {
-	// check if all queries use lookups, if not don't evaluate them
-	useLookups := true
-	possibleQueryParts := 0
-	for _, qp := range queryParts {
-		if !qp.possible {
-			continue
-		}
-		possibleQueryParts++
-		if len(qp.lookups) == 0 {
-			useLookups = false
-		}
-	}
-	switch possibleQueryParts {
-	case 0:
-		return nil
-	case 1:
-		if sortingLookup != nil {
-			useLookups = true
-		}
-	default:
-		sortingLookup = nil
-	}
-	// a map of index to list of sub-queries that matched this id
-	type streamIndex struct {
-		si               uint32
-		activeQueryParts bitmask.ShortBitmask
-	}
-	streamIndexes := []streamIndex(nil)
-	if useLookups {
-		streamIndexesPosition := map[uint32]int{}
-		for qpIdx, qp := range queryParts {
-			if !qp.possible {
-				continue
-			}
-			streamIndexesOfQuery := []uint32(nil)
-			if sortingLookup != nil {
-				newStreamIndexes, err := sortingLookup()
-				if err != nil {
-					return err
-				}
-				streamIndexesOfQuery = newStreamIndexes
-			}
-			for _, l := range qp.lookups {
-				newStreamIndexes, err := l()
-				if err != nil {
-					return err
-				}
-				if len(newStreamIndexes) == 0 {
-					streamIndexesOfQuery = nil
-					break
-				}
-				if len(streamIndexesOfQuery) == 0 {
-					streamIndexesOfQuery = newStreamIndexes
-					continue
-				}
-				newStreamIndexesMap := make(map[uint32]struct{}, len(newStreamIndexes))
-				for _, si := range newStreamIndexes {
-					newStreamIndexesMap[si] = struct{}{}
-				}
-				// filter out old stream indexes with the new lookup
-				removed := 0
-				for i := 0; i < len(streamIndexesOfQuery); i++ {
-					si := streamIndexesOfQuery[i]
-					if _, ok := newStreamIndexesMap[si]; !ok {
-						removed++
-					} else if removed != 0 {
-						streamIndexesOfQuery[i-removed] = si
-					}
-				}
-				streamIndexesOfQuery = streamIndexesOfQuery[:len(streamIndexesOfQuery)-removed]
-				if len(streamIndexesOfQuery) == 0 {
-					break
-				}
-			}
-			for _, si := range streamIndexesOfQuery {
-				pos, ok := streamIndexesPosition[si]
-				if ok {
-					sis := &streamIndexes[pos]
-					sis.activeQueryParts.Set(uint(qpIdx))
-				} else {
-					streamIndexesPosition[si] = len(streamIndexes)
-					streamIndexes = append(streamIndexes, streamIndex{
-						si:               si,
-						activeQueryParts: bitmask.ShortBitmask{},
-					})
-					streamIndexes[len(streamIndexes)-1].activeQueryParts.Set(uint(qpIdx))
-				}
-			}
-		}
-		if len(streamIndexes) == 0 {
-			return nil
-		}
-	}
-	// apply filters to lookup results or all streams, if no lookups could be used
 	filterAndAddToResult := func(activeQueryParts bitmask.ShortBitmask, si uint32) (bool, error) {
 		if err := ctx.Err(); err != nil {
 			return false, err
@@ -1481,26 +1387,152 @@ func (r *Reader) searchStreams(ctx context.Context, result *resultData, subQuery
 		result.variableAssociation[s.StreamID] = freeSlot
 		return false, nil
 	}
-	if len(streamIndexes) != 0 {
-		for _, si := range streamIndexes {
-			if limitReached, err := filterAndAddToResult(si.activeQueryParts, si.si); err != nil {
+
+	// check if all queries use lookups, if not don't use lookups
+	activeQueryParts := bitmask.ShortBitmask{}
+	lookupMissing := false
+	for qpIdx, qp := range queryParts {
+		if !qp.possible {
+			continue
+		}
+		activeQueryParts.Set(uint(qpIdx))
+		if len(qp.lookups) == 0 {
+			lookupMissing = true
+		}
+	}
+	if activeQueryParts.OnesCount() == 0 {
+		return nil
+	}
+
+	// if we don't have a limit, we should not use the sorting lookup as no early exit is possible
+	if limit == 0 {
+		sortingLookup = nil
+	}
+
+	if lookupMissing {
+		// we miss a lookup for at least one query part, so we will be doing a full table scan
+
+		// without sorting lookup, we will evaluate in file order without early exit
+		if sortingLookup == nil {
+			for si, sc := 0, r.StreamCount(); si < sc; si++ {
+				if _, err := filterAndAddToResult(activeQueryParts, uint32(si)); err != nil {
+					return err
+				}
+			}
+			return nil
+		}
+
+		// with sorting lookup, we might be able to exit early if we reach the limit
+		sortedStreamIndexes, err := sortingLookup()
+		if err != nil {
+			return err
+		}
+		for _, si := range sortedStreamIndexes {
+			if limitReached, err := filterAndAddToResult(activeQueryParts, si); err != nil {
+				return err
+			} else if limitReached {
+				break
+			}
+		}
+		return nil
+	}
+
+	// all query parts have lookups, build a map of stream indexes to active query parts
+	type streamIndex struct {
+		si               uint32
+		activeQueryParts bitmask.ShortBitmask
+	}
+	streamIndexes := []streamIndex(nil)
+
+	// build a list of stream indexes that match any query part
+	streamIndexesPosition := map[uint32]int{}
+	for qpIdx, qp := range queryParts {
+		if !qp.possible {
+			continue
+		}
+		streamIndexesOfQuery := []uint32(nil)
+		for _, l := range qp.lookups {
+			newStreamIndexes, err := l()
+			if err != nil {
 				return err
-			} else if sortingLookup != nil && limitReached {
+			}
+			if len(newStreamIndexes) == 0 {
+				streamIndexesOfQuery = nil
+				break
+			}
+			if len(streamIndexesOfQuery) == 0 {
+				streamIndexesOfQuery = newStreamIndexes
+				continue
+			}
+			newStreamIndexesMap := make(map[uint32]struct{}, len(newStreamIndexes))
+			for _, si := range newStreamIndexes {
+				newStreamIndexesMap[si] = struct{}{}
+			}
+			// filter out old stream indexes with the new lookup
+			removed := 0
+			for i := 0; i < len(streamIndexesOfQuery); i++ {
+				si := streamIndexesOfQuery[i]
+				if _, ok := newStreamIndexesMap[si]; !ok {
+					removed++
+				} else if removed != 0 {
+					streamIndexesOfQuery[i-removed] = si
+				}
+			}
+			streamIndexesOfQuery = streamIndexesOfQuery[:len(streamIndexesOfQuery)-removed]
+			if len(streamIndexesOfQuery) == 0 {
 				break
 			}
 		}
-	} else {
-		activeQueryParts := bitmask.ShortBitmask{}
-		for qpIdx, qp := range queryParts {
-			if qp.possible {
-				activeQueryParts.Set(uint(qpIdx))
+		for _, si := range streamIndexesOfQuery {
+			pos, ok := streamIndexesPosition[si]
+			if ok {
+				sis := &streamIndexes[pos]
+				sis.activeQueryParts.Set(uint(qpIdx))
+			} else {
+				streamIndexesPosition[si] = len(streamIndexes)
+				streamIndexes = append(streamIndexes, streamIndex{
+					si:               si,
+					activeQueryParts: bitmask.ShortBitmask{},
+				})
+				streamIndexes[len(streamIndexes)-1].activeQueryParts.Set(uint(qpIdx))
 			}
 		}
-		for si, sc := 0, r.StreamCount(); si < sc; si++ {
-			if _, err := filterAndAddToResult(activeQueryParts, uint32(si)); err != nil {
+	}
+
+	// without sorting lookup, we can just evaluate the streams potentially matching
+	// any of the query parts in file order, no early exit is possible
+	if sortingLookup == nil {
+		// sort the stream indexes to allow evaluating the streams in file order
+		sort.Slice(streamIndexes, func(i, j int) bool {
+			return streamIndexes[i].si < streamIndexes[j].si
+		})
+		for _, si := range streamIndexes {
+			_, err := filterAndAddToResult(si.activeQueryParts, si.si)
+			if err != nil {
 				return err
 			}
 		}
+		return nil
 	}
+
+	sortedStreamIndexes, err := sortingLookup()
+	if err != nil {
+		return err
+	}
+
+	// evaluate the streams using the sort order lookup and test
+	// each index against the information from the lookups
+	for _, si := range sortedStreamIndexes {
+		pos, ok := streamIndexesPosition[si]
+		if !ok {
+			continue
+		}
+		aqp := streamIndexes[pos].activeQueryParts
+		if limitReached, err := filterAndAddToResult(aqp, si); err != nil {
+			return err
+		} else if limitReached {
+			break
+		}
+	}
 	return nil
 }
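For or-connected queries, every candidate stream index now carries a bitmask of the query parts it might satisfy: within one part the lookup results are intersected, and the per-part candidate lists are unioned into streamIndexes. A small sketch of the intersection step, mirroring the in-place filtering loop in the hunk above (the helper name is invented):

	// intersect AND-combines two lookup results by filtering the current
	// candidate list in place against a set built from the newer lookup
	func intersect(candidates, next []uint32) []uint32 {
		keep := make(map[uint32]struct{}, len(next))
		for _, si := range next {
			keep[si] = struct{}{}
		}
		removed := 0
		for i, si := range candidates {
			if _, ok := keep[si]; !ok {
				removed++
			} else if removed != 0 {
				candidates[i-removed] = si
			}
		}
		return candidates[:len(candidates)-removed]
	}
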
From fd6f35934a96a0e23dee33ceb5e14b211fc3853d Mon Sep 17 00:00:00 2001
From: spq
Date: Thu, 9 Jan 2025 23:29:16 +0100
Subject: [PATCH 5/5] add more test cases to improve coverage of new
 lookup-related code

---
 internal/index/search_test.go | 54 ++++++++++++++++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/internal/index/search_test.go b/internal/index/search_test.go
index b09a4ab..d6a8be8 100644
--- a/internal/index/search_test.go
+++ b/internal/index/search_test.go
@@ -474,6 +474,50 @@ func TestSearchStreams(t *testing.T) {
 			"sort:cport,sport",
 			[]uint64{1, 2, 0},
 		},
+		{
+			"impossible filter",
+			[]streamInfo{
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*1), []string{"foo"}),
+			},
+			"id:123",
+			nil,
+		},
+		{
+			"partially impossible filter",
+			[]streamInfo{
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*1), []string{"foo"}),
+			},
+			"id:123 or id::100",
+			[]uint64{0},
+		},
+		{
+			"sort with allowed early exit",
+			[]streamInfo{
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*1), []string{"foo"}),
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*2), []string{"foo"}),
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*3), []string{"foo"}),
+			},
+			"sort:id limit:2",
+			[]uint64{0, 1},
+		},
+		{
+			"sort with id lookup and allowed early exit",
+			[]streamInfo{
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*1), []string{"foo"}),
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*2), []string{"foo"}),
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*3), []string{"foo"}),
+			},
+			"id:0:2 sort:id limit:2",
+			[]uint64{0, 1},
+		},
+		{
+			"impossible filter supporting lookup",
+			[]streamInfo{
+				makeStream("1.2.3.4:1234", "1.2.3.4:1234", t1.Add(time.Hour*1), []string{"foo"}),
+			},
+			"id:123: limit:2",
+			nil,
+		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
@@ -493,7 +537,11 @@ func TestSearchStreams(t *testing.T) {
 			if err != nil {
 				t.Errorf("Error parsing query: %v", err)
 			}
-			results, _, _, err := SearchStreams(context.Background(), []*Reader{r}, nil, q.ReferenceTime, q.Conditions, q.Grouping, q.Sorting, 100, 0, nil, converters, false)
+			l := uint(100)
+			if q.Limit != nil {
+				l = *q.Limit
+			}
+			results, _, _, err := SearchStreams(context.Background(), []*Reader{r}, nil, q.ReferenceTime, q.Conditions, q.Grouping, q.Sorting, l, 0, nil, converters, false)
 			if err != nil {
 				t.Fatalf("Error searching streams: %v", err)
 			}
 		})
 	}
 }
+
+func TestSearch(t *testing.T) {
+
+}