Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Feb 18, 2025
1 parent 0f8d83c commit d2395e7
Show file tree
Hide file tree
Showing 6 changed files with 827 additions and 790 deletions.
48 changes: 26 additions & 22 deletions pkg/limits/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package frontend

import (
"context"
"fmt"

"github.com/axiomhq/hyperloglog"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -131,9 +133,15 @@ func (s *RingIngestLimitsService) forGivenReplicaSet(ctx context.Context, replic
}

func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
hashes := make([]uint64, 0, len(req.Streams))
for _, s := range req.Streams {
hashes = append(hashes, s.StreamHash)
}

resps, err := s.forAllBackends(ctx, func(_ context.Context, client logproto.IngestLimitsClient) (*logproto.GetStreamUsageResponse, error) {
return client.GetStreamUsage(ctx, &logproto.GetStreamUsageRequest{
Tenant: req.Tenant,
Tenant: req.Tenant,
HasStreamHashes: hashes,
})
})
if err != nil {
Expand All @@ -142,41 +150,37 @@ func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logpro

maxGlobalStreams := s.limits.MaxGlobalStreamsPerUser(req.Tenant)

var (
activeStreamsTotal uint64
uniqueStreamHashes = make(map[uint64]bool)
)
knownHashes := make(map[uint64]struct{})

log := hyperloglog.New16()
for _, resp := range resps {
var duplicates uint64
// Record the unique stream hashes
// and count duplicates active streams
// to be subtracted from the total
for _, stream := range resp.Response.RecordedStreams {
if uniqueStreamHashes[stream.StreamHash] {
duplicates++
continue
for _, hash := range resp.Response.KnownStreamHashes {
knownHashes[hash] = struct{}{}
}
if resp.Response.ActiveStreams > 0 {
tmp := hyperloglog.New16()
if err := tmp.UnmarshalBinary(resp.Response.Sketch); err != nil {
return nil, err
}
if err := log.Merge(tmp); err != nil {
return nil, err
}
uniqueStreamHashes[stream.StreamHash] = true
}

activeStreamsTotal += resp.Response.ActiveStreams - duplicates

if duplicates > 0 {
s.metrics.tenantDuplicateStreamsFound.WithLabelValues(req.Tenant).Inc()
}
}

s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))
// s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))

if activeStreamsTotal < uint64(maxGlobalStreams) {
fmt.Println("log estimate", log.Estimate(), "max_global_streams", maxGlobalStreams)
if log.Estimate() < uint64(maxGlobalStreams) {
return &logproto.ExceedsLimitsResponse{
Tenant: req.Tenant,
}, nil
}

var rejectedStreams []*logproto.RejectedStream
for _, stream := range req.Streams {
if !uniqueStreamHashes[stream.StreamHash] {
if _, ok := knownHashes[stream.StreamHash]; !ok {
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
StreamHash: stream.StreamHash,
Reason: RejectedStreamReasonExceedsGlobalLimit,
Expand Down
Loading

0 comments on commit d2395e7

Please sign in to comment.