Skip to content

Commit

Permalink
feat(policies): Add PoliciesStreamMapping to loghttp limits interface (
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Feb 5, 2025
1 parent fe315ef commit c2e1e88
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 11 deletions.
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, false)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, nil, false)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
7 changes: 7 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type Distributor struct {
streamShardCount prometheus.Counter
tenantPushSanitizedStructuredMetadata *prometheus.CounterVec

policyResolver push.PolicyResolver
usageTracker push.UsageTracker
ingesterTasks chan pushIngesterTask
ingesterTaskWg sync.WaitGroup
Expand Down Expand Up @@ -223,6 +224,11 @@ func New(
return client.New(internalCfg, addr)
}

policyResolver := push.PolicyResolver(func(userID string, lbs labels.Labels) string {
mappings := overrides.PoliciesStreamMapping(userID)
return mappings.PolicyFor(lbs)
})

validator, err := NewValidator(overrides, usageTracker)
if err != nil {
return nil, err
Expand Down Expand Up @@ -280,6 +286,7 @@ func New(
healthyInstancesCount: atomic.NewUint32(0),
rateLimitStrat: rateLimitStrat,
tee: tee,
policyResolver: policyResolver,
usageTracker: usageTracker,
ingesterTasks: make(chan pushIngesterTask),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
}

logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, d.policyResolver, logPushRequestStreams)
if err != nil {
if !errors.Is(err, push.ErrAllLogsFiltered) {
if d.tenantConfigs.LogPushRequest(tenantID) {
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *fakeParser) parseRequest(
_ push.TenantsRetention,
_ push.Limits,
_ push.UsageTracker,
_ push.PolicyResolver,
_ bool,
_ log.Logger,
) (*logproto.PushRequest, *push.Stats, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, _ PolicyResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
}

type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, policyResolver PolicyResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger)
PolicyResolver func(userID string, lbs labels.Labels) string
)

type Stats struct {
Expand All @@ -113,8 +114,8 @@ type Stats struct {
IsAggregatedMetric bool
}

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, policyResolver PolicyResolver, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, policyResolver, logPushRequestStreams, logger)
if err != nil && !errors.Is(err, ErrAllLogsFiltered) {
return nil, err
}
Expand Down Expand Up @@ -171,7 +172,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, err
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, _ PolicyResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down
9 changes: 5 additions & 4 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func TestParseRequest(t *testing.T) {
&fakeLimits{enabled: test.enableServiceDiscovery},
ParseLokiRequest,
tracker,
nil,
false,
)

Expand Down Expand Up @@ -364,7 +365,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/loki/api/v1/push", strings.NewReader(body))

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, false)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, nil, false)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
Expand All @@ -375,7 +376,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, nil, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})
Expand All @@ -389,7 +390,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, nil, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})
Expand All @@ -403,7 +404,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, nil, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
Expand Down

0 comments on commit c2e1e88

Please sign in to comment.