diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index d5d92cf..bb93e75 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -32,6 +32,11 @@ type Transform struct { // Name of the Transform processor Name string `json:"name,omitempty"` + // When FlattenData is true, the processor provides each log record with a distinct copy + // of its resource and scope. Then, after applying all transformations, + // the log records are regrouped by resource and scope. + FlattenData bool `json:"flattenData,omitempty"` + // +kubebuilder:validation:Enum:=ignore;silent;propagate // ErrorMode specifies how errors are handled while processing a statement diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index ab253d4..92d2dcb 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -190,6 +190,12 @@ spec: - silent - propagate type: string + flattenData: + description: |- + When FlattenData is true, the processor provides each log record with a distinct copy + of its resource and scope. Then, after applying all transformations, + the log records are regrouped by resource and scope. + type: boolean logStatements: items: description: |- diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index ab253d4..92d2dcb 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -190,6 +190,12 @@ spec: - silent - propagate type: string + flattenData: + description: |- + When FlattenData is true, the processor provides each log record with a distinct copy + of its resource and scope. Then, after applying all transformations, + the log records are regrouped by resource and scope. + type: boolean logStatements: items: description: |- diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index 48165ae..71186e2 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -17,6 +17,7 @@ metadata: spec: transform: name: parse-nginx + flattenData: true logStatements: - context: log statements: diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml index c7df2c1..ee74c92 100644 --- a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -43,6 +43,7 @@ metadata: spec: transform: name: parse-nginx + flattenData: true logStatements: - context: log statements: diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index d15a3ae..c98de88 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -211,7 +211,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - otelConfig := otelConfigInput.AssembleConfig(ctx) + otelConfig, additionalArgs := otelConfigInput.AssembleConfig(ctx) if err := validator.ValidateAssembledConfig(otelConfig); err != nil { logger.Error(errors.WithStack(err), "invalid otel config") @@ -241,6 +241,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( Mode: otelv1beta1.ModeDaemonSet, OpenTelemetryCommonFields: otelv1beta1.OpenTelemetryCommonFields{ Image: axoflowOtelCollectorImageRef, + Args: additionalArgs, ServiceAccount: saName.Name, VolumeMounts: []corev1.VolumeMount{ { diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index b54064d..8e2da7f 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "slices" + "strings" "github.com/hashicorp/go-multierror" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" @@ -270,7 +271,7 @@ func (cfgInput *OtelColConfigInput) generateTelemetry() map[string]any { return telemetry } -func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1beta1.Config { +func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) (otelv1beta1.Config, map[string]string) { exporters := cfgInput.generateExporters(ctx) processors := cfgInput.generateProcessors() @@ -306,7 +307,7 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be extensionNames = nil } - return otelv1beta1.Config{ + otelConfig := otelv1beta1.Config{ Receivers: otelv1beta1.AnyConfig{Object: receivers}, Exporters: otelv1beta1.AnyConfig{Object: exporters}, Processors: &otelv1beta1.AnyConfig{Object: processors}, @@ -318,6 +319,28 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be Pipelines: pipelines, }, } + + return otelConfig, assembleAdditionalArgs(&otelConfig) +} + +func assembleAdditionalArgs(otelConfig *otelv1beta1.Config) map[string]string { + const ( + featureGatesKey = "feature-gates" + flattenLogsFeatureGateValue = "transform.flatten.logs" + + transformProcessorID = "transform" + flattenDataKey = "flatten_data" + ) + + args := make(map[string]string) + for processorName, processorConfig := range otelConfig.Processors.Object { + if strings.Contains(processorName, transformProcessorID) && processorConfig.(processor.TransformProcessor).FlattenData { + args[featureGatesKey] = flattenLogsFeatureGateValue + break + } + } + + return args } func validateTenants(tenants *[]v1alpha1.Tenant) error { diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 8a9e4d3..940ca7b 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -310,7 +310,7 @@ func TestOtelColConfComplex(t *testing.T) { // Config // The receiver and exporter entries are not serialized because of tags on the underlying data structure. The tests won't contain them, this is a known issue. - generatedConfig := inputCfg.AssembleConfig(context.TODO()) + generatedConfig, _ := inputCfg.AssembleConfig(context.TODO()) actualJSONBytes, err1 := json.Marshal(generatedConfig) if err1 != nil { t.Fatalf("error %v", err1) diff --git a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go index 23d52a6..9d5ee79 100644 --- a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go +++ b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go @@ -30,6 +30,7 @@ type TransformProcessorStatement struct { type TransformProcessor struct { ErrorMode components.ErrorMode `json:"error_mode,omitempty"` + FlattenData bool `json:"flatten_data,omitempty"` TraceStatements []TransformProcessorStatement `json:"trace_statements,omitempty"` MetricStatements []TransformProcessorStatement `json:"metric_statements,omitempty"` LogStatements []TransformProcessorStatement `json:"log_statements,omitempty"` @@ -38,6 +39,7 @@ type TransformProcessor struct { func GenerateTransformProcessorForTenant(tenant v1alpha1.Tenant) TransformProcessor { return TransformProcessor{ ErrorMode: components.ErrorMode(tenant.Spec.Transform.ErrorMode), + FlattenData: tenant.Spec.Transform.FlattenData, TraceStatements: convertAPIStatements(tenant.Spec.Transform.TraceStatements), MetricStatements: convertAPIStatements(tenant.Spec.Transform.MetricStatements), LogStatements: convertAPIStatements(tenant.Spec.Transform.LogStatements),