diff --git a/go.mod b/go.mod index 9d386a9..d7fc29c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/cisco-open/operator-tools v0.37.0 github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-multierror v1.1.1 - github.com/imdario/mergo v0.3.16 github.com/mitchellh/mapstructure v1.5.0 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.0 @@ -68,6 +67,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect + github.com/imdario/mergo v0.3.16 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -176,6 +176,3 @@ require ( sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) - -// ref: https://github.com/darccio/mergo/blob/2b1eb9c67d7332f286430af241180c5005a6a5a4/README.md?plain=1#L53 -replace github.com/imdario/mergo => github.com/imdario/mergo v0.3.16 diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 16ba98b..1f0e323 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -20,11 +20,11 @@ import ( "math" "reflect" "slices" + "strings" "time" "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/reconciler" - "github.com/imdario/mergo" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -235,10 +235,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, fmt.Errorf("%+v", err) } - otelCollector, state, err := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name) - if err != nil { - return ctrl.Result{}, err - } + otelCollector, state := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name) if err := ctrl.SetControllerReference(collector, otelCollector, r.Scheme); err != nil { return ctrl.Result{}, err @@ -389,7 +386,7 @@ func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelConfig otelv1beta1.Config, additionalArgs map[string]string, tenants []v1alpha1.Tenant, saName string) (*otelv1beta1.OpenTelemetryCollector, reconciler.DesiredState, error) { +func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelConfig otelv1beta1.Config, additionalArgs map[string]string, tenants []v1alpha1.Tenant, saName string) (*otelv1beta1.OpenTelemetryCollector, reconciler.DesiredState) { otelCollector := otelv1beta1.OpenTelemetryCollector{ TypeMeta: metav1.TypeMeta{ APIVersion: otelv1beta1.GroupVersion.String(), @@ -407,9 +404,7 @@ func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelC }, } appendAdditionalVolumesForTenantsFileStorage(&otelCollector.Spec.OpenTelemetryCommonFields, tenants) - if err := setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName); err != nil { - return &otelCollector, nil, err - } + setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName) if memoryLimit := collector.Spec.GetMemoryLimit(); memoryLimit != nil { // Calculate 80% of the specified memory limit for GOMEMLIMIT @@ -428,7 +423,7 @@ func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelC return nil }) - return &otelCollector, beforeUpdateHook, nil + return &otelCollector, beforeUpdateHook } func (r *CollectorReconciler) reconcileRBAC(ctx context.Context, collector *v1alpha1.Collector) (v1alpha1.NamespacedName, error) { @@ -568,18 +563,26 @@ func normalizeStringSlice(inputList []string) []string { return uniqueList } -func appendAdditionalVolumesForTenantsFileStorage(otelComonFields *otelv1beta1.OpenTelemetryCommonFields, tenants []v1alpha1.Tenant) { +func appendAdditionalVolumesForTenantsFileStorage(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, tenants []v1alpha1.Tenant) { + var volumeMountsForInit []corev1.VolumeMount + var chmodCommands []string + for _, tenant := range tenants { if tenant.Spec.PersistenceConfig.EnableFileStorage { + bufferVolumeName := fmt.Sprintf("buffervolume-%s", tenant.Name) mountPath := storage.DetermineFileStorageDirectory(tenant.Spec.PersistenceConfig.Directory) + volumeMount := corev1.VolumeMount{ + Name: bufferVolumeName, + MountPath: mountPath, + } + volumeMountsForInit = append(volumeMountsForInit, volumeMount) + chmodCommands = append(chmodCommands, fmt.Sprintf("chmod -R 777 %s", mountPath)) + + otelCommonFields.VolumeMounts = append(otelCommonFields.VolumeMounts, volumeMount) switch tenant.Spec.PersistenceConfig.VolumeSource { case "hostPath": - otelComonFields.VolumeMounts = append(otelComonFields.VolumeMounts, corev1.VolumeMount{ - Name: fmt.Sprintf("buffervolume/%s", tenant.Name), - MountPath: mountPath, - }) - otelComonFields.Volumes = append(otelComonFields.Volumes, corev1.Volume{ - Name: fmt.Sprintf("buffervolume/%s", tenant.Name), + otelCommonFields.Volumes = append(otelCommonFields.Volumes, corev1.Volume{ + Name: bufferVolumeName, VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: mountPath, @@ -587,14 +590,9 @@ func appendAdditionalVolumesForTenantsFileStorage(otelComonFields *otelv1beta1.O }, }, }) - case "emptyDir": - otelComonFields.VolumeMounts = append(otelComonFields.VolumeMounts, corev1.VolumeMount{ - Name: fmt.Sprintf("buffervolume/%s", tenant.Name), - MountPath: mountPath, - }) - otelComonFields.Volumes = append(otelComonFields.Volumes, corev1.Volume{ - Name: fmt.Sprintf("buffervolume/%s", tenant.Name), + otelCommonFields.Volumes = append(otelCommonFields.Volumes, corev1.Volume{ + Name: bufferVolumeName, VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{ Medium: corev1.StorageMediumDefault, @@ -604,9 +602,21 @@ func appendAdditionalVolumesForTenantsFileStorage(otelComonFields *otelv1beta1.O } } } + + // Add a single initContainer to handle all chmod operations + if len(chmodCommands) > 0 && len(volumeMountsForInit) > 0 { + initContainer := corev1.Container{ + Name: "init-chmod", + Image: "busybox", + Command: []string{"sh", "-c"}, + Args: []string{strings.Join(chmodCommands, " && ")}, + VolumeMounts: volumeMountsForInit, + } + otelCommonFields.InitContainers = append(otelCommonFields.InitContainers, initContainer) + } } -func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, additionalArgs map[string]string, saName string) error { +func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, additionalArgs map[string]string, saName string) { if otelCommonFields == nil { otelCommonFields = &otelv1beta1.OpenTelemetryCommonFields{} } @@ -614,8 +624,11 @@ func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryComm otelCommonFields.Image = axoflowOtelCollectorImageRef otelCommonFields.ServiceAccount = saName - if err := mergo.Merge(&otelCommonFields.Args, additionalArgs, mergo.WithOverride); err != nil { - return err + if otelCommonFields.Args == nil { + otelCommonFields.Args = make(map[string]string) + } + for key, value := range additionalArgs { + otelCommonFields.Args[key] = value } volumeMounts := []corev1.VolumeMount{ @@ -630,9 +643,7 @@ func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryComm MountPath: "/var/lib/docker/containers", }, } - if err := mergo.Merge(&otelCommonFields.VolumeMounts, volumeMounts, mergo.WithOverride); err != nil { - return err - } + otelCommonFields.VolumeMounts = append(otelCommonFields.VolumeMounts, volumeMounts...) volumes := []corev1.Volume{ { @@ -652,9 +663,5 @@ func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryComm }, }, } - if err := mergo.Merge(&otelCommonFields.Volumes, volumes, mergo.WithOverride); err != nil { - return err - } - - return nil + otelCommonFields.Volumes = append(otelCommonFields.Volumes, volumes...) } diff --git a/internal/controller/telemetry/collector_controller_test.go b/internal/controller/telemetry/collector_controller_test.go index 5343c9e..dc2c7b6 100644 --- a/internal/controller/telemetry/collector_controller_test.go +++ b/internal/controller/telemetry/collector_controller_test.go @@ -148,11 +148,7 @@ func TestSetOtelCommonFieldsDefaults(t *testing.T) { if ttp.initialCommonFields != nil { commonFieldsCopy = ttp.initialCommonFields.DeepCopy() } - - err := setOtelCommonFieldsDefaults(commonFieldsCopy, ttp.additionalArgs, ttp.saName) - if err != nil { - assert.EqualError(t, err, ttp.expectedError.Error()) - } + setOtelCommonFieldsDefaults(commonFieldsCopy, ttp.additionalArgs, ttp.saName) assert.Equal(t, ttp.expectedResult, commonFieldsCopy) }) diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index 6236799..49126d2 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -59,10 +59,10 @@ exporters: {} extensions: bearertokenauth/collector_otlp-test-output: token: testtoken - filestorage/example-tenant-a: + file_storage/example-tenant-a: create_directory: true directory: /var/lib/otelcol/file_storage - filestorage/example-tenant-b: + file_storage/example-tenant-b: create_directory: true directory: /var/lib/otelcol/file_storage processors: @@ -166,8 +166,8 @@ receivers: {} service: extensions: - bearertokenauth/collector_otlp-test-output - - filestorage/example-tenant-a - - filestorage/example-tenant-b + - file_storage/example-tenant-a + - file_storage/example-tenant-b pipelines: logs/output_example-tenant-a-ns_subscription-example-1_collector_loki-test-output: exporters: diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index d7a8e8f..e0d5142 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -122,7 +122,7 @@ func (cfgInput *OtelColConfigInput) generateExtensions() (map[string]any, []stri for _, tenant := range cfgInput.Tenants { if tenant.Spec.PersistenceConfig.EnableFileStorage { - extensions[fmt.Sprintf("filestorage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory) + extensions[fmt.Sprintf("file_storage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory) } } diff --git a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go index 134b281..334df12 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter.go @@ -39,7 +39,7 @@ func GenerateFluentforwardExporters(ctx context.Context, resourceRelations compo continue } if tenant.Spec.PersistenceConfig.EnableFileStorage { - output.Output.Spec.Fluentforward.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name)) + output.Output.Spec.Fluentforward.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name)) } fluentForwardMarshaled, err := json.Marshal(output.Output.Spec.Fluentforward) diff --git a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go index 9837645..2f705cd 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go +++ b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go @@ -112,7 +112,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -213,7 +213,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go index 4896114..410ed3c 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter.go @@ -40,7 +40,7 @@ func GenerateOTLPGRPCExporters(ctx context.Context, resourceRelations components continue } if tenant.Spec.PersistenceConfig.EnableFileStorage { - output.Output.Spec.OTLPGRPC.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name)) + output.Output.Spec.OTLPGRPC.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name)) } if output.Output.Spec.Authentication != nil { diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go index ecbd979..85a680c 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlp_exporter_test.go @@ -136,7 +136,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -243,7 +243,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -329,7 +329,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -470,7 +470,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) { "enabled": true, "num_consumers": float64(10), "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go index 6abfa15..b571cb2 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter.go @@ -40,7 +40,7 @@ func GenerateOTLPHTTPExporters(ctx context.Context, resourceRelations components continue } if tenant.Spec.PersistenceConfig.EnableFileStorage { - output.Output.Spec.OTLPHTTP.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name)) + output.Output.Spec.OTLPHTTP.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name)) } if output.Output.Spec.Authentication != nil { diff --git a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go index 264f064..e7c45c1 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go +++ b/internal/controller/telemetry/pipeline/components/exporter/otlphttp_exporter_test.go @@ -137,7 +137,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -244,7 +244,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -330,7 +330,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) { "sending_queue": map[string]any{ "enabled": true, "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, @@ -470,7 +470,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) { "enabled": true, "num_consumers": float64(10), "queue_size": float64(100), - "storage": fmt.Sprintf("filestorage/%s", testTenantName), + "storage": fmt.Sprintf("file_storage/%s", testTenantName), }, "retry_on_failure": map[string]any{ "enabled": true, diff --git a/internal/controller/telemetry/pipeline/components/receiver/filelog_receiver.go b/internal/controller/telemetry/pipeline/components/receiver/filelog_receiver.go index 7c7b2c9..49b82ba 100644 --- a/internal/controller/telemetry/pipeline/components/receiver/filelog_receiver.go +++ b/internal/controller/telemetry/pipeline/components/receiver/filelog_receiver.go @@ -115,7 +115,7 @@ func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tena }, } if tenant.Spec.EnableFileStorage { - k8sReceiver["storage"] = fmt.Sprintf("filestorage/%s", tenant.Name) + k8sReceiver["storage"] = fmt.Sprintf("file_storage/%s", tenant.Name) } return k8sReceiver