From 1ab569dbb8ef2d359796c59d925c7d1c2812947b Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Wed, 31 Jan 2024 14:31:58 +0100 Subject: [PATCH] feat(persistence): add file_storage Signed-off-by: Szilard Parrag --- api/telemetry/v1alpha1/collector_types.go | 1 + ...telemetry.kube-logging.dev_collectors.yaml | 2 + .../one_tenant_two_subscriptions.yaml | 1 + e2e/e2e_test.sh | 38 ++++++++++++++----- .../telemetry/collector_controller.go | 37 ++++++++++++++++++ .../otel_col_conf_test_fixtures/complex.yaml | 8 +++- .../controller/telemetry/otel_conf_gen.go | 15 +++++++- 7 files changed, 90 insertions(+), 12 deletions(-) diff --git a/api/telemetry/v1alpha1/collector_types.go b/api/telemetry/v1alpha1/collector_types.go index 933fccee..1894c12d 100644 --- a/api/telemetry/v1alpha1/collector_types.go +++ b/api/telemetry/v1alpha1/collector_types.go @@ -22,6 +22,7 @@ import ( type CollectorSpec struct { TenantSelector metav1.LabelSelector `json:"tenantSelector,omitempty"` ControlNamespace string `json:"controlNamespace"` + AtomicPersist bool `json:"atomicPersist,omitempty"` } // CollectorStatus defines the observed state of Collector diff --git a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml index 8c6ab426..cb2d0680 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml @@ -43,6 +43,8 @@ spec: spec: description: CollectorSpec defines the desired state of Collector properties: + atomicPersist: + type: boolean controlNamespace: type: string tenantSelector: diff --git a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml index 773ed447..063bfc6e 100644 --- a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml +++ b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml @@ -16,6 +16,7 @@ metadata: name: example-collector spec: controlNamespace: collector + atomicPersist: true tenantSelector: matchLabels: collectorLabel: example-collector diff --git a/e2e/e2e_test.sh b/e2e/e2e_test.sh index da9cc023..23a6518a 100755 --- a/e2e/e2e_test.sh +++ b/e2e/e2e_test.sh @@ -11,8 +11,6 @@ create_if_does_not_exist() { KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME_E2E:-so-e2e} NO_KIND_CLEANUP=${NO_KIND_CLEANUP:-} CI_MODE=${CI_MODE:-} - # Backup current kubernetes context -CURRENT_K8S_CTX=$(kubectl config view | grep "current" | cut -f 2 -d : | xargs) # Prepare env kind create cluster --name "${KIND_CLUSTER_NAME}" --wait 5m @@ -48,23 +46,45 @@ else fi # Create log-generator -helm install --wait --create-namespace --namespace example-tenant-ns --generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator +helm install --wait \ + --create-namespace \ + --namespace example-tenant-ns \ + --generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator \ + --debug \ + --set app.count=0 + +LOG_GENERATOR_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep log-generator) +kubectl port-forward -n example-tenant-ns "pod/${LOG_GENERATOR_POD}" 11000:11000 & +sleep 5 +JSON_PAYLOAD='{ "type": "web", "format": "apache", "count": 100 }' +curl --location --request POST '127.0.0.1:11000/loggen' --header 'Content-Type: application/json' --data-raw "${JSON_PAYLOAD}" +# Make sure log generator only generates n log messages +EXPECTED_NUMBER_OF_LOGS=$(echo "$JSON_PAYLOAD" | jq .count) +EXPECTED_NUMBER_OF_LOGS_CHECKPOINT=15 # Check for received messages - subscription-sample while - echo "Checking for subscription-sample-1 in deployments/receiver-collector logs" - kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-1" + echo "Checking for subscription-sample-1 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}" + NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-1") + echo "Found logs: ${NUM_OF_LOGS}" + + if [[ $NUM_OF_LOGS -eq $EXPECTED_NUMBER_OF_LOGS_CHECKPOINT ]]; then + # Kill the telemetry controller to assert persist works + TELEMETRY_CONTROLLER_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep subscription-operator-controller-manager) + kubectl delete pod --namespace subscription-operator-system "${TELEMETRY_CONTROLLER_POD}" + fi + - [[ $? -ne 0 ]] + [[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]] do true; done # Check for received messages - subscription-sample-2 while - echo "Checking for subscription-sample-2 in deployments/receiver-collector logs" - kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-2" + echo "Checking for subscription-sample-2 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}" + NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-2") - [[ $? -ne 0 ]] + [[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]] do true; done echo "E2E test: PASSED" diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 4173e594..ef5a7611 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -35,6 +35,12 @@ import ( otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" ) +const ( + PersistPath = "/opt/telemetry-controller/persist" + DefaultMountContainerImage = "busybox" + DefaultMountContainerImageTag = "latest" +) + // CollectorReconciler reconciles a Collector object type CollectorReconciler struct { client.Client @@ -145,6 +151,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( Outputs: outputs, TenantSubscriptionMap: tenantSubscriptionMap, SubscriptionOutputMap: subscriptionOutputMap, + AtomicPersist: collector.Spec.AtomicPersist, } otelConfig, err := otelConfigInput.ToIntermediateRepresentation().ToYAML() @@ -201,6 +208,36 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( }, } + if collector.Spec.AtomicPersist { + persistVolumeMount := apiv1.VolumeMount{ + Name: "persist", + ReadOnly: false, + MountPath: PersistPath, + } + otelCollector.Spec.VolumeMounts = append(otelCollector.Spec.VolumeMounts, persistVolumeMount) + + mountInitContainer := apiv1.Container{ + Name: "persist-mount-fix", + Image: fmt.Sprintf("%s:%s", DefaultMountContainerImage, DefaultMountContainerImageTag), + Command: []string{"sh", "-c", "chmod -R 777 " + PersistPath}, + + VolumeMounts: []apiv1.VolumeMount{persistVolumeMount}, + } + otelCollector.Spec.InitContainers = append(otelCollector.Spec.InitContainers, mountInitContainer) + + persistVolumeType := apiv1.HostPathDirectoryOrCreate + persistVolume := apiv1.Volume{ + Name: "persist", + VolumeSource: apiv1.VolumeSource{ + HostPath: &apiv1.HostPathVolumeSource{ + Path: PersistPath, + Type: &persistVolumeType, + }, + }, + } + otelCollector.Spec.Volumes = append(otelCollector.Spec.Volumes, persistVolume) + } + if err := ctrl.SetControllerReference(collector, &otelCollector, r.Scheme); err != nil { return ctrl.Result{}, err } diff --git a/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml index 1505761b..d79dec29 100644 --- a/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml @@ -1,3 +1,7 @@ +extensions: + file_storage/persist: + directory: /opt/telemetry-controller/persist + fsync: false receivers: filelog/kubernetes: exclude: @@ -54,7 +58,7 @@ receivers: - from: attributes.uid to: resource["k8s.pod.uid"] type: move - start_at: end + storage: file_storage/persist exporters: logging/debug: verbosity: detailed @@ -111,6 +115,8 @@ connectors: - statement: 'route() where ' pipelines: [logs/tenant_example-tenant] service: + extensions: + [file_storage/persist] pipelines: logs/all: receivers: [filelog/kubernetes] diff --git a/internal/controller/telemetry/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen.go index 2ed08aac..0d116188 100644 --- a/internal/controller/telemetry/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen.go @@ -28,6 +28,7 @@ type OtelColConfigInput struct { Tenants []v1alpha1.Tenant Subscriptions []v1alpha1.Subscription Outputs []v1alpha1.OtelOutput + AtomicPersist bool // Subscriptions map, where the key is the Tenants' namespaced name, value is a slice of subscriptions' namespaced name TenantSubscriptionMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName @@ -69,12 +70,13 @@ type Pipelines struct { } type Services struct { - Extensions map[string]any `yaml:"extensions,omitempty"` + Extensions []string `yaml:"extensions,omitempty"` Pipelines Pipelines `yaml:"pipelines,omitempty"` Telemetry map[string]any `yaml:"telemetry,omitempty"` } type OtelColConfigIR struct { + Extensions map[string]any `yaml:"extensions,omitempty"` Receivers map[string]any `yaml:"receivers,omitempty"` Exporters map[string]any `yaml:"exporters,omitempty"` Processors map[string]any `yaml:"processors,omitempty"` @@ -439,10 +441,10 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri k8sReceiver := map[string]any{ "include": []string{"/var/log/pods/*/*/*.log"}, "exclude": []string{"/var/log/pods/*/otc-container/*.log"}, - "start_at": "end", "include_file_path": true, "include_file_name": false, "operators": operators, + "storage": "file_storage/persist", } return k8sReceiver @@ -452,6 +454,13 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfigIR { result := OtelColConfigIR{} + fileStorageName := "file_storage/persist" + result.Extensions = make(map[string]any) + result.Extensions[fileStorageName] = map[string]any{ + "directory": PersistPath, + "fsync": cfgInput.AtomicPersist, + } + // Get outputs based tenant names result.Exporters = cfgInput.generateExporters() @@ -469,6 +478,8 @@ func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfi result.Services.Telemetry = make(map[string]any) + result.Services.Extensions = append(result.Services.Extensions, fileStorageName) + return &result }