Skip to content

Commit

Permalink
Merge branch 'main' of github.com:QuantumEnigmaa/loki into helm-add-d…
Browse files Browse the repository at this point in the history
…edicated-serviceaccount-ruler
  • Loading branch information
QuantumEnigmaa committed Feb 18, 2025
2 parents 75a7a95 + 5cb26f6 commit a8eb5a6
Show file tree
Hide file tree
Showing 33 changed files with 1,310 additions and 1,456 deletions.
2 changes: 1 addition & 1 deletion cmd/dataobj-inspect/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.1

replace github.com/grafana/loki/v3 => ../..

require github.com/grafana/loki/v3 v3.4.1
require github.com/grafana/loki/v3 v3.4.2

require (
github.com/axiomhq/hyperloglog v0.2.3 // indirect
Expand Down
2 changes: 1 addition & 1 deletion cmd/segment-inspect/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.5

require (
github.com/dustin/go-humanize v1.0.1
github.com/grafana/loki/v3 v3.4.1
github.com/grafana/loki/v3 v3.4.2
)

require (
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3641,6 +3641,11 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion for policy until the configured date. The time should be in
# RFC3339 format. The policy is based on the policy_stream_mapping
# configuration.
[block_ingestion_policy_until: <map of string to Time>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/fatih/color v1.18.0
github.com/felixge/fgprof v0.9.5
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
github.com/fsouza/fake-gcs-server v1.52.1
github.com/fsouza/fake-gcs-server v1.52.2
github.com/go-kit/log v0.2.1
github.com/go-logfmt/logfmt v0.6.0
github.com/gocql/gocql v1.7.0
Expand Down Expand Up @@ -69,7 +69,7 @@ require (
github.com/klauspost/pgzip v1.2.6
github.com/leodido/go-syslog/v4 v4.2.0
github.com/mattn/go-ieproxy v0.0.12
github.com/minio/minio-go/v7 v7.0.85
github.com/minio/minio-go/v7 v7.0.86
github.com/mitchellh/go-wordwrap v1.0.1
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/modern-go/reflect2 v1.0.2
Expand Down Expand Up @@ -173,7 +173,7 @@ require (
github.com/go-ini/ini v1.67.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-redsync/redsync/v4 v4.13.0 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/gorilla/handlers v1.5.2 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/imdario/mergo v0.3.16 // indirect
Expand All @@ -182,6 +182,7 @@ require (
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mdlayher/socket v0.5.1 // indirect
github.com/mdlayher/vsock v1.2.1 // indirect
github.com/minio/crc64nvme v1.0.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/ncw/swift v1.0.53 // indirect
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fsouza/fake-gcs-server v1.52.1 h1:Hx3G2ZpyBzHGmW7cHURWWoTm6jM3M5fcWMIAHBYlJyc=
github.com/fsouza/fake-gcs-server v1.52.1/go.mod h1:Paxf25VmSNMN52L+2/cVulF5fkLUA0YJIYjTGJiwf3c=
github.com/fsouza/fake-gcs-server v1.52.2 h1:j6ne83nqHrlX5EEor7WWVIKdBsztGtwJ1J2mL+k+iio=
github.com/fsouza/fake-gcs-server v1.52.2/go.mod h1:47HKyIkz6oLTes1R8vEaHLwXfzYsGfmDUk1ViHHAUsA=
github.com/fullstorydev/emulators/storage v0.0.0-20240401123056-edc69752f474 h1:TufioMBjkJ6/Oqmlye/ReuxHFS35HyLmypj/BNy/8GY=
github.com/fullstorydev/emulators/storage v0.0.0-20240401123056-edc69752f474/go.mod h1:PQwxF4UU8wuL+srGxr3BOhIW5zXqgucwVlO/nPZLsxw=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
Expand Down Expand Up @@ -481,8 +481,8 @@ github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
Expand Down Expand Up @@ -850,10 +850,12 @@ github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKju
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ=
github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ=
github.com/minio/crc64nvme v1.0.0 h1:MeLcBkCTD4pAoU7TciAfwsfxgkhM2u5hCe48hSEVFr0=
github.com/minio/crc64nvme v1.0.0/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.85 h1:9psTLS/NTvC3MWoyjhjXpwcKoNbkongaCSF3PNpSuXo=
github.com/minio/minio-go/v7 v7.0.85/go.mod h1:57YXpvc5l3rjPdhqNrDsvVlY0qPI6UTk1bflAe+9doY=
github.com/minio/minio-go/v7 v7.0.86 h1:DcgQ0AUjLJzRH6y/HrxiZ8CXarA70PAIufXHodP4s+k=
github.com/minio/minio-go/v7 v7.0.86/go.mod h1:VbfO4hYwUu3Of9WqGLBZ8vl3Hxnxo4ngxK4hzQDf4x4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
Expand Down
34 changes: 17 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
d.validator.reportDiscardedData(validation.MissingEnforcedLabels, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
continue
}

if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {
d.writeFailuresManager.Log(tenantID, err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// do not add error to validationErrors.
continue
}

validationErrors.Add(err)
continue
}

Expand Down Expand Up @@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

Expand Down
125 changes: 121 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand Down Expand Up @@ -51,6 +50,8 @@ import (
loki_net "github.com/grafana/loki/v3/pkg/util/net"
"github.com/grafana/loki/v3/pkg/util/test"
"github.com/grafana/loki/v3/pkg/validation"

"github.com/grafana/loki/pkg/push"
)

const (
Expand Down Expand Up @@ -441,6 +442,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")

assert.False(t, missing)
assert.Empty(t, missingLabels)

Expand All @@ -462,25 +464,42 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
flagext.DefaultValues(limits)

// makeWriteRequest only contains a `{foo="bar"}` label.
req := makeWriteRequest(100, 100)
req := makeWriteRequest(100, 100) // 100 lines of 100 bytes each
limits.EnforcedLabels = []string{"app", "env"}
distributors, _ := prepare(t, 1, 3, limits, nil)

// reset metrics in case they were set from a previous test.
validation.DiscardedBytes.Reset()
validation.DiscardedSamples.Reset()

// enforced labels configured, but all labels are missing.
_, err := distributors[0].Push(ctx, req)
require.Error(t, err)
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
require.EqualError(t, err, expectedErr.Error())

// Verify metrics for discarded samples due to missing enforced labels
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) // 100 lines * 100 bytes
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) // 100 lines

// enforced labels, but all labels are present.
req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should not have increased since this push was successful
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))

// no enforced labels, so no errors.
limits.EnforcedLabels = []string{}
distributors, _ = prepare(t, 1, 3, limits, nil)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should remain unchanged
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
}

func TestDistributorPushConcurrently(t *testing.T) {
Expand Down Expand Up @@ -1672,7 +1691,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
now := time.Now()
defaultErrCode := 260

for _, tc := range []struct {
name string
blockUntil map[string]time.Time
policy string
labels string
expectError bool
expectedErrorMsg string
yes bool
}{
{
name: "not blocked - no policy block configured",
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "not blocked - policy block expired",
blockUntil: map[string]time.Time{
"test-policy": now.Add(-1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - policy block active",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
yes: true,
},
{
name: "not blocked - different policy",
blockUntil: map[string]time.Time{
"blocked-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - custom status code",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
},
} {
t.Run(tc.name, func(t *testing.T) {
if !tc.yes {
return
}
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Configure policy mapping
limits.PolicyStreamMapping = validation.PolicyStreamMapping{
tc.policy: []*validation.PriorityStream{
{
Selector: tc.labels,
Priority: 1,
},
},
}

// Configure policy blocks
if tc.blockUntil != nil {
limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time)
for policy, until := range tc.blockUntil {
limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until)
}
}

distributors, _ := prepare(t, 1, 3, limits, nil)
request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

Expand Down
Loading

0 comments on commit a8eb5a6

Please sign in to comment.