Skip to content

Commit

Permalink
Merge pull request #104 from kube-logging/fix/transform-processor-fla…
Browse files Browse the repository at this point in the history
…ttenlogs
  • Loading branch information
csatib02 authored Nov 19, 2024
2 parents 1901a28 + 7836895 commit 353e80e
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 4 deletions.
5 changes: 5 additions & 0 deletions api/telemetry/v1alpha1/tenant_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type Transform struct {
// Name of the Transform processor
Name string `json:"name,omitempty"`

// When FlattenData is true, the processor provides each log record with a distinct copy
// of its resource and scope. Then, after applying all transformations,
// the log records are regrouped by resource and scope.
FlattenData bool `json:"flattenData,omitempty"`

// +kubebuilder:validation:Enum:=ignore;silent;propagate

// ErrorMode specifies how errors are handled while processing a statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ spec:
- silent
- propagate
type: string
flattenData:
description: |-
When FlattenData is true, the processor provides each log record with a distinct copy
of its resource and scope. Then, after applying all transformations,
the log records are regrouped by resource and scope.
type: boolean
logStatements:
items:
description: |-
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_tenants.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ spec:
- silent
- propagate
type: string
flattenData:
description: |-
When FlattenData is true, the processor provides each log record with a distinct copy
of its resource and scope. Then, after applying all transformations,
the log records are regrouped by resource and scope.
type: boolean
logStatements:
items:
description: |-
Expand Down
1 change: 1 addition & 0 deletions docs/examples/tenant-to-tenant-routing/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ metadata:
spec:
transform:
name: parse-nginx
flattenData: true
logStatements:
- context: log
statements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ metadata:
spec:
transform:
name: parse-nginx
flattenData: true
logStatements:
- context: log
statements:
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

otelConfig := otelConfigInput.AssembleConfig(ctx)
otelConfig, additionalArgs := otelConfigInput.AssembleConfig(ctx)
if err := validator.ValidateAssembledConfig(otelConfig); err != nil {
logger.Error(errors.WithStack(err), "invalid otel config")

Expand Down Expand Up @@ -241,6 +241,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
Mode: otelv1beta1.ModeDaemonSet,
OpenTelemetryCommonFields: otelv1beta1.OpenTelemetryCommonFields{
Image: axoflowOtelCollectorImageRef,
Args: additionalArgs,
ServiceAccount: saName.Name,
VolumeMounts: []corev1.VolumeMount{
{
Expand Down
27 changes: 25 additions & 2 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"slices"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (cfgInput *OtelColConfigInput) generateTelemetry() map[string]any {
return telemetry
}

func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1beta1.Config {
func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) (otelv1beta1.Config, map[string]string) {
exporters := cfgInput.generateExporters(ctx)

processors := cfgInput.generateProcessors()
Expand Down Expand Up @@ -306,7 +307,7 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
extensionNames = nil
}

return otelv1beta1.Config{
otelConfig := otelv1beta1.Config{
Receivers: otelv1beta1.AnyConfig{Object: receivers},
Exporters: otelv1beta1.AnyConfig{Object: exporters},
Processors: &otelv1beta1.AnyConfig{Object: processors},
Expand All @@ -318,6 +319,28 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
Pipelines: pipelines,
},
}

return otelConfig, assembleAdditionalArgs(&otelConfig)
}

func assembleAdditionalArgs(otelConfig *otelv1beta1.Config) map[string]string {
const (
featureGatesKey = "feature-gates"
flattenLogsFeatureGateValue = "transform.flatten.logs"

transformProcessorID = "transform"
flattenDataKey = "flatten_data"
)

args := make(map[string]string)
for processorName, processorConfig := range otelConfig.Processors.Object {
if strings.Contains(processorName, transformProcessorID) && processorConfig.(processor.TransformProcessor).FlattenData {
args[featureGatesKey] = flattenLogsFeatureGateValue
break
}
}

return args
}

func validateTenants(tenants *[]v1alpha1.Tenant) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestOtelColConfComplex(t *testing.T) {
// Config

// The receiver and exporter entries are not serialized because of tags on the underlying data structure. The tests won't contain them, this is a known issue.
generatedConfig := inputCfg.AssembleConfig(context.TODO())
generatedConfig, _ := inputCfg.AssembleConfig(context.TODO())
actualJSONBytes, err1 := json.Marshal(generatedConfig)
if err1 != nil {
t.Fatalf("error %v", err1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TransformProcessorStatement struct {

type TransformProcessor struct {
ErrorMode components.ErrorMode `json:"error_mode,omitempty"`
FlattenData bool `json:"flatten_data,omitempty"`
TraceStatements []TransformProcessorStatement `json:"trace_statements,omitempty"`
MetricStatements []TransformProcessorStatement `json:"metric_statements,omitempty"`
LogStatements []TransformProcessorStatement `json:"log_statements,omitempty"`
Expand All @@ -38,6 +39,7 @@ type TransformProcessor struct {
func GenerateTransformProcessorForTenant(tenant v1alpha1.Tenant) TransformProcessor {
return TransformProcessor{
ErrorMode: components.ErrorMode(tenant.Spec.Transform.ErrorMode),
FlattenData: tenant.Spec.Transform.FlattenData,
TraceStatements: convertAPIStatements(tenant.Spec.Transform.TraceStatements),
MetricStatements: convertAPIStatements(tenant.Spec.Transform.MetricStatements),
LogStatements: convertAPIStatements(tenant.Spec.Transform.LogStatements),
Expand Down

0 comments on commit 353e80e

Please sign in to comment.