Skip to content

Commit

Permalink
feat: batch processor
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Nov 14, 2024
1 parent 5867f09 commit 591011c
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 4 deletions.
38 changes: 38 additions & 0 deletions api/telemetry/v1alpha1/output_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,50 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Batch struct {
// From go.opentelemetry.io/collector/processor/batchprocessor

// +kubebuilder:validation:Format=duration

// Timeout sets the time after which a batch will be sent regardless of size.
// When this is set to zero, batched data will be sent immediately.
Timeout string `json:"timeout,omitempty"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
// When this is set to zero, the batch size is ignored and data will be sent immediately
// subject to only send_batch_max_size.
SendBatchSize uint32 `json:"send_batch_size,omitempty"`

// SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
// Larger batches are split into smaller units.
// Default value is 0, that means no maximum size.
SendBatchMaxSize uint32 `json:"send_batch_max_size,omitempty"`

// MetadataKeys is a list of client.Metadata keys that will be
// used to form distinct batchers. If this setting is empty,
// a single batcher instance will be used. When this setting
// is not empty, one batcher will be used per distinct
// combination of values for the listed metadata keys.
//
// Empty value and unset metadata are treated as distinct cases.
//
// Entries are case-insensitive. Duplicated entries will
// trigger a validation error.
MetadataKeys []string `json:"metadata_keys,omitempty"`

// MetadataCardinalityLimit indicates the maximum number of
// batcher instances that will be created through a distinct
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `json:"metadata_cardinality_limit,omitempty"`
}

// OutputSpec defines the desired state of Output
type OutputSpec struct {
OTLPGRPC *OTLPGRPC `json:"otlp,omitempty"`
Fluentforward *Fluentforward `json:"fluentforward,omitempty"`
OTLPHTTP *OTLPHTTP `json:"otlphttp,omitempty"`
Authentication *OutputAuth `json:"authentication,omitempty"`
Batch *Batch `json:"batch,omitempty"`
}

type OutputAuth struct {
Expand Down
25 changes: 25 additions & 0 deletions api/telemetry/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,51 @@ spec:
type: string
type: object
type: object
batch:
properties:
metadata_cardinality_limit:
description: |-
MetadataCardinalityLimit indicates the maximum number of
batcher instances that will be created through a distinct
combination of MetadataKeys.
format: int32
type: integer
metadata_keys:
description: |-
MetadataKeys is a list of client.Metadata keys that will be
used to form distinct batchers. If this setting is empty,
a single batcher instance will be used. When this setting
is not empty, one batcher will be used per distinct
combination of values for the listed metadata keys.
Empty value and unset metadata are treated as distinct cases.
Entries are case-insensitive. Duplicated entries will
trigger a validation error.
items:
type: string
type: array
send_batch_max_size:
description: |-
SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
Larger batches are split into smaller units.
Default value is 0, that means no maximum size.
format: int32
type: integer
send_batch_size:
description: |-
SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
When this is set to zero, the batch size is ignored and data will be sent immediately
subject to only send_batch_max_size.
format: int32
type: integer
timeout:
description: |-
Timeout sets the time after which a batch will be sent regardless of size.
When this is set to zero, batched data will be sent immediately.
format: duration
type: string
type: object
fluentforward:
properties:
compress_gzip:
Expand Down
45 changes: 45 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,51 @@ spec:
type: string
type: object
type: object
batch:
properties:
metadata_cardinality_limit:
description: |-
MetadataCardinalityLimit indicates the maximum number of
batcher instances that will be created through a distinct
combination of MetadataKeys.
format: int32
type: integer
metadata_keys:
description: |-
MetadataKeys is a list of client.Metadata keys that will be
used to form distinct batchers. If this setting is empty,
a single batcher instance will be used. When this setting
is not empty, one batcher will be used per distinct
combination of values for the listed metadata keys.
Empty value and unset metadata are treated as distinct cases.
Entries are case-insensitive. Duplicated entries will
trigger a validation error.
items:
type: string
type: array
send_batch_max_size:
description: |-
SendBatchMaxSize is the maximum size of a batch. It must be larger than SendBatchSize.
Larger batches are split into smaller units.
Default value is 0, that means no maximum size.
format: int32
type: integer
send_batch_size:
description: |-
SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
When this is set to zero, the batch size is ignored and data will be sent immediately
subject to only send_batch_max_size.
format: int32
type: integer
timeout:
description: |-
Timeout sets the time after which a batch will be sent regardless of size.
When this is set to zero, batched data will be sent immediately.
format: duration
type: string
type: object
fluentforward:
properties:
compress_gzip:
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

const (
requeueDelayOnFailedTenant = 20 * time.Second
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0"
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0-dev1"
)

// CollectorReconciler reconciles a Collector object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ processors:
- action: insert
key: tenant
value: example-tenant-b
batch/otlp-test-output:
metadata_cardinality_limit: 100
metadata_keys:
- key1
- key2
send_batch_max_size: 4096
send_batch_size: 512
timeout: 5s
deltatocumulative: {}
k8sattributes:
auth_type: serviceAccount
Expand Down Expand Up @@ -169,6 +177,7 @@ service:
processors:
- memory_limiter
- attributes/exporter_name_otlp-test-output
- batch/otlp-test-output
receivers:
- routing/subscription_example-tenant-a-ns_subscription-example-1_outputs
logs/output_example-tenant-a-ns_subscription-example-2_collector_otlp-test-output-2:
Expand Down
14 changes: 14 additions & 0 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any {

for _, output := range cfgInput.OutputsWithSecretData {
processors[fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)] = processor.GenerateOutputExporterNameProcessor(components.GetExporterNameForOutput(output.Output))

// Add a batch processor if the output has one
if output.Output.Spec.Batch != nil {
processors[fmt.Sprintf("batch/%s", output.Output.Name)] = processor.GenerateBatchProcessorForOutput(*output.Output.Spec.Batch)
}
}

return processors
Expand Down Expand Up @@ -201,6 +206,15 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b

receivers := []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}
processors := []string{fmt.Sprintf("attributes/exporter_name_%s", output.Output.Name)}

// NOTE: The order of the processors is important.
// The batch processor should be defined in the pipeline after the memory_limiter as well as any sampling processors.
// This is because batching should happen after any data drops such as sampling.
// ref: https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor#recommended-processors
if output.Output.Spec.Batch != nil {
processors = append(processors, fmt.Sprintf("batch/%s", output.Output.Name))
}

var exporters []string

if output.Output.Spec.OTLPGRPC != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func TestOtelColConfComplex(t *testing.T) {
},
},
},
Batch: &v1alpha1.Batch{
Timeout: "5s",
SendBatchSize: 512,
SendBatchMaxSize: 4096,
MetadataKeys: []string{"key1", "key2"},
MetadataCardinalityLimit: 100,
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func createDecoderConfig(result interface{}, hooks ...mapstructure.DecodeHookFun

// decodeID converts string to component.ID or pipeline.ID
func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) {
// contrib components sometimes does not follow the type/name format
contribComponents := map[string]bool{
// occasionally components don't follow the type/name format
// in such cases, we need to handle them separately
exceptionComponents := map[string]bool{
"debug": true,
"deltatocumulative": true,
"memory_limiter": true,
Expand All @@ -49,7 +50,7 @@ func decodeID(from reflect.Type, to reflect.Type, data interface{}) (interface{}
switch to {
case reflect.TypeOf(component.ID{}):
if len(parts) != 2 {
if contribComponents[parts[0]] {
if exceptionComponents[parts[0]] {
return component.MustNewID(parts[0]), nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright © 2024 Kube logging authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"

func GenerateBatchProcessorForOutput(batch v1alpha1.Batch) map[string]any {
batchResult := make(map[string]any)
batchResult["timeout"] = batch.Timeout
if batch.SendBatchSize != 0 {
batchResult["send_batch_size"] = batch.SendBatchSize
}
if batch.SendBatchMaxSize != 0 {
batchResult["send_batch_max_size"] = batch.SendBatchMaxSize
}
if batch.MetadataKeys != nil {
batchResult["metadata_keys"] = batch.MetadataKeys
}
if batch.MetadataCardinalityLimit != 0 {
batchResult["metadata_cardinality_limit"] = batch.MetadataCardinalityLimit
}

return batchResult

}

0 comments on commit 591011c

Please sign in to comment.