Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(persistence): add file_storage #20

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/telemetry/v1alpha1/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
type CollectorSpec struct {
TenantSelector metav1.LabelSelector `json:"tenantSelector,omitempty"`
ControlNamespace string `json:"controlNamespace"`
Fsync bool `json:"fsync,omitempty"`
}

// CollectorStatus defines the observed state of Collector
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/telemetry.kube-logging.dev_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
properties:
controlNamespace:
type: string
fsync:
type: boolean
tenantSelector:
description: |-
A label selector is a label query over a set of resources. The result of matchLabels and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ metadata:
name: example-collector
spec:
controlNamespace: collector
fsync: false
tenantSelector:
matchLabels:
collectorLabel: example-collector
Expand Down
38 changes: 29 additions & 9 deletions e2e/e2e_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ create_if_does_not_exist() {
KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME_E2E:-so-e2e}
NO_KIND_CLEANUP=${NO_KIND_CLEANUP:-}
CI_MODE=${CI_MODE:-}
# Backup current kubernetes context
CURRENT_K8S_CTX=$(kubectl config view | grep "current" | cut -f 2 -d : | xargs)

# Prepare env
kind create cluster --name "${KIND_CLUSTER_NAME}" --wait 5m
Expand Down Expand Up @@ -48,23 +46,45 @@ else
fi

# Create log-generator
helm install --wait --create-namespace --namespace example-tenant-ns --generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator
helm install --wait \
--create-namespace \
--namespace example-tenant-ns \
--generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator \
--debug \
--set app.count=0

LOG_GENERATOR_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep log-generator)
kubectl port-forward -n example-tenant-ns "pod/${LOG_GENERATOR_POD}" 11000:11000 &
sleep 5
JSON_PAYLOAD='{ "type": "web", "format": "apache", "count": 100 }'
curl --location --request POST '127.0.0.1:11000/loggen' --header 'Content-Type: application/json' --data-raw "${JSON_PAYLOAD}"

# Make sure log generator only generates n log messages
EXPECTED_NUMBER_OF_LOGS=$(echo "$JSON_PAYLOAD" | jq .count)

EXPECTED_NUMBER_OF_LOGS_CHECKPOINT=15
# Check for received messages - subscription-sample
while
echo "Checking for subscription-sample-1 in deployments/receiver-collector logs"
kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-1"
echo "Checking for subscription-sample-1 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}"
NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-1")
echo "Found logs: ${NUM_OF_LOGS}"

if [[ $NUM_OF_LOGS -eq $EXPECTED_NUMBER_OF_LOGS_CHECKPOINT ]]; then
# Kill the telemetry controller to assert persist works
TELEMETRY_CONTROLLER_POD=$(kubectl get pods -A -o custom-columns=':metadata.name' | grep subscription-operator-controller-manager)
kubectl delete pod --namespace subscription-operator-system "${TELEMETRY_CONTROLLER_POD}"
fi


[[ $? -ne 0 ]]
[[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]]
do true; done

# Check for received messages - subscription-sample-2
while
echo "Checking for subscription-sample-2 in deployments/receiver-collector logs"
kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -q "subscription-sample-2"
echo "Checking for subscription-sample-2 in deployments/receiver-collector logs, expected: ${EXPECTED_NUMBER_OF_LOGS}"
NUM_OF_LOGS=$(kubectl logs --namespace example-tenant-ns deployments/receiver-collector | grep -c "subscription-sample-2")

[[ $? -ne 0 ]]
[[ $NUM_OF_LOGS -ne $EXPECTED_NUMBER_OF_LOGS ]]
do true; done

echo "E2E test: PASSED"
Expand Down
37 changes: 37 additions & 0 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ import (
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
)

const (
PersistPath = "/opt/telemetry-controller/persist"
ReceiversPersistPath = PersistPath + "/receivers"
ExportersPersistPath = PersistPath + "/exporters"
DefaultMountContainerImage = "busybox"
DefaultMountContainerImageTag = "latest"
)

// CollectorReconciler reconciles a Collector object
type CollectorReconciler struct {
client.Client
Expand Down Expand Up @@ -145,6 +153,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
Outputs: outputs,
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
Fsync: collector.Spec.Fsync,
}

otelConfig, err := otelConfigInput.ToIntermediateRepresentation().ToYAML()
Expand Down Expand Up @@ -201,6 +210,34 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
},
}

persistVolumeMount := apiv1.VolumeMount{
Name: "persist",
ReadOnly: false,
MountPath: PersistPath,
}
otelCollector.Spec.VolumeMounts = append(otelCollector.Spec.VolumeMounts, persistVolumeMount)

