Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add support for transform processor log-flattenning feature-gate #104

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/telemetry/v1alpha1/tenant_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type Transform struct {
// Name of the Transform processor
Name string `json:"name,omitempty"`

// When FlattenData is true, the processor provides each log record with a distinct copy
// of its resource and scope. Then, after applying all transformations,
// the log records are regrouped by resource and scope.
FlattenData bool `json:"flattenData,omitempty"`

// +kubebuilder:validation:Enum:=ignore;silent;propagate

// ErrorMode specifies how errors are handled while processing a statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ spec:
- silent
- propagate
type: string
flattenData:
description: |-
When FlattenData is true, the processor provides each log record with a distinct copy
of its resource and scope. Then, after applying all transformations,
the log records are regrouped by resource and scope.
type: boolean
logStatements:
items:
description: |-
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_tenants.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ spec:
- silent
- propagate
type: string
flattenData:
description: |-
When FlattenData is true, the processor provides each log record with a distinct copy
of its resource and scope. Then, after applying all transformations,
the log records are regrouped by resource and scope.
type: boolean
logStatements:
items:
description: |-
Expand Down
1 change: 1 addition & 0 deletions docs/examples/tenant-to-tenant-routing/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ metadata:
spec:
transform:
name: parse-nginx
flattenData: true
logStatements:
- context: log
statements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ metadata:
spec:
transform:
name: parse-nginx
flattenData: true
logStatements:
- context: log
statements:
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

otelConfig := otelConfigInput.AssembleConfig(ctx)
otelConfig, additionalArgs := otelConfigInput.AssembleConfig(ctx)
if err := validator.ValidateAssembledConfig(otelConfig); err != nil {
logger.Error(errors.WithStack(err), "invalid otel config")

Expand Down Expand Up @@ -241,6 +241,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
Mode: otelv1beta1.ModeDaemonSet,
OpenTelemetryCommonFields: otelv1beta1.OpenTelemetryCommonFields{
Image: axoflowOtelCollectorImageRef,
Args: additionalArgs,
ServiceAccount: saName.Name,
VolumeMounts: []corev1.VolumeMount{
{
Expand Down
27 changes: 25 additions & 2 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"slices"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (cfgInput *OtelColConfigInput) generateTelemetry() map[string]any {
return telemetry
}

func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1beta1.Config {
func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) (otelv1beta1.Config, map[string]string) {
exporters := cfgInput.generateExporters(ctx)

processors := cfgInput.generateProcessors()
Expand Down Expand Up @@ -306,7 +307,7 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
extensionNames = nil
}

return otelv1beta1.Config{
otelConfig := otelv1beta1.Config{
Receivers: otelv1beta1.AnyConfig{Object: receivers},
Exporters: otelv1beta1.AnyConfig{Object: exporters},
Processors: &otelv1beta1.AnyConfig{Object: processors},
Expand All @@ -318,6 +319,28 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
Pipelines: pipelines,
},
}

return otelConfig, assembleAdditionalArgs(&otelConfig)
}

func assembleAdditionalArgs(otelConfig *otelv1beta1.Config) map[string]string {
const (
featureGatesKey = "feature-gates"
flattenLogsFeatureGateValue = "transform.flatten.logs"

transformProcessorID = "transform"
flattenDataKey = "flatten_data"
)

args := make(map[string]string)
for processorName, processorConfig := range otelConfig.Processors.Object {
if strings.Contains(processorName, transformProcessorID) && processorConfig.(processor.TransformProcessor).FlattenData {
args[featureGatesKey] = flattenLogsFeatureGateValue
break
}
}

return args
}

func validateTenants(tenants *[]v1alpha1.Tenant) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestOtelColConfComplex(t *testing.T) {
// Config

// The receiver and exporter entries are not serialized because of tags on the underlying data structure. The tests won't contain them, this is a known issue.
generatedConfig := inputCfg.AssembleConfig(context.TODO())
generatedConfig, _ := inputCfg.AssembleConfig(context.TODO())
actualJSONBytes, err1 := json.Marshal(generatedConfig)
if err1 != nil {
t.Fatalf("error %v", err1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TransformProcessorStatement struct {

type TransformProcessor struct {
ErrorMode components.ErrorMode `json:"error_mode,omitempty"`
FlattenData bool `json:"flatten_data,omitempty"`
TraceStatements []TransformProcessorStatement `json:"trace_statements,omitempty"`
MetricStatements []TransformProcessorStatement `json:"metric_statements,omitempty"`
LogStatements []TransformProcessorStatement `json:"log_statements,omitempty"`
Expand All @@ -38,6 +39,7 @@ type TransformProcessor struct {
func GenerateTransformProcessorForTenant(tenant v1alpha1.Tenant) TransformProcessor {
return TransformProcessor{
ErrorMode: components.ErrorMode(tenant.Spec.Transform.ErrorMode),
FlattenData: tenant.Spec.Transform.FlattenData,
TraceStatements: convertAPIStatements(tenant.Spec.Transform.TraceStatements),
MetricStatements: convertAPIStatements(tenant.Spec.Transform.MetricStatements),
LogStatements: convertAPIStatements(tenant.Spec.Transform.LogStatements),
Expand Down
Loading