Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Feb 18, 2025
1 parent 0f8d83c commit e7b6bc1
Show file tree
Hide file tree
Showing 8 changed files with 843 additions and 794 deletions.
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
Expand Down Expand Up @@ -180,7 +180,7 @@ type Distributor struct {

// Will succeed usage tracker in future.
limitsFrontendRing ring.ReadRing
limitsFrontends *ring_client.Pool
limitsFrontends *ring_client.Pool

// kafka
kafkaWriter KafkaProducer
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, stream
})
}
req := logproto.ExceedsLimitsRequest{
Tenant: tenantID,
Tenant: tenantID,
Streams: streamsWithSize,
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
Expand Down Expand Up @@ -1743,6 +1745,15 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
ring: partitionRing,
}

limitsFrontendRing, err := ring.New(ring.Config{
KVStore: kv.Config{
Mock: kvStore,
},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 1,
}, limits_frontend.RingKey, limits_frontend.RingKey, nil, nil)
require.NoError(t, err)

loopbackName, err := loki_net.LoopbackInterfaceName()
require.NoError(t, err)

Expand All @@ -1769,8 +1780,9 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
require.NoError(t, err)

ingesterConfig := ingester.Config{MaxChunkAge: 2 * time.Hour}
limitsFrontendCfg := limits_frontend_client.Config{}

d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, limitsFrontendCfg, limitsFrontendRing, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
Expand Down
48 changes: 26 additions & 22 deletions pkg/limits/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package frontend

import (
"context"
"fmt"

"github.com/axiomhq/hyperloglog"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -131,9 +133,15 @@ func (s *RingIngestLimitsService) forGivenReplicaSet(ctx context.Context, replic
}

func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
hashes := make([]uint64, 0, len(req.Streams))
for _, s := range req.Streams {
hashes = append(hashes, s.StreamHash)
}

resps, err := s.forAllBackends(ctx, func(_ context.Context, client logproto.IngestLimitsClient) (*logproto.GetStreamUsageResponse, error) {
return client.GetStreamUsage(ctx, &logproto.GetStreamUsageRequest{
Tenant: req.Tenant,
Tenant: req.Tenant,
HasStreamHashes: hashes,
})
})
if err != nil {
Expand All @@ -142,41 +150,37 @@ func (s *RingIngestLimitsService) ExceedsLimits(ctx context.Context, req *logpro

maxGlobalStreams := s.limits.MaxGlobalStreamsPerUser(req.Tenant)

var (
activeStreamsTotal uint64
uniqueStreamHashes = make(map[uint64]bool)
)
knownHashes := make(map[uint64]struct{})

log := hyperloglog.New16()
for _, resp := range resps {
var duplicates uint64
// Record the unique stream hashes
// and count duplicates active streams
// to be subtracted from the total
for _, stream := range resp.Response.RecordedStreams {
if uniqueStreamHashes[stream.StreamHash] {
duplicates++
continue
for _, hash := range resp.Response.KnownStreamHashes {
knownHashes[hash] = struct{}{}
}
if resp.Response.ActiveStreams > 0 {
tmp := hyperloglog.New16()
if err := tmp.UnmarshalBinary(resp.Response.Sketch); err != nil {
return nil, err
}
if err := log.Merge(tmp); err != nil {
return nil, err
}
uniqueStreamHashes[stream.StreamHash] = true
}

activeStreamsTotal += resp.Response.ActiveStreams - duplicates

if duplicates > 0 {
s.metrics.tenantDuplicateStreamsFound.WithLabelValues(req.Tenant).Inc()
}
}

s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))
// s.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))

if activeStreamsTotal < uint64(maxGlobalStreams) {
fmt.Println("log estimate", log.Estimate(), "max_global_streams", maxGlobalStreams)
if log.Estimate() < uint64(maxGlobalStreams) {
return &logproto.ExceedsLimitsResponse{
Tenant: req.Tenant,
}, nil
}

var rejectedStreams []*logproto.RejectedStream
for _, stream := range req.Streams {
if !uniqueStreamHashes[stream.StreamHash] {
if _, ok := knownHashes[stream.StreamHash]; !ok {
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
StreamHash: stream.StreamHash,
Reason: RejectedStreamReasonExceedsGlobalLimit,
Expand Down
Loading

0 comments on commit e7b6bc1

Please sign in to comment.