Skip to content

Commit

Permalink
feat(persistence): add file_storage
Browse files Browse the repository at this point in the history
Signed-off-by: Szilard Parrag <szilard.parrag@axoflow.com>
  • Loading branch information
OverOrion committed Feb 1, 2024
1 parent 876d472 commit 1ab569d
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 12 deletions.
1 change: 1 addition & 0 deletions api/telemetry/v1alpha1/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
type CollectorSpec struct {
TenantSelector metav1.LabelSelector `json:"tenantSelector,omitempty"`
ControlNamespace string `json:"controlNamespace"`
AtomicPersist bool `json:"atomicPersist,omitempty"`
}

// CollectorStatus defines the observed state of Collector
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
spec:
description: CollectorSpec defines the desired state of Collector
properties:
atomicPersist:
type: boolean
controlNamespace:
type: string
tenantSelector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ metadata:
name: example-collector
spec:
controlNamespace: collector
atomicPersist: true
tenantSelector:
matchLabels:
collectorLabel: example-collector
Expand Down
38 changes: 29 additions & 9 deletions e2e/e2e_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ create_if_does_not_exist() {
KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME_E2E:-so-e2e}
NO_KIND_CLEANUP=${NO_KIND_CLEANUP:-}
CI_MODE=${CI_MODE:-}
# Backup current kubernetes context
CURRENT_K8S_CTX=$(kubectl config view | grep "current" | cut -f 2 -d : | xargs)

# Prepare env
kind create cluster --name "${KIND_CLUSTER_NAME}" --wait 5m
Expand Down Expand Up @@ -48,23 +46,45 @@ else
fi

# Create log-generator
helm install --wait --create-namespace --namespace example-tenant-ns --generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator
helm install --wait \
--create-namespace \
--namespace example-tenant-ns \
--generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator \
--debug \
--set app.count=0

LOG_GENERATOR_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep log-generator)
kubectl port-forward -n example-tenant-ns "pod/${LOG_GENERATOR_POD}" 11000:11000 &
sleep 5
JSON_PAYLOAD='{ "type": "web", "format": "apache", "count": 100 }'
curl --location --request POST '127.0.0.1:11000/loggen' --header 'Content-Type: application/json' --data-raw "${JSON_PAYLOAD}"

# Make sure log generator only generates n log messages
EXPECTED_NUMBER_OF_LOGS=$(echo "$JSON_PAYLOAD" | jq .count)

EXPECTED_NUMBER_OF_LOGS_CHECKPOINT=15
# Check for received messages - subscription-sample
while
echo "Checking for subscription-sample-1 in deployments/receiver-collector logs"
kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-1"
echo "Checking for subscription-sample-1 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}"
NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-1")
echo "Found logs: ${NUM_OF_LOGS}"

if [[ $NUM_OF_LOGS -eq $EXPECTED_NUMBER_OF_LOGS_CHECKPOINT ]]; then
# Kill the telemetry controller to assert persist works
TELEMETRY_CONTROLLER_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep subscription-operator-controller-manager)
kubectl delete pod --namespace subscription-operator-system "${TELEMETRY_CONTROLLER_POD}"
fi


[[ $? -ne 0 ]]
[[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]]
do true; done

# Check for received messages - subscription-sample-2
while
echo "Checking for subscription-sample-2 in deployments/receiver-collector logs"
kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-2"
echo "Checking for subscription-sample-2 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}"
NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-2")

[[ $? -ne 0 ]]
[[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]]
do true; done

echo "E2E test: PASSED"
Expand Down
37 changes: 37 additions & 0 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ import (
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
)

const (
PersistPath = "/opt/telemetry-controller/persist"
DefaultMountContainerImage = "busybox"
DefaultMountContainerImageTag = "latest"
)

// CollectorReconciler reconciles a Collector object
type CollectorReconciler struct {
client.Client
Expand Down Expand Up @@ -145,6 +151,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
Outputs: outputs,
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
AtomicPersist: collector.Spec.AtomicPersist,
}

otelConfig, err := otelConfigInput.ToIntermediateRepresentation().ToYAML()
Expand Down Expand Up @@ -201,6 +208,36 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
},
}

