Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Feb 3, 2025
1 parent 6a7b517 commit 0f8d83c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 48 deletions.
60 changes: 60 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
Expand Down Expand Up @@ -177,6 +178,10 @@ type Distributor struct {
ingesterTasks chan pushIngesterTask
ingesterTaskWg sync.WaitGroup

// Will succeed usage tracker in future.
limitsFrontendRing ring.ReadRing
limitsFrontends *ring_client.Pool

// kafka
kafkaWriter KafkaProducer
partitionRing ring.PartitionRingReader
Expand All @@ -201,6 +206,8 @@ func New(
metricsNamespace string,
tee Tee,
usageTracker push.UsageTracker,
limitsFrontendCfg limits_frontend_client.Config,
limitsFrontendRing ring.ReadRing,
logger log.Logger,
) (*Distributor, error) {
ingesterClientFactory := cfg.factory
Expand All @@ -221,6 +228,10 @@ func New(
return nil, err
}

limitsFrontendClientFactory := ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
return limits_frontend_client.New(limitsFrontendCfg, addr)
})

// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.BasicLifecycler
Expand Down Expand Up @@ -318,6 +329,14 @@ func New(
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
limitsFrontendRing: limitsFrontendRing,
limitsFrontends: limits_frontend_client.NewPool(
"ingest-limits-frontend",
limitsFrontendCfg.PoolConfig,
limitsFrontendRing,
limitsFrontendClientFactory,
logger,
),
}

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
Expand Down Expand Up @@ -450,6 +469,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg)
}

exceedsLimits, err := d.exceedsLimits(ctx, tenantID, req.Streams)
if err != nil {
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits", "err", err)
} else if len(exceedsLimits.RejectedStreams) > 0 {
level.Error(d.logger).Log("msg", "tenant request exceeded limits", "tenant", tenantID)
} else {
level.Info(d.logger).Log("msg", "within limits", "tenant", tenantID)
}

// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
Expand Down Expand Up @@ -1065,6 +1093,38 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
}
}

func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []logproto.Stream) (*logproto.ExceedsLimitsResponse, error) {
streamsWithSize := make([]*logproto.StreamMetadataWithSize, 0, len(streams))
for _, stream := range streams {
streamsWithSize = append(streamsWithSize, &logproto.StreamMetadataWithSize{
StreamHash: stream.Hash,
})
}
req := logproto.ExceedsLimitsRequest{
Tenant: tenantID,
Streams: streamsWithSize,
}

var key uint32
var descs [5]ring.InstanceDesc
rs, err := d.limitsFrontendRing.Get(key, limits_frontend_client.LimitsRead, descs[0:], nil, nil)
if err != nil {
return nil, err
}

if len(rs.Instances) == 0 {

}

instance := rs.Instances[0]
c, err := d.limitsFrontends.GetClientFor(instance.Addr)
if err != nil {
return nil, err
}
client := c.(logproto.IngestLimitsFrontendClient)
return client.ExceedsLimits(ctx, &req)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
c, err := d.ingesterClients.GetClientFor(ingester.Addr)
Expand Down
25 changes: 14 additions & 11 deletions pkg/limits/frontend/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/grafana/loki/v3/pkg/util/server"
)

var (
LimitsRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
)

var (
frontendClients = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_ingest_limits_frontend_clients",
Expand All @@ -41,9 +45,9 @@ type Config struct {
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".limits-frontend-client", f)
cfg.PoolConfig.RegisterFlagsWithPrefix(prefix, f)
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend-client", f)
cfg.PoolConfig.RegisterFlagsWithPrefix("ingest-limits-frontend-client", f)
}

// PoolConfig contains the config for a pool of ingest-limits-frontend clients.
Expand All @@ -59,16 +63,15 @@ func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 1*time.Second, "Timeout for the health check.")
}

