Skip to content

Commit

Permalink
fix: chmod init
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Dec 10, 2024
1 parent bfa14eb commit c05fe66
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 63 deletions.
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/cisco-open/operator-tools v0.37.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.16
github.com/mitchellh/mapstructure v1.5.0
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.36.0
Expand Down Expand Up @@ -68,6 +67,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down Expand Up @@ -176,6 +176,3 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)

// ref: https://github.com/darccio/mergo/blob/2b1eb9c67d7332f286430af241180c5005a6a5a4/README.md?plain=1#L53
replace github.com/imdario/mergo => github.com/imdario/mergo v0.3.16
77 changes: 42 additions & 35 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"math"
"reflect"
"slices"
"strings"
"time"

"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/reconciler"
"github.com/imdario/mergo"
otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -235,10 +235,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, fmt.Errorf("%+v", err)
}

otelCollector, state, err := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name)
if err != nil {
return ctrl.Result{}, err
}
otelCollector, state := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name)

if err := ctrl.SetControllerReference(collector, otelCollector, r.Scheme); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -389,7 +386,7 @@ func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelConfig otelv1beta1.Config, additionalArgs map[string]string, tenants []v1alpha1.Tenant, saName string) (*otelv1beta1.OpenTelemetryCollector, reconciler.DesiredState, error) {
func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelConfig otelv1beta1.Config, additionalArgs map[string]string, tenants []v1alpha1.Tenant, saName string) (*otelv1beta1.OpenTelemetryCollector, reconciler.DesiredState) {
otelCollector := otelv1beta1.OpenTelemetryCollector{
TypeMeta: metav1.TypeMeta{
APIVersion: otelv1beta1.GroupVersion.String(),
Expand All @@ -407,9 +404,7 @@ func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelC
},
}
appendAdditionalVolumesForTenantsFileStorage(&otelCollector.Spec.OpenTelemetryCommonFields, tenants)
if err := setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName); err != nil {
return &otelCollector, nil, err
}
setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName)

if memoryLimit := collector.Spec.GetMemoryLimit(); memoryLimit != nil {
// Calculate 80% of the specified memory limit for GOMEMLIMIT
Expand All @@ -428,7 +423,7 @@ func (r *CollectorReconciler) otelCollector(collector *v1alpha1.Collector, otelC
return nil
})

return &otelCollector, beforeUpdateHook, nil
return &otelCollector, beforeUpdateHook
}