if collector.Spec.AtomicPersist {
persistVolumeMount := apiv1.VolumeMount{
Name: "persist",
ReadOnly: false,
MountPath: PersistPath,
}
otelCollector.Spec.VolumeMounts = append(otelCollector.Spec.VolumeMounts, persistVolumeMount)

mountInitContainer := apiv1.Container{
Name: "persist-mount-fix",
Image: fmt.Sprintf("%s:%s", DefaultMountContainerImage, DefaultMountContainerImageTag),
Command: []string{"sh", "-c", "chmod -R 777 " + PersistPath},

VolumeMounts: []apiv1.VolumeMount{persistVolumeMount},
}
otelCollector.Spec.InitContainers = append(otelCollector.Spec.InitContainers, mountInitContainer)

persistVolumeType := apiv1.HostPathDirectoryOrCreate
persistVolume := apiv1.Volume{
Name: "persist",
VolumeSource: apiv1.VolumeSource{
HostPath: &apiv1.HostPathVolumeSource{
Path: PersistPath,
Type: &persistVolumeType,
},
},
}
otelCollector.Spec.Volumes = append(otelCollector.Spec.Volumes, persistVolume)
}

if err := ctrl.SetControllerReference(collector, &otelCollector, r.Scheme); err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
extensions:
file_storage/persist:
directory: /opt/telemetry-controller/persist
fsync: false
receivers:
filelog/kubernetes:
exclude:
Expand Down Expand Up @@ -54,7 +58,7 @@ receivers:
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
start_at: end
storage: file_storage/persist
exporters:
logging/debug:
verbosity: detailed
Expand Down Expand Up @@ -111,6 +115,8 @@ connectors:
- statement: 'route() where '
pipelines: [logs/tenant_example-tenant]
service:
extensions:
[file_storage/persist]
pipelines:
logs/all:
receivers: [filelog/kubernetes]
Expand Down
15 changes: 13 additions & 2 deletions internal/controller/telemetry/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type OtelColConfigInput struct {
Tenants []v1alpha1.Tenant
Subscriptions []v1alpha1.Subscription
Outputs []v1alpha1.OtelOutput
AtomicPersist bool

// Subscriptions map, where the key is the Tenants' namespaced name, value is a slice of subscriptions' namespaced name
TenantSubscriptionMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName
Expand Down Expand Up @@ -69,12 +70,13 @@ type Pipelines struct {
}

type Services struct {
Extensions map[string]any `yaml:"extensions,omitempty"`
Extensions []string `yaml:"extensions,omitempty"`
Pipelines Pipelines `yaml:"pipelines,omitempty"`
Telemetry map[string]any `yaml:"telemetry,omitempty"`
}

type OtelColConfigIR struct {
Extensions map[string]any `yaml:"extensions,omitempty"`
Receivers map[string]any `yaml:"receivers,omitempty"`
Exporters map[string]any `yaml:"exporters,omitempty"`
Processors map[string]any `yaml:"processors,omitempty"`
Expand Down Expand Up @@ -439,10 +441,10 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri
k8sReceiver := map[string]any{
"include": []string{"/var/log/pods/*/*/*.log"},
"exclude": []string{"/var/log/pods/*/otc-container/*.log"},
"start_at": "end",
"include_file_path": true,
"include_file_name": false,
"operators": operators,
"storage": "file_storage/persist",
}

return k8sReceiver
Expand All @@ -452,6 +454,13 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri
func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfigIR {
result := OtelColConfigIR{}

fileStorageName := "file_storage/persist"
result.Extensions = make(map[string]any)
result.Extensions[fileStorageName] = map[string]any{
"directory": PersistPath,
"fsync": cfgInput.AtomicPersist,
}

// Get outputs based tenant names
result.Exporters = cfgInput.generateExporters()

Expand All @@ -469,6 +478,8 @@ func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfi

result.Services.Telemetry = make(map[string]any)

result.Services.Extensions = append(result.Services.Extensions, fileStorageName)

return &result
}

Expand Down

0 comments on commit 1ab569d

Please sign in to comment.