// IngestLimitsFrontendClient is a gRPC client for the ingest-limits-frontend.
type IngestLimitsFrontendClient struct {
// Client is a gRPC client for the ingest-limits-frontend.
type Client struct {
logproto.IngestLimitsFrontendClient
grpc_health_v1.HealthClient
io.Closer
}

// NewIngestLimitsFrontendClient returns a new IngestLimitsFrontendClient for the
// specified ingest-limits-frontend.
func NewIngestLimitsFrontendClient(cfg Config, addr string) (*IngestLimitsFrontendClient, error) {
// New returns a new Client for the specified ingest-limits-frontend.
func New(cfg Config, addr string) (*Client, error) {
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}
Expand All @@ -82,7 +85,7 @@ func NewIngestLimitsFrontendClient(cfg Config, addr string) (*IngestLimitsFronte
if err != nil {
return nil, err
}
return &IngestLimitsFrontendClient{
return &Client{
IngestLimitsFrontendClient: logproto.NewIngestLimitsFrontendClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Closer: conn,
Expand Down Expand Up @@ -111,8 +114,8 @@ func getGRPCInterceptors(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.Str
return unaryInterceptors, streamInterceptors
}

// NewIngestLimitsFrontendClientPool returns a new pool of IngestLimitsFrontendClients.
func NewIngestLimitsFrontendClientPool(
// NewPool returns a new pool of clients for the ingest-limits-frontend.
func NewPool(
name string,
cfg PoolConfig,
ring ring.ReadRing,
Expand Down
11 changes: 5 additions & 6 deletions pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -47,15 +46,15 @@ type Frontend struct {
}

// New returns a new Frontend.
func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) {
func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) {
var servs []services.Service

factory := ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
return NewIngestLimitsBackendClient(cfg.ClientConfig, addr)
})

pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, readRing, factory, logger)
limitsSrv := NewRingIngestLimitsService(readRing, pool, limits, logger, reg)
pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, limitsRing, factory, logger)
limitsSrv := NewRingIngestLimitsService(limitsRing, pool, limits, logger, reg)

f := &Frontend{
cfg: cfg,
Expand All @@ -76,7 +75,7 @@ func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, log
servs = append(servs, pool)
mgr, err := services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "services manager")
return nil, err
}

f.subservices = mgr
Expand Down Expand Up @@ -121,7 +120,7 @@ func (f *Frontend) running(ctx context.Context) error {
case <-ctx.Done():
return nil
case err := <-f.subservicesWatcher.Chan():
return errors.Wrap(err, "ingest limits frontend subservice failed")
return fmt.Errorf("ingest limits frontend subservice failed: %w", err)
}
}

Expand Down
65 changes: 34 additions & 31 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/limits"
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/loki/common"
"github.com/grafana/loki/v3/pkg/lokifrontend"
Expand Down Expand Up @@ -83,37 +84,38 @@ type Config struct {
HTTPPrefix string `yaml:"http_prefix" doc:"hidden"`
BallastBytes int `yaml:"ballast_bytes"`

Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty" doc:"hidden"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
RulerStorage rulestore.Config `yaml:"ruler_storage,omitempty" doc:"hidden"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"`
BlockScheduler blockscheduler.Config `yaml:"block_scheduler,omitempty"`
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"`
BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
CompactorHTTPClient compactorclient.HTTPConfig `yaml:"compactor_client,omitempty" doc:"hidden"`
CompactorGRPCClient compactorclient.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"`
DataObjExplorer explorer.Config `yaml:"dataobj_explorer,omitempty" category:"experimental"`
IngestLimits limits.Config `yaml:"ingest_limits,omitempty" category:"experimental"`
IngestLimitsFrontend limits_frontend.Config `yaml:"ingest_limits_frontend,omitempty" category:"experimental"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty" doc:"hidden"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
RulerStorage rulestore.Config `yaml:"ruler_storage,omitempty" doc:"hidden"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"`
BlockScheduler blockscheduler.Config `yaml:"block_scheduler,omitempty"`
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"`
BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
CompactorHTTPClient compactorclient.HTTPConfig `yaml:"compactor_client,omitempty" doc:"hidden"`
CompactorGRPCClient compactorclient.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"`
DataObjExplorer explorer.Config `yaml:"dataobj_explorer,omitempty" category:"experimental"`
IngestLimits limits.Config `yaml:"ingest_limits,omitempty" category:"experimental"`
IngestLimitsFrontend limits_frontend.Config `yaml:"ingest_limits_frontend,omitempty" category:"experimental"`
IngestLimitsFrontendClient limits_frontend_client.Config `yaml:"ingest_limits_frontend_client,omitempty" category:"experimental"`

RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
OperationalConfig runtime.Config `yaml:"operational_config,omitempty"`
Expand Down Expand Up @@ -197,6 +199,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.BlockScheduler.RegisterFlags(f)
c.IngestLimits.RegisterFlags(f)
c.IngestLimitsFrontend.RegisterFlags(f)
c.IngestLimitsFrontendClient.RegisterFlags(f)
c.DataObjExplorer.RegisterFlags(f)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ func (t *Loki) initDistributor() (services.Service, error) {
t.Cfg.MetricsNamespace,
t.Tee,
t.UsageTracker,
t.Cfg.IngestLimitsFrontendClient,
t.ingestLimitsFrontendRing,
logger,
)
if err != nil {
Expand Down

0 comments on commit 0f8d83c

Please sign in to comment.