Skip to content

Commit

Permalink
feat(policies): Support global policy
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Feb 25, 2025
1 parent 290c14a commit 805cf6d
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 31 deletions.
12 changes: 8 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3650,9 +3650,10 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion for policy until the configured date. The time should be in
# RFC3339 format. The policy is based on the policy_stream_mapping
# configuration.
# Block ingestion for policy until the configured date. The policy '*' is the
# global policy, which is applied to all streams and can be overridden by other
# policies. The time should be in RFC3339 format. The policy is based on the
# policy_stream_mapping configuration.
[block_ingestion_policy_until: <map of string to Time>]

# Block ingestion until the configured date. The time should be in RFC3339
Expand All @@ -3672,14 +3673,17 @@ otlp_config:
# CLI flag: -validation.enforced-labels
[enforced_labels: <list of strings> | default = []]

# Map of policies to enforced labels. Example:
# Map of policies to enforced labels. The policy '*' is the global policy, which
# is applied to all streams and can be extended by other policies. Example:
# policy_enforced_labels:
# policy1:
# - label1
# - label2
# policy2:
# - label3
# - label4
# '*':
# - label5
[policy_enforced_labels: <map of string to list of strings>]

# Map of policies to stream selectors with a priority. Experimental. Example:
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,16 +776,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// It also returns the first label that is missing if any (for the case of multiple labels missing).
func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string, policy string) (bool, []string) {
perPolicyEnforcedLabels := d.validator.Limits.PolicyEnforcedLabels(tenantID, policy)
globalEnforcedLabels := d.validator.Limits.EnforcedLabels(tenantID)
tenantEnforcedLabels := d.validator.Limits.EnforcedLabels(tenantID)

requiredLbs := append(globalEnforcedLabels, perPolicyEnforcedLabels...)
requiredLbs := append(tenantEnforcedLabels, perPolicyEnforcedLabels...)
if len(requiredLbs) == 0 {
// no enforced labels configured.
return false, []string{}
}

// Use a map to deduplicate the required labels. Duplicates may happen if the same label is configured
// in both global and per-policy enforced labels.
// in both the per-tenant and per-policy enforced labels.
seen := make(map[string]struct{})
missingLbs := []string{}

Expand Down
15 changes: 11 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,11 @@ func Test_MissingEnforcedLabels(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.EnforcedLabels = []string{"app", "env"}
limits.EnforcedLabels = []string{"app"}
limits.PolicyEnforcedLabels = map[string][]string{
"policy1": {"cluster", "namespace"},
"policy2": {"namespace"},
"policy1": {"cluster", "namespace"},
"policy2": {"namespace"},
validation.GlobalPolicy: {"env"},
}

distributors, _ := prepare(t, 1, 5, limits, nil)
Expand All @@ -446,12 +447,18 @@ func Test_MissingEnforcedLabels(t *testing.T) {
assert.False(t, missing)
assert.Empty(t, missingLabels)

// request missing the `app` label from global enforced labels and `cluster` label from policy enforced labels.
// request missing the `app` label from per-tenant enforced labels and `cluster` label from policy enforced labels.
lbs = labels.FromMap(map[string]string{"env": "prod", "namespace": "ns1"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy1")
assert.True(t, missing)
assert.EqualValues(t, []string{"app", "cluster"}, missingLabels)

// request missing the `env` label from global policy enforced labels and `cluster` label from policy1 enforced labels.
lbs = labels.FromMap(map[string]string{"app": "foo", "namespace": "ns1"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy1")
assert.True(t, missing)
assert.EqualValues(t, []string{"env", "cluster"}, missingLabels)

// request missing all required labels.
lbs = labels.FromMap(map[string]string{"pod": "distributor-abc"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy2")
Expand Down
22 changes: 9 additions & 13 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ func (v Validator) reportDiscardedDataWithTracker(ctx context.Context, reason st
}

// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.
// priority is: Per-tenant block > named policy block > Global policy block
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) {
if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block {
return block, code, reason, err
if block, until, code := v.shouldBlockTenant(ctx, now); block {
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, until.Format(time.RFC3339), code)
return true, code, validation.BlockedIngestion, err
}

if block, until, code := v.shouldBlockPolicy(ctx, policy, now); block {
Expand All @@ -213,27 +215,21 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, po
return false, 0, "", nil
}

func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) {
func (v Validator) shouldBlockTenant(ctx validationContext, now time.Time) (bool, time.Time, int) {
if ctx.blockIngestionUntil.IsZero() {
return false, 0, "", nil
return false, time.Time{}, 0
}

if now.Before(ctx.blockIngestionUntil) {
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode)
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err
return true, ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
}

return false, 0, "", nil
return false, time.Time{}, 0
}

// ShouldBlockPolicy checks if ingestion should be blocked for the given policy.
// It returns true if ingestion should be blocked, along with the block until time and status code.
func (v *Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) {
// No policy provided, don't block
if policy == "" {
return false, time.Time{}, 0
}

func (v Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) {
// Check if this policy is blocked in tenant configs
blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy)
if blockUntil.IsZero() {
Expand Down
158 changes: 158 additions & 0 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,164 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
}

func TestShouldBlockIngestion(t *testing.T) {
for _, tc := range []struct {
name string
policy string
time time.Time
overrides validation.TenantLimits

expectBlock bool
expectStatusCode int
expectReason string
}{
{
name: "no block configured",
time: testTime,
overrides: fakeLimits{
&validation.Limits{},
},
},
{
name: "all configured tenant blocked priority",
time: testTime,
policy: "policy1",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionUntil: flagext.Time(testTime.Add(time.Hour)),
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
"policy1": flagext.Time(testTime.Add(time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: true,
expectStatusCode: 1234,
expectReason: validation.BlockedIngestion,
},
{
name: "all configured named policy priority",
time: testTime,
policy: "policy1",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionUntil: flagext.Time(testTime.Add(-time.Hour)), // Not active anymore
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
"policy1": flagext.Time(testTime.Add(time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: true,
expectStatusCode: 1234,
expectReason: validation.BlockedIngestionPolicy,
},
{
name: "all configured global policy priority",
time: testTime,
policy: "policy1",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionUntil: flagext.Time(testTime.Add(-time.Hour)), // Not active anymore
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: true,
expectStatusCode: 1234,
expectReason: validation.BlockedIngestionPolicy,
},
{
name: "named policy overrides global policy",
time: testTime,
policy: "policy1",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
"policy1": flagext.Time(testTime.Add(-time.Hour)), // Not blocked overriding block from global quota
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: false,
},
{
name: "no policy maps to global policy",
time: testTime,
policy: "",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: true,
expectStatusCode: 1234,
expectReason: validation.BlockedIngestionPolicy,
},
{
name: "unknown policy maps to global policy",
time: testTime,
policy: "notExists",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionPolicyUntil: map[string]flagext.Time{
validation.GlobalPolicy: flagext.Time(testTime.Add(time.Hour)),
"policy1": flagext.Time(testTime.Add(2 * time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: true,
expectStatusCode: 1234,
expectReason: validation.BlockedIngestionPolicy,
},
{
name: "no global policy",
time: testTime,
policy: "notExists",
overrides: fakeLimits{
&validation.Limits{
BlockIngestionPolicyUntil: map[string]flagext.Time{
"policy1": flagext.Time(testTime.Add(2 * time.Hour)),
},
BlockIngestionStatusCode: 1234,
},
},
expectBlock: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
l := &validation.Limits{}
flagext.DefaultValues(l)

o, err := validation.NewOverrides(*l, tc.overrides)
assert.NoError(t, err)
v, err := NewValidator(o, nil)
assert.NoError(t, err)

block, statusCode, reason, err := v.ShouldBlockIngestion(v.getValidationContextForTime(testTime, "fake"), testTime, tc.policy)
assert.Equal(t, tc.expectBlock, block)
if tc.expectBlock {
assert.Equal(t, tc.expectStatusCode, statusCode)
assert.Equal(t, tc.expectReason, reason)
assert.Error(t, err)
t.Logf("block: %v, statusCode: %d, reason: %s, err: %v", block, statusCode, reason, err)
} else {
assert.NoError(t, err)
}
})
}

}

func mustParseLabels(s string) labels.Labels {
ls, err := syntax.ParseLabels(s)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/validation/ingestion_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
)

const (
GlobalPolicy = "*"
)

type PriorityStream struct {
Priority int `yaml:"priority" json:"priority" doc:"description=The larger the value, the higher the priority."`
Selector string `yaml:"selector" json:"selector" doc:"description=Stream selector expression."`
Expand Down
21 changes: 14 additions & 7 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ type Limits struct {
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`

BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."`
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The policy '*' is the global policy, which is applied to all streams and can be overridden by other policies. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until" category:"experimental"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. The policy '*' is the global policy, which is applied to all streams and can be extended by other policies. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4\n '*':\n - label5"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
Expand Down Expand Up @@ -1117,24 +1117,31 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

// BlockIngestionPolicyUntil returns the time until the ingestion policy is blocked for a given user.
// Order of priority is: global policy block > Per-tenant block
func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time {
limits := o.getOverridesForUser(userID)
if limits == nil || limits.BlockIngestionPolicyUntil == nil {
return time.Time{} // Zero time means no blocking

if forPolicy, ok := limits.BlockIngestionPolicyUntil[policy]; ok {
return time.Time(forPolicy)
}

if blockUntil, ok := limits.BlockIngestionPolicyUntil[policy]; ok {
return time.Time(blockUntil)
if forPolicy, ok := limits.BlockIngestionPolicyUntil[GlobalPolicy]; ok {
return time.Time(forPolicy)
}

return time.Time{} // Zero time means no blocking
}

func (o *Overrides) EnforcedLabels(userID string) []string {
return o.getOverridesForUser(userID).EnforcedLabels
}

// PolicyEnforcedLabels returns the labels enforced by the policy for a given user.
// The output is the union of the global and policy specific labels.
func (o *Overrides) PolicyEnforcedLabels(userID string, policy string) []string {
return o.getOverridesForUser(userID).PolicyEnforcedLabels[policy]
limits := o.getOverridesForUser(userID)
return append(limits.PolicyEnforcedLabels[GlobalPolicy], limits.PolicyEnforcedLabels[policy]...)
}

func (o *Overrides) PoliciesStreamMapping(userID string) PolicyStreamMapping {
Expand Down

0 comments on commit 805cf6d

Please sign in to comment.