Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Feb 18, 2025
1 parent 0f8d83c commit faf7998
Show file tree
Hide file tree
Showing 4 changed files with 633 additions and 596 deletions.
48 changes: 26 additions & 22 deletions pkg/limits/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package frontend

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
"github.com/axiomhq/hyperloglog"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -131,9 +133,15 @@ func (s *RingIngestLimitsService) forGivenReplicaSet(ctx context.Context, replic
}

func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
hashes := make([]uint64, 0, len(req.Streams))
for _, s := range req.Streams {
hashes = append(hashes, s.StreamHash)
}

resps, err := s.forAllBackends(ctx, func(_ context.Context, client logproto.IngestLimitsClient) (*logproto.GetStreamUsageResponse, error) {
return client.GetStreamUsage(ctx, &logproto.GetStreamUsageRequest{
Tenant: req.Tenant,
HasStreamHashes: hashes,
})
})
if err != nil {
Expand All @@ -142,41 +150,37 @@ func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logpro

maxGlobalStreams := s.limits.MaxGlobalStreamsPerUser(req.Tenant)

var (
activeStreamsTotal uint64
uniqueStreamHashes = make(map[uint64]bool)
)
knownHashes := make(map[uint64]struct{})

log := hyperloglog.New16()
for _, resp := range resps {
var duplicates uint64
// Record the unique stream hashes
// and count duplicates active streams
// to be subtracted from the total
for _, stream := range resp.Response.RecordedStreams {
if uniqueStreamHashes[stream.StreamHash] {
duplicates++
continue
}
uniqueStreamHashes[stream.StreamHash] = true
for _, hash := range resp.Response.KnownStreamHashes {
knownHashes[hash] = struct{}{}
}

activeStreamsTotal += resp.Response.ActiveStreams - duplicates

if duplicates > 0 {
s.metrics.tenantDuplicateStreamsFound.WithLabelValues(req.Tenant).Inc()
if resp.Response.ActiveStreams > 0 {
tmp := hyperloglog.New16()
if err := tmp.UnmarshalBinary(resp.Response.Sketch); err != nil {
return nil, err
}
if err := log.Merge(tmp); err != nil {
return nil, err
}
}

}

s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))
// s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))

if activeStreamsTotal < uint64(maxGlobalStreams) {
fmt.Println("log estimate", log.Estimate(), "max_global_streams", maxGlobalStreams)
if log.Estimate() < uint64(maxGlobalStreams) {
return &logproto.ExceedsLimitsResponse{
Tenant: req.Tenant,
}, nil
}

var rejectedStreams []*logproto.RejectedStream
for _, stream := range req.Streams {
if !uniqueStreamHashes[stream.StreamHash] {
if _, ok := knownHashes[stream.StreamHash]; !ok {
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
StreamHash: stream.StreamHash,
Reason: RejectedStreamReasonExceedsGlobalLimit,
Expand Down
33 changes: 25 additions & 8 deletions pkg/limits/ingest_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/axiomhq/hyperloglog"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
Expand Down Expand Up @@ -275,6 +276,8 @@ func (s *IngestLimits) updateMetadata(metadata *logproto.StreamMetadata, tenant
s.metadata[tenant] = make(map[uint64]int64)
}

fmt.Println("ingesting stream hash", metadata.StreamHash)

// Use the provided lastSeenAt timestamp as the last seen time
recordTime := lastSeenAt.UnixNano()
if current, ok := s.metadata[tenant][metadata.StreamHash]; !ok || recordTime > current {
Expand Down Expand Up @@ -372,29 +375,43 @@ func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStream
return &logproto.GetStreamUsageResponse{
Tenant: req.Tenant,
ActiveStreams: 0,
Sketch: nil,
KnownStreamHashes: nil,
}, nil
}

hashes := make(map[uint64]struct{})
for _, hash := range req.HasStreamHashes {
hashes[hash] = struct{}{}
}

knownHashes := make([]uint64, 0)

// Count total active streams for the tenant
// across all assigned partitions and record
// the streams that have been seen within the
// window
var (
activeStreams uint64
recordedStreams = make([]*logproto.RecordedStreams, 0, len(streams))
)
var activeStreams uint64
hll := hyperloglog.New16()
for hash, lastSeen := range streams {
if lastSeen >= cutoff {
activeStreams++
recordedStreams = append(recordedStreams, &logproto.RecordedStreams{
StreamHash: hash,
})
hll.InsertHash(hash)
if _, ok := hashes[hash]; ok {
knownHashes = append(knownHashes, hash)
}
}
}

b, err := hll.MarshalBinary()
if err != nil {
return nil, err
}

return &logproto.GetStreamUsageResponse{
Tenant: req.Tenant,
ActiveStreams: activeStreams,
RecordedStreams: recordedStreams,
Sketch: b,
KnownStreamHashes: knownHashes,
}, nil
}
Loading

0 comments on commit faf7998

Please sign in to comment.