From 14bf7f45660c124eb877efa3f2bf68ab011d7c75 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Mon, 11 Nov 2024 17:34:50 +0100 Subject: [PATCH] feat: further stabilize operator Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/bridge_types.go | 2 - .../telemetry.kube-logging.dev_bridges.yaml | 6 -- .../telemetry.kube-logging.dev_bridges.yaml | 6 -- .../telemetry/collector_controller.go | 77 +++++++------------ .../telemetry/otel_conf_gen/otel_conf_gen.go | 31 ++++++-- .../components/connector/routing_connector.go | 35 +++++++-- .../controller/telemetry/route_controller.go | 62 +++------------ 7 files changed, 91 insertions(+), 128 deletions(-) diff --git a/api/telemetry/v1alpha1/bridge_types.go b/api/telemetry/v1alpha1/bridge_types.go index ee061ae..d98c73c 100644 --- a/api/telemetry/v1alpha1/bridge_types.go +++ b/api/telemetry/v1alpha1/bridge_types.go @@ -29,7 +29,6 @@ type BridgeSpec struct { // BridgeStatus defines the observed state of Bridge type BridgeStatus struct { - State State `json:"state,omitempty"` } //+kubebuilder:object:root=true @@ -37,7 +36,6 @@ type BridgeStatus struct { //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Source Tenant",type=string,JSONPath=`.spec.sourceTenant` //+kubebuilder:printcolumn:name="Target Tenant",type=string,JSONPath=`.spec.targetTenant` -//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // Bridge is the Schema for the Bridges API type Bridge struct { diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml index 0e12d33..f4be2a7 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml @@ -23,9 +23,6 @@ spec: - jsonPath: .spec.targetTenant name: Target Tenant type: string - - jsonPath: .status.state - name: State - type: string name: v1alpha1 schema: openAPIV3Schema: @@ -67,9 +64,6 @@ spec: type: object status: description: BridgeStatus defines the observed state of Bridge - properties: - state: - type: string type: object type: object served: true diff --git a/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml index 0e12d33..f4be2a7 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml @@ -23,9 +23,6 @@ spec: - jsonPath: .spec.targetTenant name: Target Tenant type: string - - jsonPath: .status.state - name: State - type: string name: v1alpha1 schema: openAPIV3Schema: @@ -67,9 +64,6 @@ spec: type: object status: description: BridgeStatus defines the observed state of Bridge - properties: - state: - type: string type: object type: object served: true diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 838dbe0..59be58b 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -27,7 +27,6 @@ import ( otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -44,6 +43,10 @@ import ( "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" ) +var ( + ErrTenantFailed = errors.New("tenant failed") +) + const ( requeueDelayOnFailedTenant = 20 * time.Second axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0" @@ -55,20 +58,15 @@ type CollectorReconciler struct { Scheme *runtime.Scheme } -type TenantFailedError struct { - msg string -} - type BasicAuthClientAuthConfig struct { Username string Password string } -func (e *TenantFailedError) Error() string { return e.msg } - func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, collector *v1alpha1.Collector) (otelcolconfgen.OtelColConfigInput, error) { logger := log.FromContext(ctx) tenantSubscriptionMap := make(map[string][]v1alpha1.NamespacedName) + var bridgesReferencedByTenant []v1alpha1.Bridge subscriptionOutputMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName) tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector) @@ -81,15 +79,13 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, } for _, tenant := range tenants { - if tenant.Status.State == v1alpha1.StateFailed { logger.Info("tenant %q is in failed state, retrying later", tenant.Name) - return otelcolconfgen.OtelColConfigInput{}, &TenantFailedError{msg: "tenant failed"} + return otelcolconfgen.OtelColConfigInput{}, ErrTenantFailed } subscriptionNames := tenant.Status.Subscriptions tenantSubscriptionMap[tenant.Name] = subscriptionNames - for _, subsName := range subscriptionNames { queriedSubs := &v1alpha1.Subscription{} if err = r.Client.Get(ctx, types.NamespacedName(subsName), queriedSubs); err != nil { @@ -98,12 +94,22 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, } subscriptions[subsName] = *queriedSubs } + + bridgeNames := tenant.Status.ConnectedBridges + for _, bridgeName := range bridgeNames { + queriedBridge := &v1alpha1.Bridge{} + if err = r.Client.Get(ctx, types.NamespacedName{Name: bridgeName}, queriedBridge); err != nil { + logger.Error(errors.WithStack(err), "failed getting bridges for tenant", "tenant", tenant.Name) + return otelcolconfgen.OtelColConfigInput{}, err + } + + bridgesReferencedByTenant = append(bridgesReferencedByTenant, *queriedBridge) + } } for _, subscription := range subscriptions { outputNames := subscription.Status.Outputs subscriptionOutputMap[subscription.NamespacedName()] = outputNames - for _, outputName := range outputNames { outputWithSecretData := components.OutputWithSecretData{} @@ -117,29 +123,20 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, if err := r.populateSecretForOutput(ctx, queriedOutput, &outputWithSecretData); err != nil { return otelcolconfgen.OtelColConfigInput{}, err } - outputs = append(outputs, outputWithSecretData) } } - bridges, err := r.getBridges(ctx, client.ListOptions{}) - if err != nil { - logger.Error(errors.WithStack(err), "failed listing bridges") - return otelcolconfgen.OtelColConfigInput{}, err - } - - otelConfigInput := otelcolconfgen.OtelColConfigInput{ + return otelcolconfgen.OtelColConfigInput{ Tenants: tenants, Subscriptions: subscriptions, - Bridges: bridges, + Bridges: bridgesReferencedByTenant, OutputsWithSecretData: outputs, TenantSubscriptionMap: tenantSubscriptionMap, SubscriptionOutputMap: subscriptionOutputMap, Debug: collector.Spec.Debug, MemoryLimiter: *collector.Spec.MemoryLimiter, - } - - return otelConfigInput, nil + }, nil } func (r *CollectorReconciler) populateSecretForOutput(ctx context.Context, queriedOutput *v1alpha1.Output, outputWithSecret *components.OutputWithSecretData) error { @@ -182,10 +179,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( collector := &v1alpha1.Collector{} logger.Info(fmt.Sprintf("getting collector: %q", req.Name)) - if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil { - logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error", - "collector", req.Name) - return ctrl.Result{}, err + if err := r.Get(ctx, req.NamespacedName, collector); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) } collector.Spec.SetDefaults() @@ -194,13 +189,13 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector) if err != nil { - if errors.Is(err, &TenantFailedError{}) { + if errors.Is(err, ErrTenantFailed) { return ctrl.Result{RequeueAfter: requeueDelayOnFailedTenant}, err } return ctrl.Result{}, err } - if err := otelConfigInput.ValidateConfig(); ignoreNoResourcesError(err) != nil { + if err := otelConfigInput.ValidateConfig(); err != nil { collector.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(err), "invalid otel config input") return ctrl.Result{}, err @@ -281,10 +276,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( _, err = resourceReconciler.ReconcileResource(&otelCollector, reconciler.StatePresent) if err != nil { collector.Status.State = v1alpha1.StateFailed - if apierrors.IsConflict(err) { - logger.Info("conflict while creating otel collector, retrying") - return ctrl.Result{Requeue: true}, nil - } + logger.Error(errors.WithStack(err), "failed reconciling otel collector") + return ctrl.Result{}, err } tenantNames := []string{} @@ -297,6 +290,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if !reflect.DeepEqual(originalCollectorStatus, collector.Status) { logger.Info("collector status changed") if err = r.Client.Status().Update(ctx, collector); err != nil { + collector.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(err), "failed updating collector status") return ctrl.Result{}, err } @@ -536,15 +530,6 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l return tenantsForSelector.Items, nil } -func (r *CollectorReconciler) getBridges(ctx context.Context, listOpts client.ListOptions) ([]v1alpha1.Bridge, error) { - var bridges v1alpha1.BridgeList - if err := r.Client.List(ctx, &bridges, &listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - return bridges.Items, nil -} - func normalizeStringSlice(inputList []string) []string { allKeys := make(map[string]bool) uniqueList := []string{} @@ -558,11 +543,3 @@ func normalizeStringSlice(inputList []string) []string { return uniqueList } - -func ignoreNoResourcesError(err error) error { - if err == otelcolconfgen.ErrNoResources { - return nil - } - - return err -} diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index f11724a..c98e47d 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "reflect" "slices" "github.com/hashicorp/go-multierror" @@ -49,6 +50,21 @@ type OtelColConfigInput struct { Debug bool } +func (cfgInput *OtelColConfigInput) IsEmpty() bool { + v := reflect.ValueOf(*cfgInput) + for i := range v.NumField() { + field := v.Field(i) + switch field.Kind() { + case reflect.Slice, reflect.Map: + if field.Len() != 0 { + return false + } + } + } + + return true +} + func (cfgInput *OtelColConfigInput) generateExporters(ctx context.Context) map[string]any { exporters := map[string]any{} maps.Copy(exporters, exporter.GenerateMetricsExporters()) @@ -248,9 +264,14 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be } } - extensionNames := make([]string, 0, len(extensions)) - for k := range extensions { - extensionNames = append(extensionNames, k) + var extensionNames []string + if len(extensions) > 0 { + extensionNames = make([]string, 0, len(extensions)) + for k := range extensions { + extensionNames = append(extensionNames, k) + } + } else { + extensionNames = nil } return otelv1beta1.Config{ @@ -320,16 +341,14 @@ func validateSubscriptionsAndBridges(tenants *[]v1alpha1.Tenant, subscriptions * } func (cfgInput *OtelColConfigInput) ValidateConfig() error { - if cfgInput == nil { + if cfgInput.IsEmpty() { return ErrNoResources } var result *multierror.Error - if err := validateTenants(&cfgInput.Tenants); err != nil { result = multierror.Append(result, err) } - if err := validateSubscriptionsAndBridges(&cfgInput.Tenants, &cfgInput.Subscriptions, &cfgInput.Bridges); err != nil { result = multierror.Append(result, err) } diff --git a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go index 38f85df..29ea3df 100644 --- a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go +++ b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go @@ -103,6 +103,33 @@ func GenerateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector return rc } +func hasPipelineReceiverOrExporter(pipeline *otelv1beta1.Pipeline, receiverName string) bool { + for _, receiver := range pipeline.Receivers { + if receiver == receiverName { + return true + } + } + + for _, exporter := range pipeline.Exporters { + if exporter == receiverName { + return true + } + } + + return false +} + +func addConnectorToPipeline(pipeline *otelv1beta1.Pipeline, connectorName string, needsReceiver, needsExporter bool) { + if !hasPipelineReceiverOrExporter(pipeline, connectorName) { + if needsReceiver { + pipeline.Receivers = append(pipeline.Receivers, connectorName) + } + if needsExporter { + pipeline.Exporters = append(pipeline.Exporters, connectorName) + } + } +} + func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (needsReceiver bool, needsExporter bool, bridgeName string) { if bridge.Spec.SourceTenant == tenantName { needsExporter = true @@ -118,11 +145,7 @@ func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (n func GenerateRoutingConnectorForBridgesTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, bridges []v1alpha1.Bridge) { for _, bridge := range bridges { needsReceiver, needsExporter, bridgeName := checkBridgeConnectorForTenant(tenantName, bridge) - if needsReceiver { - pipeline.Receivers = append(pipeline.Receivers, fmt.Sprintf("routing/bridge_%s", bridgeName)) - } - if needsExporter { - pipeline.Exporters = append(pipeline.Exporters, fmt.Sprintf("routing/bridge_%s", bridgeName)) - } + connectorName := fmt.Sprintf("routing/bridge_%s", bridgeName) + addConnectorToPipeline(pipeline, connectorName, needsReceiver, needsExporter) } } diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index b783f8e..a9037fe 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -19,6 +19,7 @@ import ( "fmt" "reflect" "slices" + "sort" "strings" "emperror.dev/errors" @@ -67,10 +68,8 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl tenant := &v1alpha1.Tenant{} logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name)) - if err := r.Get(ctx, req.NamespacedName, tenant); client.IgnoreNotFound(err) != nil { - logger.Error(errors.New("failed getting tenant, possible API server error"), "failed getting tenant, possible API server error", - "tenant", req.NamespacedName.String()) - return ctrl.Result{}, err + if err := r.Get(ctx, req.NamespacedName, tenant); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) } originalTenantStatus := tenant.Status @@ -114,11 +113,11 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } else { subscription.Status.State = v1alpha1.StateReady } - subscription.Status.Outputs = validOutputs if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { if updateErr := r.Status().Update(ctx, &subscription); updateErr != nil { + subscription.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(updateErr), "failed update subscription status", "subscription", subscription.NamespacedName().String()) return ctrl.Result{}, err } @@ -135,23 +134,9 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } return ctrl.Result{}, err } - tenant.Status.ConnectedBridges = getBridgeNamesFromBridges(bridgesForTenant) - - for _, bridge := range bridgesForTenant { - originalBridgeStatus := bridge.Status.DeepCopy() - if err := r.checkBridgeConnections(ctx, &bridge); err != nil { - bridge.Status.State = v1alpha1.StateFailed - logger.Error(errors.WithStack(err), "failed bridge connection verification", "bridge", bridge.Name) - } - - bridge.Status.State = v1alpha1.StateReady - if !reflect.DeepEqual(originalBridgeStatus, bridge.Status) { - if updateErr := r.Status().Update(ctx, &bridge); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed update bridge status", "bridge", bridge.Name) - return ctrl.Result{}, err - } - } - } + bridgesForTenantNames := getBridgeNamesFromBridges(bridgesForTenant) + sort.Strings(bridgesForTenantNames) + tenant.Status.ConnectedBridges = bridgesForTenantNames logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, tenant) if err != nil { @@ -170,6 +155,7 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl if !reflect.DeepEqual(originalTenantStatus, tenant.Status) { logger.Info("tenant status changed") if err := r.Status().Update(ctx, tenant); err != nil { + tenant.Status.State = v1alpha1.StateFailed logger.Error(errors.New("failed update tenant status"), "failed update tenant status", "tenant", tenant.Name) return ctrl.Result{}, err } @@ -362,8 +348,8 @@ func (r *RouteReconciler) disownSubscriptions(ctx context.Context, subscriptions for _, subscription := range subscriptionsToDisown { subscription.Status.Tenant = "" - if err := r.Client.Status().Update(ctx, &subscription); err != nil { + subscription.Status.State = v1alpha1.StateFailed logger.Error(err, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) } else { logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) @@ -381,6 +367,7 @@ func (r *RouteReconciler) updateSubscriptionsForTenant(ctx context.Context, tena logger.Info("updating subscription status for tenant ownership") if err := r.Status().Update(ctx, &subscription); err != nil { + subscription.Status.State = v1alpha1.StateFailed logger.Error(err, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) } else { updatedSubscriptions = append(updatedSubscriptions, subscription) @@ -465,35 +452,6 @@ func (r *RouteReconciler) getBridgesForTenant(ctx context.Context, tenantName st return } -func (r *RouteReconciler) getTenants(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Tenant, error) { - var tenants v1alpha1.TenantList - if err := r.Client.List(ctx, &tenants, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - return tenants.Items, nil -} - -func (r *RouteReconciler) checkBridgeConnections(ctx context.Context, bridge *v1alpha1.Bridge) error { - for _, tenant := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} { - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantNameField, tenant), - } - tenants, err := r.getTenants(ctx, listOpts) - if err != nil { - return err - } - if len(tenants) == 0 { - return errors.Errorf("tenant: %s not found for bridge: %s", tenant, bridge.Name) - } - if len(tenants) != 1 || tenants[0].Name != tenant { - return errors.Errorf("bridge: %s has invalid tenant reference: %s", bridge.Name, tenant) - } - } - - return nil -} - func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { allKeys := make(map[string]bool) uniqueList := []apiv1.Namespace{}