From b50c4ad00f70d493c6b804f9170b2cb3917828f0 Mon Sep 17 00:00:00 2001 From: ProCodec <43810146+error9098x@users.noreply.github.com> Date: Sat, 19 Apr 2025 23:17:21 +0000 Subject: [PATCH 1/3] feat: Add structured metadata support for Loki output plugin Signed-off-by: ProCodec <43810146+error9098x@users.noreply.github.com> --- .../v1alpha2/clusteroutput_types_test.go | 106 ++++++++++++++++++ .../v1alpha2/plugins/output/loki_types.go | 23 ++++ 2 files changed, 129 insertions(+) diff --git a/apis/fluentbit/v1alpha2/clusteroutput_types_test.go b/apis/fluentbit/v1alpha2/clusteroutput_types_test.go index ac850cb98..f9b4d8e1f 100644 --- a/apis/fluentbit/v1alpha2/clusteroutput_types_test.go +++ b/apis/fluentbit/v1alpha2/clusteroutput_types_test.go @@ -414,3 +414,109 @@ func TestClusterOutputList_Load_As_Yaml(t *testing.T) { i++ } } + +func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { + g := NewGomegaWithT(t) + sl := plugins.NewSecretLoader(nil, "testnamespace") + + lokiOutput := ClusterOutput{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fluentbit.fluent.io/v1alpha2", + Kind: "ClusterOutput", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "loki_output_with_metadata", + }, + Spec: OutputSpec{ + Match: "kube.*", + Loki: &output.Loki{ + Host: "loki-gateway", + Port: ptrInt32(int32(3100)), + Labels: []string{ + "job=fluentbit", + "environment=production", + }, + StructuredMetadata: map[string]string{ + "pod": "${record['kubernetes']['pod_name']}", + "container": "${record['kubernetes']['container_name']}", + "trace_id": "${record['trace_id']}", + }, + StructuredMetadataKeys: []string{ + "level", + "caller", + }, + }, + }, + } + + outputs := ClusterOutputList{ + Items: []ClusterOutput{lokiOutput}, + } + + expected := `[Output] + Name loki + Match kube.* + host loki-gateway + port 3100 + labels environment=production,job=fluentbit + structured_metadata 
container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']} + structured_metadata_keys level,caller +` + + result, err := outputs.Load(sl) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(result).To(Equal(expected)) +} + +func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) { + g := NewGomegaWithT(t) + sl := plugins.NewSecretLoader(nil, "testnamespace") + + lokiOutput := ClusterOutput{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fluentbit.fluent.io/v1alpha2", + Kind: "ClusterOutput", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "loki_output_with_metadata", + }, + Spec: OutputSpec{ + Match: "kube.*", + Loki: &output.Loki{ + Host: "loki-gateway", + Port: ptrInt32(int32(3100)), + Labels: []string{ + "job=fluentbit", + "environment=production", + }, + StructuredMetadata: map[string]string{ + "pod": "${record['kubernetes']['pod_name']}", + "container": "${record['kubernetes']['container_name']}", + "trace_id": "${record['trace_id']}", + }, + StructuredMetadataKeys: []string{ + "level", + "caller", + }, + }, + }, + } + + outputs := ClusterOutputList{ + Items: []ClusterOutput{lokiOutput}, + } + + expected := `outputs: + - name: loki + match: "kube.*" + host: loki-gateway + port: 3100 + labels: environment=production,job=fluentbit + structured_metadata: container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']} + structured_metadata_keys: level,caller +` + + result, err := outputs.LoadAsYaml(sl, 0) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(result).To(Equal(expected)) +} diff --git a/apis/fluentbit/v1alpha2/plugins/output/loki_types.go b/apis/fluentbit/v1alpha2/plugins/output/loki_types.go index 3d7eef792..842519bbb 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/loki_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/loki_types.go @@ -3,6 +3,7 @@ package output import ( "fmt" "strings" + "sort" 
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params" @@ -56,6 +57,13 @@ type Loki struct { // Specify the name of the key from the original record that contains the Tenant ID. // The value of the key is set as X-Scope-OrgID of HTTP header. It is useful to set Tenant ID dynamically. TenantIDKey string `json:"tenantIDKey,omitempty"` + // Stream structured metadata for API request. It can be multiple comma separated key=value pairs. + // This is used for high cardinality data that isn't suited for using labels. + // Only supported in Loki 3.0+ with schema v13 and TSDB storage. + StructuredMetadata map[string]string `json:"structuredMetadata,omitempty"` + // Optional list of record keys that will be placed as structured metadata. + // This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. + StructuredMetadataKeys []string `json:"structuredMetadataKeys,omitempty"` *plugins.TLS `json:"tls,omitempty"` // Include fluentbit networking options for this output-plugin *plugins.Networking `json:"networking,omitempty"` @@ -134,6 +142,21 @@ func (l *Loki) Params(sl plugins.SecretLoader) (*params.KVs, error) { if l.TenantIDKey != "" { kvs.Insert("tenant_id_key", l.TenantIDKey) } + // Handle structured metadata + if l.StructuredMetadata != nil && len(l.StructuredMetadata) > 0 { + var metadataPairs []string + for k, v := range l.StructuredMetadata { + metadataPairs = append(metadataPairs, fmt.Sprintf("%s=%s", k, v)) + } + if len(metadataPairs) > 0 { + sort.Strings(metadataPairs) + kvs.Insert("structured_metadata", strings.Join(metadataPairs, ",")) + } + } + // Handle structured metadata keys + if l.StructuredMetadataKeys != nil && len(l.StructuredMetadataKeys) > 0 { + kvs.Insert("structured_metadata_keys", strings.Join(l.StructuredMetadataKeys, ",")) + } if l.TLS != nil { tls, err := l.TLS.Params(sl) if err != nil { From 
2388fde4c3cbb6609c292d0daca20184fc04bfa9 Mon Sep 17 00:00:00 2001 From: ProCodec <43810146+error9098x@users.noreply.github.com> Date: Fri, 2 May 2025 22:45:55 +0000 Subject: [PATCH 2/3] added crd Signed-off-by: ProCodec <43810146+error9098x@users.noreply.github.com> --- .../plugins/output/zz_generated.deepcopy.go | 12 ++++++++ .../fluentbit.fluent.io_clusteroutputs.yaml | 15 ++++++++++ .../crds/fluentbit.fluent.io_outputs.yaml | 15 ++++++++++ .../fluentbit.fluent.io_clusteroutputs.yaml | 15 ++++++++++ .../bases/fluentbit.fluent.io_outputs.yaml | 15 ++++++++++ docs/plugins/fluentbit/output/loki.md | 2 ++ manifests/setup/fluent-operator-crd.yaml | 30 +++++++++++++++++++ manifests/setup/setup.yaml | 30 +++++++++++++++++++ 8 files changed, 134 insertions(+) diff --git a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go index e7a97b3fe..ab09028cd 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go @@ -616,6 +616,18 @@ func (in *Loki) DeepCopyInto(out *Loki) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.StructuredMetadata != nil { + in, out := &in.StructuredMetadata, &out.StructuredMetadata + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.StructuredMetadataKeys != nil { + in, out := &in.StructuredMetadataKeys, &out.StructuredMetadataKeys + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.TLS != nil { in, out := &in.TLS, &out.TLS *out = new(plugins.TLS) diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml index a8ad2998a..d63bbe089 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ 
b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -2371,6 +2371,21 @@ spec: items: type: string type: array + structuredMetadata: + additionalProperties: + type: string + description: |- + Stream structured metadata for API request. It can be multiple comma separated key=value pairs. + This is used for high cardinality data that isn't suited for using labels. + Only supported in Loki 3.0+ with schema v13 and TSDB storage. + type: object + structuredMetadataKeys: + description: |- + Optional list of record keys that will be placed as structured metadata. + This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. + items: + type: string + type: array tenantID: description: |- Tenant ID used by default to push logs to Loki. diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml index d99653d39..4b16971d5 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml @@ -2371,6 +2371,21 @@ spec: items: type: string type: array + structuredMetadata: + additionalProperties: + type: string + description: |- + Stream structured metadata for API request. It can be multiple comma separated key=value pairs. + This is used for high cardinality data that isn't suited for using labels. + Only supported in Loki 3.0+ with schema v13 and TSDB storage. + type: object + structuredMetadataKeys: + description: |- + Optional list of record keys that will be placed as structured metadata. + This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. + items: + type: string + type: array tenantID: description: |- Tenant ID used by default to push logs to Loki. 
diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index a8ad2998a..d63bbe089 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -2371,6 +2371,21 @@ spec: items: type: string type: array + structuredMetadata: + additionalProperties: + type: string + description: |- + Stream structured metadata for API request. It can be multiple comma separated key=value pairs. + This is used for high cardinality data that isn't suited for using labels. + Only supported in Loki 3.0+ with schema v13 and TSDB storage. + type: object + structuredMetadataKeys: + description: |- + Optional list of record keys that will be placed as structured metadata. + This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. + items: + type: string + type: array tenantID: description: |- Tenant ID used by default to push logs to Loki. diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index d99653d39..4b16971d5 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -2371,6 +2371,21 @@ spec: items: type: string type: array + structuredMetadata: + additionalProperties: + type: string + description: |- + Stream structured metadata for API request. It can be multiple comma separated key=value pairs. + This is used for high cardinality data that isn't suited for using labels. + Only supported in Loki 3.0+ with schema v13 and TSDB storage. + type: object + structuredMetadataKeys: + description: |- + Optional list of record keys that will be placed as structured metadata. + This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. 
+ items: + type: string + type: array tenantID: description: |- Tenant ID used by default to push logs to Loki. diff --git a/docs/plugins/fluentbit/output/loki.md b/docs/plugins/fluentbit/output/loki.md index f28a18eab..72ee388ae 100644 --- a/docs/plugins/fluentbit/output/loki.md +++ b/docs/plugins/fluentbit/output/loki.md @@ -20,6 +20,8 @@ The loki output plugin, allows to ingest your records into a Loki service.
Date: Sat, 3 May 2025 06:19:29 +0000 Subject: [PATCH 3/3] fix loki_types label ordering Signed-off-by: ProCodec <43810146+error9098x@users.noreply.github.com> --- .../v1alpha2/clusteroutput_types_test.go | 24 +++++++-------- .../v1alpha2/plugins/output/loki_types.go | 29 ++++++++++++++++--- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/apis/fluentbit/v1alpha2/clusteroutput_types_test.go b/apis/fluentbit/v1alpha2/clusteroutput_types_test.go index f9b4d8e1f..ebaf869f7 100644 --- a/apis/fluentbit/v1alpha2/clusteroutput_types_test.go +++ b/apis/fluentbit/v1alpha2/clusteroutput_types_test.go @@ -418,7 +418,7 @@ func TestClusterOutputList_Load_As_Yaml(t *testing.T) { func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, "testnamespace") - + lokiOutput := ClusterOutput{ TypeMeta: metav1.TypeMeta{ APIVersion: "fluentbit.fluent.io/v1alpha2", @@ -437,9 +437,9 @@ func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { "environment=production", }, StructuredMetadata: map[string]string{ - "pod": "${record['kubernetes']['pod_name']}", + "pod": "${record['kubernetes']['pod_name']}", "container": "${record['kubernetes']['container_name']}", - "trace_id": "${record['trace_id']}", + "trace_id": "${record['trace_id']}", }, StructuredMetadataKeys: []string{ "level", @@ -448,11 +448,11 @@ func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { }, }, } - + outputs := ClusterOutputList{ Items: []ClusterOutput{lokiOutput}, } - + expected := `[Output] Name loki Match kube.* @@ -462,7 +462,7 @@ func TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { structured_metadata container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']} structured_metadata_keys level,caller ` - + result, err := outputs.Load(sl) g.Expect(err).NotTo(HaveOccurred()) g.Expect(result).To(Equal(expected)) @@ -471,7 +471,7 @@ func 
TestLokiOutputWithStructuredMetadata_Load(t *testing.T) { func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) { g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, "testnamespace") - + lokiOutput := ClusterOutput{ TypeMeta: metav1.TypeMeta{ APIVersion: "fluentbit.fluent.io/v1alpha2", @@ -490,9 +490,9 @@ func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) { "environment=production", }, StructuredMetadata: map[string]string{ - "pod": "${record['kubernetes']['pod_name']}", + "pod": "${record['kubernetes']['pod_name']}", "container": "${record['kubernetes']['container_name']}", - "trace_id": "${record['trace_id']}", + "trace_id": "${record['trace_id']}", }, StructuredMetadataKeys: []string{ "level", @@ -501,11 +501,11 @@ func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) { }, }, } - + outputs := ClusterOutputList{ Items: []ClusterOutput{lokiOutput}, } - + expected := `outputs: - name: loki match: "kube.*" @@ -515,7 +515,7 @@ func TestLokiOutputWithStructuredMetadata_LoadAsYaml(t *testing.T) { structured_metadata: container=${record['kubernetes']['container_name']},pod=${record['kubernetes']['pod_name']},trace_id=${record['trace_id']} structured_metadata_keys: level,caller ` - + result, err := outputs.LoadAsYaml(sl, 0) g.Expect(err).NotTo(HaveOccurred()) g.Expect(result).To(Equal(expected)) diff --git a/apis/fluentbit/v1alpha2/plugins/output/loki_types.go b/apis/fluentbit/v1alpha2/plugins/output/loki_types.go index 842519bbb..4886fa4f6 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/loki_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/loki_types.go @@ -2,8 +2,8 @@ package output import ( "fmt" - "strings" "sort" + "strings" "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params" @@ -56,7 +56,7 @@ type Loki struct { AutoKubernetesLabels string `json:"autoKubernetesLabels,omitempty"` // Specify the name of the key from 
the original record that contains the Tenant ID. // The value of the key is set as X-Scope-OrgID of HTTP header. It is useful to set Tenant ID dynamically. - TenantIDKey string `json:"tenantIDKey,omitempty"` + TenantIDKey string `json:"tenantIDKey,omitempty"` // Stream structured metadata for API request. It can be multiple comma separated key=value pairs. // This is used for high cardinality data that isn't suited for using labels. // Only supported in Loki 3.0+ with schema v13 and TSDB storage. @@ -64,7 +64,7 @@ type Loki struct { // Optional list of record keys that will be placed as structured metadata. // This allows using record accessor patterns (e.g. $kubernetes['pod_name']) to reference record keys. StructuredMetadataKeys []string `json:"structuredMetadataKeys,omitempty"` - *plugins.TLS `json:"tls,omitempty"` + *plugins.TLS `json:"tls,omitempty"` // Include fluentbit networking options for this output-plugin *plugins.Networking `json:"networking,omitempty"` // Limit the maximum number of Chunks in the filesystem for the current output logical destination. 
@@ -119,7 +119,32 @@ func (l *Loki) Params(sl plugins.SecretLoader) (*params.KVs, error) {
 		kvs.Insert("tenant_id", id)
 	}
 	if l.Labels != nil && len(l.Labels) > 0 {
-		kvs.Insert("labels", strings.Join(l.Labels, ","))
+		// Emit labels in a stable order so the rendered config does not
+		// depend on the order in which they were declared. Sort a copy so
+		// l.Labels itself is left untouched.
+		sortedLabels := make([]string, len(l.Labels))
+		copy(sortedLabels, l.Labels)
+
+		// labelKey returns the part of a label before the first "=", or the
+		// whole entry when it contains no "=".
+		labelKey := func(label string) string {
+			if i := strings.IndexByte(label, '='); i >= 0 {
+				return label[:i]
+			}
+			return label
+		}
+
+		// Order by key; fall back to the full entry when keys are equal so
+		// the comparison is a total order and ties cannot be reordered.
+		sort.Slice(sortedLabels, func(i, j int) bool {
+			ki, kj := labelKey(sortedLabels[i]), labelKey(sortedLabels[j])
+			if ki != kj {
+				return ki < kj
+			}
+			return sortedLabels[i] < sortedLabels[j]
+		})
+
+		kvs.Insert("labels", strings.Join(sortedLabels, ","))
 	}
 	if l.LabelKeys != nil && len(l.LabelKeys) > 0 {
 		kvs.Insert("label_keys", strings.Join(l.LabelKeys, ","))