From 805cf6db4b95a233df541abe4675e7a5434b63fb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 25 Feb 2025 15:57:21 +0100 Subject: [PATCH] feat(policies): Support global policy --- docs/sources/shared/configuration.md | 12 +- pkg/distributor/distributor.go | 6 +- pkg/distributor/distributor_test.go | 15 ++- pkg/distributor/validator.go | 22 ++-- pkg/distributor/validator_test.go | 158 +++++++++++++++++++++++++++ pkg/validation/ingestion_policies.go | 4 + pkg/validation/limits.go | 21 ++-- 7 files changed, 207 insertions(+), 31 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index f46590e918683..a1005308eca22 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3650,9 +3650,10 @@ otlp_config: # drop them altogether [log_attributes: ] -# Block ingestion for policy until the configured date. The time should be in -# RFC3339 format. The policy is based on the policy_stream_mapping -# configuration. +# Block ingestion for policy until the configured date. The policy '*' is the +# global policy, which is applied to all streams and can be overridden by other +# policies. The time should be in RFC3339 format. The policy is based on the +# policy_stream_mapping configuration. [block_ingestion_policy_until: ] # Block ingestion until the configured date. The time should be in RFC3339 @@ -3672,7 +3673,8 @@ otlp_config: # CLI flag: -validation.enforced-labels [enforced_labels: | default = []] -# Map of policies to enforced labels. Example: +# Map of policies to enforced labels. The policy '*' is the global policy, which +# is applied to all streams and can be extended by other policies. Example: # policy_enforced_labels: # policy1: # - label1 @@ -3680,6 +3682,8 @@ otlp_config: # policy2: # - label3 # - label4 +# '*': +# - label5 [policy_enforced_labels: ] # Map of policies to stream selectors with a priority. Experimental. Example: diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 52ac75d1cd296..bd8bae6217196 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -776,16 +776,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // It also returns the first label that is missing if any (for the case of multiple labels missing). func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string, policy string) (bool, []string) { perPolicyEnforcedLabels := d.validator.Limits.PolicyEnforcedLabels(tenantID, policy) - globalEnforcedLabels := d.validator.Limits.EnforcedLabels(tenantID) + tenantEnforcedLabels := d.validator.Limits.EnforcedLabels(tenantID) - requiredLbs := append(globalEnforcedLabels, perPolicyEnforcedLabels...) + requiredLbs := append(tenantEnforcedLabels, perPolicyEnforcedLabels...) if len(requiredLbs) == 0 { // no enforced labels configured. return false, []string{} } // Use a map to deduplicate the required labels. Duplicates may happen if the same label is configured - // in both global and per-policy enforced labels. + // in both the per-tenant and per-policy enforced labels. seen := make(map[string]struct{}) missingLbs := []string{} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 01b22b8423ec1..c39c0a645034b 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -431,10 +431,11 @@ func Test_MissingEnforcedLabels(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) - limits.EnforcedLabels = []string{"app", "env"} + limits.EnforcedLabels = []string{"app"} limits.PolicyEnforcedLabels = map[string][]string{ - "policy1": {"cluster", "namespace"}, - "policy2": {"namespace"}, + "policy1": {"cluster", "namespace"}, + "policy2": {"namespace"}, + validation.GlobalPolicy: {"env"}, } distributors, _ := prepare(t, 1, 5, limits, nil) @@ -446,12 +447,18 @@ func Test_MissingEnforcedLabels(t *testing.T) { assert.False(t, missing) assert.Empty(t, missingLabels) - // request missing the `app` label from global enforced labels and `cluster` label from policy enforced labels. + // request missing the `app` label from per-tenant enforced labels and `cluster` label from policy enforced labels. lbs = labels.FromMap(map[string]string{"env": "prod", "namespace": "ns1"}) missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy1") assert.True(t, missing) assert.EqualValues(t, []string{"app", "cluster"}, missingLabels) + // request missing the `env` label from global policy enforced labels and `cluster` label from policy1 enforced labels. + lbs = labels.FromMap(map[string]string{"app": "foo", "namespace": "ns1"}) + missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy1") + assert.True(t, missing) + assert.EqualValues(t, []string{"env", "cluster"}, missingLabels) + // request missing all required labels. lbs = labels.FromMap(map[string]string{"pod": "distributor-abc"}) missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy2") diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 083473a5a9b4c..acff95641a546 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -200,9 +200,11 @@ func (v Validator) reportDiscardedDataWithTracker(ctx context.Context, reason st } // ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. +// priority is: Per-tenant block > named policy block > Global policy block func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) { - if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block { - return block, code, reason, err + if block, until, code := v.shouldBlockTenant(ctx, now); block { + err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, until.Format(time.RFC3339), code) + return true, code, validation.BlockedIngestion, err } if block, until, code := v.shouldBlockPolicy(ctx, policy, now); block { @@ -213,27 +215,21 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, po return false, 0, "", nil } -func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) { +func (v Validator) shouldBlockTenant(ctx validationContext, now time.Time) (bool, time.Time, int) { if ctx.blockIngestionUntil.IsZero() { - return false, 0, "", nil + return false, time.Time{}, 0 } if now.Before(ctx.blockIngestionUntil) { - err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) - return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err + return true, ctx.blockIngestionUntil, ctx.blockIngestionStatusCode } - return false, 0, "", nil + return false, time.Time{}, 0 } // ShouldBlockPolicy checks if ingestion should be blocked for the given policy. // It returns true if ingestion should be blocked, along with the block until time and status code. -func (v *Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { - // No policy provided, don't block - if policy == "" { - return false, time.Time{}, 0 - } - +func (v Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { // Check if this policy is blocked in tenant configs blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy) if blockUntil.IsZero() { diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 73a9d1aa0cc38..360dcecf80178 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -238,6 +238,164 @@ func TestValidator_ValidateLabels(t *testing.T) { } } +func TestShouldBlockIngestion(t *testing.T) { + for _, tc := range []struct { + name string + policy string + time time.Time + overrides validation.TenantLimits + + expectBlock bool + expectStatusCode int + expectReason string + }{ + { + name: "no block configured", + time: testTime, + overrides: fakeLimits{ + &validation.Limits{}, + }, + }, + { + name: "all configured tenant blocked priority", + time: testTime, + policy: "policy1", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionUntil: flagext.Time(testTime.Add(time.Hour)), + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + "policy1": flagext.Time(testTime.Add(time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: true, + expectStatusCode: 1234, + expectReason: validation.BlockedIngestion, + }, + { + name: "all configured named policy priority", + time: testTime, + policy: "policy1", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionUntil: flagext.Time(testTime.Add(-time.Hour)), // Not active anymore + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + "policy1": flagext.Time(testTime.Add(time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: true, + expectStatusCode: 1234, + expectReason: validation.BlockedIngestionPolicy, + }, + { + name: "all configured global policy priority", + time: testTime, + policy: "policy1", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionUntil: flagext.Time(testTime.Add(-time.Hour)), // Not active anymore + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: true, + expectStatusCode: 1234, + expectReason: validation.BlockedIngestionPolicy, + }, + { + name: "named policy overrides global policy", + time: testTime, + policy: "policy1", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + "policy1": flagext.Time(testTime.Add(-time.Hour)), // Not blocked overriding block from global quota + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: false, + }, + { + name: "no policy maps to global policy", + time: testTime, + policy: "", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: true, + expectStatusCode: 1234, + expectReason: validation.BlockedIngestionPolicy, + }, + { + name: "unknown policy maps to global policy", + time: testTime, + policy: "notExists", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionPolicyUntil: map[string]flagext.Time{ + validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)), + "policy1": flagext.Time(testTime.Add(2 * time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: true, + expectStatusCode: 1234, + expectReason: validation.BlockedIngestionPolicy, + }, + { + name: "no global policy", + time: testTime, + policy: "notExists", + overrides: fakeLimits{ + &validation.Limits{ + BlockIngestionPolicyUntil: map[string]flagext.Time{ + "policy1": flagext.Time(testTime.Add(2 * time.Hour)), + }, + BlockIngestionStatusCode: 1234, + }, + }, + expectBlock: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + l := &validation.Limits{} + flagext.DefaultValues(l) + + o, err := validation.NewOverrides(*l, tc.overrides) + assert.NoError(t, err) + v, err := NewValidator(o, nil) + assert.NoError(t, err) + + block, statusCode, reason, err := v.ShouldBlockIngestion(v.getValidationContextForTime(testTime, "fake"), testTime, tc.policy) + assert.Equal(t, tc.expectBlock, block) + if tc.expectBlock { + assert.Equal(t, tc.expectStatusCode, statusCode) + assert.Equal(t, tc.expectReason, reason) + assert.Error(t, err) + t.Logf("block: %v, statusCode: %d, reason: %s, err: %v", block, statusCode, reason, err) + } else { + assert.NoError(t, err) + } + }) + } + +} + func mustParseLabels(s string) labels.Labels { ls, err := syntax.ParseLabels(s) if err != nil { diff --git a/pkg/validation/ingestion_policies.go b/pkg/validation/ingestion_policies.go index 2a614afaaa9a8..d5734f6324e5c 100644 --- a/pkg/validation/ingestion_policies.go +++ b/pkg/validation/ingestion_policies.go @@ -9,6 +9,10 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" ) +const ( + GlobalPolicy = "*" +) + type PriorityStream struct { Priority int `yaml:"priority" json:"priority" doc:"description=The larger the value, the higher the priority."` Selector string `yaml:"selector" json:"selector" doc:"description=Stream selector expression."` diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 726d6a208bd3c..fefbd4521daea 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -227,11 +227,11 @@ type Limits struct { OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"` GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"` - BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."` + BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The policy '*' is the global policy, which is applied to all streams and can be overridden by other policies. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."` BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until" category:"experimental"` BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"` - PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"` + PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. The policy '*' is the global policy, which is applied to all streams and can be extended by other policies. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4\n '*':\n - label5"` PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"` IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` @@ -1117,15 +1117,19 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } +// BlockIngestionPolicyUntil returns the time until the ingestion policy is blocked for a given user. +// Order of priority is: global policy block > Per-tenant block func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time { limits := o.getOverridesForUser(userID) - if limits == nil || limits.BlockIngestionPolicyUntil == nil { - return time.Time{} // Zero time means no blocking + + if forPolicy, ok := limits.BlockIngestionPolicyUntil[policy]; ok { + return time.Time(forPolicy) } - if blockUntil, ok := limits.BlockIngestionPolicyUntil[policy]; ok { - return time.Time(blockUntil) + if forPolicy, ok := limits.BlockIngestionPolicyUntil[GlobalPolicy]; ok { + return time.Time(forPolicy) } + return time.Time{} // Zero time means no blocking } @@ -1133,8 +1137,11 @@ func (o *Overrides) EnforcedLabels(userID string) []string { return o.getOverridesForUser(userID).EnforcedLabels } +// PolicyEnforcedLabels returns the labels enforced by the policy for a given user. +// The output is the union of the global and policy specific labels. func (o *Overrides) PolicyEnforcedLabels(userID string, policy string) []string { - return o.getOverridesForUser(userID).PolicyEnforcedLabels[policy] + limits := o.getOverridesForUser(userID) + return append(limits.PolicyEnforcedLabels[GlobalPolicy], limits.PolicyEnforcedLabels[policy]...) } func (o *Overrides) PoliciesStreamMapping(userID string) PolicyStreamMapping {