mountInitContainer := apiv1.Container{
Name: "persist-mount-fix",
Image: fmt.Sprintf("%s:%s", DefaultMountContainerImage, DefaultMountContainerImageTag),
Command: []string{"sh", "-c", "mkdir -p " + ReceiversPersistPath + "; mkdir -p " + ExportersPersistPath + "; " + "chmod -R 777 " + PersistPath},

VolumeMounts: []apiv1.VolumeMount{persistVolumeMount},
}
otelCollector.Spec.InitContainers = append(otelCollector.Spec.InitContainers, mountInitContainer)

persistVolumeType := apiv1.HostPathDirectoryOrCreate
persistVolume := apiv1.Volume{
Name: "persist",
VolumeSource: apiv1.VolumeSource{
HostPath: &apiv1.HostPathVolumeSource{
Path: PersistPath,
Type: &persistVolumeType,
},
},
}
otelCollector.Spec.Volumes = append(otelCollector.Spec.Volumes, persistVolume)

if err := ctrl.SetControllerReference(collector, &otelCollector, r.Scheme); err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
extensions:
file_storage/receiver:
directory: /opt/telemetry-controller/persist/receivers
fsync: false
file_storage/exporters:
directory: /opt/telemetry-controller/persist/exporters
fsync: false
receivers:
filelog/kubernetes:
exclude:
Expand Down Expand Up @@ -54,16 +61,20 @@ receivers:
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
start_at: end
storage: file_storage/receiver
exporters:
logging/debug:
verbosity: detailed
otlp/collector_otlp-test-output:
endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317
sending_queue:
storage: "file_storage/exporters"
tls:
insecure: true
otlp/collector_otlp-test-output-2:
endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317
sending_queue:
storage: "file_storage/exporters"
tls:
insecure: true
processors:
Expand Down Expand Up @@ -111,6 +122,8 @@ connectors:
- statement: 'route() where '
pipelines: [logs/tenant_example-tenant]
service:
extensions:
[file_storage/receiver, file_storage/exporters]
pipelines:
logs/all:
receivers: [filelog/kubernetes]
Expand Down
27 changes: 25 additions & 2 deletions internal/controller/telemetry/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ import (
"gopkg.in/yaml.v3"
)

const (
ReceiverFileStorageName = "file_storage/receiver"
ExportersFileStorageName = "file_storage/exporters"
)

// TODO move this to its appropriate place
type OtelColConfigInput struct {
Tenants []v1alpha1.Tenant
Subscriptions []v1alpha1.Subscription
Outputs []v1alpha1.OtelOutput
Fsync bool

// Subscriptions map, where the key is the Tenants' namespaced name, value is a slice of subscriptions' namespaced name
TenantSubscriptionMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName
Expand Down Expand Up @@ -69,12 +75,13 @@ type Pipelines struct {
}

type Services struct {
Extensions map[string]any `yaml:"extensions,omitempty"`
Extensions []string `yaml:"extensions,omitempty"`
Pipelines Pipelines `yaml:"pipelines,omitempty"`
Telemetry map[string]any `yaml:"telemetry,omitempty"`
}

type OtelColConfigIR struct {
Extensions map[string]any `yaml:"extensions,omitempty"`
Receivers map[string]any `yaml:"receivers,omitempty"`
Exporters map[string]any `yaml:"exporters,omitempty"`
Processors map[string]any `yaml:"processors,omitempty"`
Expand All @@ -100,6 +107,9 @@ func (cfgInput *OtelColConfigInput) generateOTLPExporters() map[string]any {
"tls": map[string]any{
"insecure": output.Spec.OTLP.TLSSetting.Insecure,
},
"sending_queue": map[string]any{
"storage": ExportersFileStorageName,
},
}
}

Expand Down Expand Up @@ -439,10 +449,10 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri
k8sReceiver := map[string]any{
"include": []string{"/var/log/pods/*/*/*.log"},
"exclude": []string{"/var/log/pods/*/otc-container/*.log"},
"start_at": "end",
"include_file_path": true,
"include_file_name": false,
"operators": operators,
"storage": ReceiverFileStorageName,
}

return k8sReceiver
Expand All @@ -452,6 +462,16 @@ func (cfgInput *OtelColConfigInput) generateDefaultKubernetesReceiver() map[stri
func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfigIR {
result := OtelColConfigIR{}

result.Extensions = make(map[string]any)
result.Extensions[ReceiverFileStorageName] = map[string]any{
"directory": ReceiversPersistPath,
"fsync": cfgInput.Fsync,
}
result.Extensions[ExportersFileStorageName] = map[string]any{
"directory": ExportersPersistPath,
"fsync": cfgInput.Fsync,
}

// Get outputs based tenant names
result.Exporters = cfgInput.generateExporters()

Expand All @@ -469,6 +489,9 @@ func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfi

result.Services.Telemetry = make(map[string]any)

result.Services.Extensions = append(result.Services.Extensions, ReceiverFileStorageName)
result.Services.Extensions = append(result.Services.Extensions, ExportersFileStorageName)

return &result
}

Expand Down
Loading