func (r *CollectorReconciler) reconcileRBAC(ctx context.Context, collector *v1alpha1.Collector) (v1alpha1.NamespacedName, error) {
Expand Down Expand Up @@ -568,33 +563,36 @@ func normalizeStringSlice(inputList []string) []string {
return uniqueList
}

func appendAdditionalVolumesForTenantsFileStorage(otelComonFields *otelv1beta1.OpenTelemetryCommonFields, tenants []v1alpha1.Tenant) {
func appendAdditionalVolumesForTenantsFileStorage(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, tenants []v1alpha1.Tenant) {
var volumeMountsForInit []corev1.VolumeMount
var chmodCommands []string

for _, tenant := range tenants {
if tenant.Spec.PersistenceConfig.EnableFileStorage {
bufferVolumeName := fmt.Sprintf("buffervolume-%s", tenant.Name)
mountPath := storage.DetermineFileStorageDirectory(tenant.Spec.PersistenceConfig.Directory)
volumeMount := corev1.VolumeMount{
Name: bufferVolumeName,
MountPath: mountPath,
}
volumeMountsForInit = append(volumeMountsForInit, volumeMount)
chmodCommands = append(chmodCommands, fmt.Sprintf("chmod -R 777 %s", mountPath))

otelCommonFields.VolumeMounts = append(otelCommonFields.VolumeMounts, volumeMount)
switch tenant.Spec.PersistenceConfig.VolumeSource {
case "hostPath":
otelComonFields.VolumeMounts = append(otelComonFields.VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("buffervolume/%s", tenant.Name),
MountPath: mountPath,
})
otelComonFields.Volumes = append(otelComonFields.Volumes, corev1.Volume{
Name: fmt.Sprintf("buffervolume/%s", tenant.Name),
otelCommonFields.Volumes = append(otelCommonFields.Volumes, corev1.Volume{
Name: bufferVolumeName,
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: mountPath,
Type: utils.ToPtr(corev1.HostPathDirectoryOrCreate),
},
},
})

case "emptyDir":
otelComonFields.VolumeMounts = append(otelComonFields.VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("buffervolume/%s", tenant.Name),
MountPath: mountPath,
})
otelComonFields.Volumes = append(otelComonFields.Volumes, corev1.Volume{
Name: fmt.Sprintf("buffervolume/%s", tenant.Name),
otelCommonFields.Volumes = append(otelCommonFields.Volumes, corev1.Volume{
Name: bufferVolumeName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumDefault,
Expand All @@ -604,18 +602,33 @@ func appendAdditionalVolumesForTenantsFileStorage(otelComonFields *otelv1beta1.O
}
}
}

// Add a single initContainer to handle all chmod operations
if len(chmodCommands) > 0 && len(volumeMountsForInit) > 0 {
initContainer := corev1.Container{
Name: "init-chmod",
Image: "busybox",
Command: []string{"sh", "-c"},
Args: []string{strings.Join(chmodCommands, " && ")},
VolumeMounts: volumeMountsForInit,
}
otelCommonFields.InitContainers = append(otelCommonFields.InitContainers, initContainer)
}
}

func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, additionalArgs map[string]string, saName string) error {
func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryCommonFields, additionalArgs map[string]string, saName string) {
if otelCommonFields == nil {
otelCommonFields = &otelv1beta1.OpenTelemetryCommonFields{}
}

otelCommonFields.Image = axoflowOtelCollectorImageRef
otelCommonFields.ServiceAccount = saName

if err := mergo.Merge(&otelCommonFields.Args, additionalArgs, mergo.WithOverride); err != nil {
return err
if otelCommonFields.Args == nil {
otelCommonFields.Args = make(map[string]string)
}
for key, value := range additionalArgs {
otelCommonFields.Args[key] = value
}

volumeMounts := []corev1.VolumeMount{
Expand All @@ -630,9 +643,7 @@ func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryComm
MountPath: "/var/lib/docker/containers",
},
}
if err := mergo.Merge(&otelCommonFields.VolumeMounts, volumeMounts, mergo.WithOverride); err != nil {
return err
}
otelCommonFields.VolumeMounts = append(otelCommonFields.VolumeMounts, volumeMounts...)

volumes := []corev1.Volume{
{
Expand All @@ -652,9 +663,5 @@ func setOtelCommonFieldsDefaults(otelCommonFields *otelv1beta1.OpenTelemetryComm
},
},
}
if err := mergo.Merge(&otelCommonFields.Volumes, volumes, mergo.WithOverride); err != nil {
return err
}

return nil
otelCommonFields.Volumes = append(otelCommonFields.Volumes, volumes...)
}
6 changes: 1 addition & 5 deletions internal/controller/telemetry/collector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,7 @@ func TestSetOtelCommonFieldsDefaults(t *testing.T) {
if ttp.initialCommonFields != nil {
commonFieldsCopy = ttp.initialCommonFields.DeepCopy()
}

err := setOtelCommonFieldsDefaults(commonFieldsCopy, ttp.additionalArgs, ttp.saName)
if err != nil {
assert.EqualError(t, err, ttp.expectedError.Error())
}
setOtelCommonFieldsDefaults(commonFieldsCopy, ttp.additionalArgs, ttp.saName)

assert.Equal(t, ttp.expectedResult, commonFieldsCopy)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ exporters: {}
extensions:
bearertokenauth/collector_otlp-test-output:
token: testtoken
filestorage/example-tenant-a:
file_storage/example-tenant-a:
create_directory: true
directory: /var/lib/otelcol/file_storage
filestorage/example-tenant-b:
file_storage/example-tenant-b:
create_directory: true
directory: /var/lib/otelcol/file_storage
processors:
Expand Down Expand Up @@ -166,8 +166,8 @@ receivers: {}
service:
extensions:
- bearertokenauth/collector_otlp-test-output
- filestorage/example-tenant-a
- filestorage/example-tenant-b
- file_storage/example-tenant-a
- file_storage/example-tenant-b
pipelines:
logs/output_example-tenant-a-ns_subscription-example-1_collector_loki-test-output:
exporters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (cfgInput *OtelColConfigInput) generateExtensions() (map[string]any, []stri

for _, tenant := range cfgInput.Tenants {
if tenant.Spec.PersistenceConfig.EnableFileStorage {
extensions[fmt.Sprintf("filestorage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory)
extensions[fmt.Sprintf("file_storage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func GenerateFluentforwardExporters(ctx context.Context, resourceRelations compo
continue
}
if tenant.Spec.PersistenceConfig.EnableFileStorage {
output.Output.Spec.Fluentforward.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name))
output.Output.Spec.Fluentforward.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name))
}

fluentForwardMarshaled, err := json.Marshal(output.Output.Spec.Fluentforward)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func GenerateOTLPGRPCExporters(ctx context.Context, resourceRelations components
continue
}
if tenant.Spec.PersistenceConfig.EnableFileStorage {
output.Output.Spec.OTLPGRPC.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name))
output.Output.Spec.OTLPGRPC.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name))
}

if output.Output.Spec.Authentication != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestGenerateOTLPGRPCExporters(t *testing.T) {
"enabled": true,
"num_consumers": float64(10),
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func GenerateOTLPHTTPExporters(ctx context.Context, resourceRelations components
continue
}
if tenant.Spec.PersistenceConfig.EnableFileStorage {
output.Output.Spec.OTLPHTTP.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("filestorage/%s", tenant.Name))
output.Output.Spec.OTLPHTTP.QueueConfig.Storage = utils.ToPtr(fmt.Sprintf("file_storage/%s", tenant.Name))
}

if output.Output.Spec.Authentication != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) {
"sending_queue": map[string]any{
"enabled": true,
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestGenerateOTLPHTTPExporters(t *testing.T) {
"enabled": true,
"num_consumers": float64(10),
"queue_size": float64(100),
"storage": fmt.Sprintf("filestorage/%s", testTenantName),
"storage": fmt.Sprintf("file_storage/%s", testTenantName),
},
"retry_on_failure": map[string]any{
"enabled": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tena
},
}
if tenant.Spec.EnableFileStorage {
k8sReceiver["storage"] = fmt.Sprintf("filestorage/%s", tenant.Name)
k8sReceiver["storage"] = fmt.Sprintf("file_storage/%s", tenant.Name)
}

return k8sReceiver
Expand Down

0 comments on commit c05fe66

Please sign in to comment.