From 591011cb50763bf04edfa6b8e0b89f2f05c91cd1 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 14 Nov 2024 14:09:45 +0100 Subject: [PATCH] feat: batch processor Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/output_types.go | 38 ++++++++++++++++ .../v1alpha1/zz_generated.deepcopy.go | 25 +++++++++++ .../telemetry.kube-logging.dev_outputs.yaml | 45 +++++++++++++++++++ .../telemetry.kube-logging.dev_outputs.yaml | 45 +++++++++++++++++++ .../telemetry/collector_controller.go | 2 +- .../otel_col_conf_test_fixtures/complex.yaml | 9 ++++ .../telemetry/otel_conf_gen/otel_conf_gen.go | 14 ++++++ .../otel_conf_gen/otel_conf_gen_test.go | 7 +++ .../otel_conf_gen/validator/hooks.go | 7 +-- .../components/processor/batch_processor.go | 37 +++++++++++++++ 10 files changed, 225 insertions(+), 4 deletions(-) create mode 100644 internal/controller/telemetry/pipeline/components/processor/batch_processor.go diff --git a/api/telemetry/v1alpha1/output_types.go b/api/telemetry/v1alpha1/output_types.go index beb3011..7b89539 100644 --- a/api/telemetry/v1alpha1/output_types.go +++ b/api/telemetry/v1alpha1/output_types.go @@ -22,12 +22,50 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type Batch struct { + // From go.opentelemetry.io/collector/processor/batchprocessor + + // +kubebuilder:validation:Format=duration + + // Timeout sets the time after which a batch will be sent regardless of size. + // When this is set to zero, batched data will be sent immediately. + Timeout string `json:"timeout,omitempty"` + + // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + // When this is set to zero, the batch size is ignored and data will be sent immediately + // subject to only send_batch_max_size. + SendBatchSize uint32 `json:"send_batch_size,omitempty"` + + // SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + // Larger batches are split into smaller units. + // Default value is 0, that means no maximum size. + SendBatchMaxSize uint32 `json:"send_batch_max_size,omitempty"` + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct batchers. If this setting is empty, + // a single batcher instance will be used. When this setting + // is not empty, one batcher will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will + // trigger a validation error. + MetadataKeys []string `json:"metadata_keys,omitempty"` + + // MetadataCardinalityLimit indicates the maximum number of + // batcher instances that will be created through a distinct + // combination of MetadataKeys. + MetadataCardinalityLimit uint32 `json:"metadata_cardinality_limit,omitempty"` +} + // OutputSpec defines the desired state of Output type OutputSpec struct { OTLPGRPC *OTLPGRPC `json:"otlp,omitempty"` Fluentforward *Fluentforward `json:"fluentforward,omitempty"` OTLPHTTP *OTLPHTTP `json:"otlphttp,omitempty"` Authentication *OutputAuth `json:"authentication,omitempty"` + Batch *Batch `json:"batch,omitempty"` } type OutputAuth struct { diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index c4c1d15..844c307 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -77,6 +77,26 @@ func (in *BasicAuthConfig) DeepCopy() *BasicAuthConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Batch) DeepCopyInto(out *Batch) { + *out = *in + if in.MetadataKeys != nil { + in, out := &in.MetadataKeys, &out.MetadataKeys + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Batch. +func (in *Batch) DeepCopy() *Batch { + if in == nil { + return nil + } + out := new(Batch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BearerAuthConfig) DeepCopyInto(out *BearerAuthConfig) { *out = *in @@ -602,6 +622,11 @@ func (in *OutputSpec) DeepCopyInto(out *OutputSpec) { *out = new(OutputAuth) (*in).DeepCopyInto(*out) } + if in.Batch != nil { + in, out := &in.Batch, &out.Batch + *out = new(Batch) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OutputSpec. diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml index 169c6a2..83f7234 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml @@ -86,6 +86,51 @@ spec: type: string type: object type: object + batch: + properties: + metadata_cardinality_limit: + description: |- + MetadataCardinalityLimit indicates the maximum number of + batcher instances that will be created through a distinct + combination of MetadataKeys. + format: int32 + type: integer + metadata_keys: + description: |- + MetadataKeys is a list of client.Metadata keys that will be + used to form distinct batchers. If this setting is empty, + a single batcher instance will be used. When this setting + is not empty, one batcher will be used per distinct + combination of values for the listed metadata keys. + + Empty value and unset metadata are treated as distinct cases. + + Entries are case-insensitive. Duplicated entries will + trigger a validation error. + items: + type: string + type: array + send_batch_max_size: + description: |- + SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + Larger batches are split into smaller units. + Default value is 0, that means no maximum size. + format: int32 + type: integer + send_batch_size: + description: |- + SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + When this is set to zero, the batch size is ignored and data will be sent immediately + subject to only send_batch_max_size. + format: int32 + type: integer + timeout: + description: |- + Timeout sets the time after which a batch will be sent regardless of size. + When this is set to zero, batched data will be sent immediately. + format: duration + type: string + type: object fluentforward: properties: compress_gzip: diff --git a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml index 169c6a2..83f7234 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml @@ -86,6 +86,51 @@ spec: type: string type: object type: object + batch: + properties: + metadata_cardinality_limit: + description: |- + MetadataCardinalityLimit indicates the maximum number of + batcher instances that will be created through a distinct + combination of MetadataKeys. + format: int32 + type: integer + metadata_keys: + description: |- + MetadataKeys is a list of client.Metadata keys that will be + used to form distinct batchers. If this setting is empty, + a single batcher instance will be used. When this setting + is not empty, one batcher will be used per distinct + combination of values for the listed metadata keys. + + Empty value and unset metadata are treated as distinct cases. + + Entries are case-insensitive. Duplicated entries will + trigger a validation error. + items: + type: string + type: array + send_batch_max_size: + description: |- + SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize. + Larger batches are split into smaller units. + Default value is 0, that means no maximum size. + format: int32 + type: integer + send_batch_size: + description: |- + SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + When this is set to zero, the batch size is ignored and data will be sent immediately + subject to only send_batch_max_size. + format: int32 + type: integer + timeout: + description: |- + Timeout sets the time after which a batch will be sent regardless of size. + When this is set to zero, batched data will be sent immediately. + format: duration + type: string + type: object fluentforward: properties: compress_gzip: diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index d0a117b..d15a3ae 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -49,7 +49,7 @@ var ( const ( requeueDelayOnFailedTenant = 20 * time.Second - axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0" + axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0-dev1" ) // CollectorReconciler reconciles a Collector object diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index 2bfb5eb..3659a72 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -122,6 +122,14 @@ processors: - action: insert key: tenant value: example-tenant-b + batch/otlp-test-output: + metadata_cardinality_limit: 100 + metadata_keys: + - key1 + - key2 + send_batch_max_size: 4096 + send_batch_size: 512 + timeout: 5s deltatocumulative: {} k8sattributes: auth_type: serviceAccount @@ -169,6 +177,7 @@ service: processors: - memory_limiter - attributes/exporter_name_otlp-test-output + - batch/otlp-test-output receivers: - routing/subscription_example-tenant-a-ns_subscription-example-1_outputs logs/output_example-tenant-a-ns_subscription-example-2_collector_otlp-test-output-2: diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 9da1803..2ae2e61 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -97,6 +97,11 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any { for _, output := range cfgInput.OutputsWithSecretData { processors[fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)] = processor.GenerateOutputExporterNameProcessor(components.GetExporterNameForOutput(output.Output)) + + // Add a batch processor if the output has one + if output.Output.Spec.Batch != nil { + processors[fmt.Sprintf("batch/%s", output.Output.Name)] = processor.GenerateBatchProcessorForOutput(*output.Output.Spec.Batch) + } } return processors @@ -201,6 +206,15 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b receivers := []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)} processors := []string{fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)} + + // NOTE: The order of the processors is important. + // The batch processor should be defined in the pipeline after the memory_limiter as well as any sampling processors. + // This is because batching should happen after any data drops such as sampling. + // ref: https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor#recommended-processors + if output.Output.Spec.Batch != nil { + processors = append(processors, fmt.Sprintf("batch/%s", output.Output.Name)) + } + var exporters []string if output.Output.Spec.OTLPGRPC != nil { diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index c702559..0e1565f 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -208,6 +208,13 @@ func TestOtelColConfComplex(t *testing.T) { }, }, }, + Batch: &v1alpha1.Batch{ + Timeout: "5s", + SendBatchSize: 512, + SendBatchMaxSize: 4096, + MetadataKeys: []string{"key1", "key2"}, + MetadataCardinalityLimit: 100, + }, }, }, }, diff --git a/internal/controller/telemetry/otel_conf_gen/validator/hooks.go b/internal/controller/telemetry/otel_conf_gen/validator/hooks.go index faec141..6c7e01b 100644 --- a/internal/controller/telemetry/otel_conf_gen/validator/hooks.go +++ b/internal/controller/telemetry/otel_conf_gen/validator/hooks.go @@ -36,8 +36,9 @@ func createDecoderConfig(result interface{}, hooks ...mapstructure.DecodeHookFun // decodeID converts string to component.ID or pipeline.ID func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) { - // contrib components sometimes does not follow the type/name format - contribComponents := map[string]bool{ + // occasionally components don't follow the type/name format + // in such cases, we need to handle them separately + exceptionComponents := map[string]bool{ "debug": true, "deltatocumulative": true, "memory_limiter": true, @@ -49,7 +50,7 @@ func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{} switch to { case reflect.TypeOf(component.ID{}): if len(parts) != 2 { - if contribComponents[parts[0]] { + if exceptionComponents[parts[0]] { return component.MustNewID(parts[0]), nil } diff --git a/internal/controller/telemetry/pipeline/components/processor/batch_processor.go b/internal/controller/telemetry/pipeline/components/processor/batch_processor.go new file mode 100644 index 0000000..91195cf --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/processor/batch_processor.go @@ -0,0 +1,37 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + +func GenerateBatchProcessorForOutput(batch v1alpha1.Batch) map[string]any { + batchResult := make(map[string]any) + batchResult["timeout"] = batch.Timeout + if batch.SendBatchSize != 0 { + batchResult["send_batch_size"] = batch.SendBatchSize + } + if batch.SendBatchMaxSize != 0 { + batchResult["send_batch_max_size"] = batch.SendBatchMaxSize + } + if batch.MetadataKeys != nil { + batchResult["metadata_keys"] = batch.MetadataKeys + } + if batch.MetadataCardinalityLimit != 0 { + batchResult["metadata_cardinality_limit"] = batch.MetadataCardinalityLimit + } + + return batchResult + +}