diff --git a/api/telemetry/v1alpha1/collector_types.go b/api/telemetry/v1alpha1/collector_types.go index c654268..8a8d4ad 100644 --- a/api/telemetry/v1alpha1/collector_types.go +++ b/api/telemetry/v1alpha1/collector_types.go @@ -81,12 +81,14 @@ func (c CollectorSpec) GetMemoryLimit() *resource.Quantity { // CollectorStatus defines the observed state of Collector type CollectorStatus struct { Tenants []string `json:"tenants,omitempty"` + State State `json:"state,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:resource:scope=Cluster,categories=telemetry-all //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Tenants",type=string,JSONPath=`.status.tenants` +//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // Collector is the Schema for the collectors API type Collector struct { diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml index 1326859..098d722 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml @@ -20,6 +20,9 @@ spec: - jsonPath: .status.tenants name: Tenants type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -8071,6 +8074,8 @@ spec: status: description: CollectorStatus defines the observed state of Collector properties: + state: + type: string tenants: items: type: string diff --git a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml index 1326859..098d722 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml @@ -20,6 +20,9 @@ spec: - jsonPath: .status.tenants name: Tenants type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -8071,6 +8074,8 @@ spec: status: description: CollectorStatus defines the observed state of Collector properties: + state: + type: string tenants: items: type: string diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index 620a136..48165ae 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -24,35 +24,6 @@ spec: logSourceNamespaceSelectors: - matchLabels: tenant: shared - subscriptionNamespaceSelectors: - - matchLabels: - tenant: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Subscription -metadata: - name: shared - namespace: shared -spec: - condition: "true" - outputs: - - name: openobserve-shared - namespace: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Output -metadata: - name: openobserve-shared - namespace: shared -spec: - otlp: - endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 - headers: - Authorization: "Basic " - organization: default - stream-name: shared - tls: - insecure: true --- # A tenant that consumes logs from the shared tenant using a bridge apiVersion: telemetry.kube-logging.dev/v1alpha1 @@ -62,9 +33,6 @@ metadata: collector: cluster name: database spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: database subscriptionNamespaceSelectors: - matchLabels: tenant: database @@ -112,9 +80,6 @@ metadata: collector: cluster name: web spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: web subscriptionNamespaceSelectors: - matchLabels: tenant: web diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml index 595f83f..48c5d8a 100644 --- a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -50,31 +50,6 @@ spec: logSourceNamespaceSelectors: - matchLabels: tenant: shared - subscriptionNamespaceSelectors: - - matchLabels: - tenant: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Subscription -metadata: - name: shared - namespace: shared -spec: - condition: "true" - outputs: - - name: otlp-test-output-shared - namespace: collector ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Output -metadata: - name: otlp-test-output-shared - namespace: collector -spec: - otlp: - endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 - tls: - insecure: true --- # A tenant that consumes logs from the shared tenant using a bridge apiVersion: telemetry.kube-logging.dev/v1alpha1 @@ -84,9 +59,6 @@ metadata: collector: cluster name: database spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: database subscriptionNamespaceSelectors: - matchLabels: tenant: database @@ -130,9 +102,6 @@ metadata: collector: cluster name: web spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: web subscriptionNamespaceSelectors: - matchLabels: tenant: web diff --git a/go.mod b/go.mod index 5f0dec4..c1c56e3 100644 --- a/go.mod +++ b/go.mod @@ -25,9 +25,11 @@ require ( k8s.io/client-go v0.30.2 sigs.k8s.io/controller-runtime v0.18.4 sigs.k8s.io/yaml v1.4.0 + github.com/hashicorp/go-multierror v1.1.1 ) require ( + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/briandowns/spinner v1.23.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 97484ae..6271394 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,11 @@ github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQN github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 6bf81c9..8cbda8c 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -27,6 +27,7 @@ import ( otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -178,14 +179,17 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( logger := log.FromContext(ctx, "collector", req.Name) collector := &v1alpha1.Collector{} - logger.Info("Reconciling collector") + logger.Info(fmt.Sprintf("getting collector: %q", req.Name)) if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil { + logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error", + "collector", req.Name) return ctrl.Result{}, err } collector.Spec.SetDefaults() - originalCollectorStatus := collector.Status.DeepCopy() + originalCollectorStatus := collector.Status + logger.Info(fmt.Sprintf("reconciling collector: %q", collector.Name)) otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector) if err != nil { @@ -195,6 +199,15 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + // NOTE: This might be revised or removed in the future, but good enough for now to avoid + // deploying the collector that would immediately error due to configuration errors. + // Might also be a good place to add a validation webhook to validate the collector spec + if err := otelConfigInput.ValidateConfig(); err != nil { + collector.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(err), "failed validating otel config input") + return ctrl.Result{}, err + } + otelConfig := otelConfigInput.AssembleConfig(ctx) saName, err := r.reconcileRBAC(ctx, collector) @@ -264,7 +277,11 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( resourceReconciler := reconciler.NewReconcilerWith(r.Client, reconciler.WithLog(logger)) _, err = resourceReconciler.ReconcileResource(&otelCollector, reconciler.StatePresent) if err != nil { - return ctrl.Result{}, err + collector.Status.State = v1alpha1.StateFailed + if apierrors.IsConflict(err) { + logger.Info("conflict while creating otel collector, retrying") + return ctrl.Result{Requeue: true}, nil + } } tenantNames := []string{} @@ -273,9 +290,12 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } collector.Status.Tenants = normalizeStringSlice(tenantNames) + collector.Status.State = v1alpha1.StateReady if !reflect.DeepEqual(originalCollectorStatus, collector.Status) { + logger.Info("collector status changed") if err = r.Client.Status().Update(ctx, collector); err != nil { logger.Error(errors.WithStack(err), "failed updating collector status") + return ctrl.Result{}, err } } diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 525723d..a36f8dd 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -16,12 +16,14 @@ package otel_conf_gen import ( "context" + "errors" "fmt" "slices" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "golang.org/x/exp/maps" + "github.com/hashicorp/go-multierror" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" @@ -107,9 +109,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any { if tenantIdx := slices.IndexFunc(cfgInput.Tenants, func(t v1alpha1.Tenant) bool { return tenantName == t.Name }); tenantIdx != -1 { - k8sReceiverName := fmt.Sprintf("filelog/%s", tenantName) namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces - receivers[k8sReceiverName] = receiver.GenerateDefaultKubernetesReceiver(namespaces) + // Generate filelog receiver for the tenant if it has any logsource namespaces + if len(namespaces) > 0 { + receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces) + } } } @@ -121,13 +125,19 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { maps.Copy(connectors, connector.GenerateCountConnectors()) for _, tenant := range cfgInput.Tenants { - rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) - connectors[rc.Name] = rc + // Generate routing connector for the tenant's subscription if it has any + if len(cfgInput.TenantSubscriptionMap[tenant.Name]) > 0 { + rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) + connectors[rc.Name] = rc + } } for _, subscription := range cfgInput.Subscriptions { - rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) - connectors[rc.Name] = rc + // Generate routing connector for the subscription's outputs if it has any + if len(cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) > 0 { + rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) + connectors[rc.Name] = rc + } } for _, bridge := range cfgInput.Bridges { @@ -144,7 +154,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b var namedPipelines = make(map[string]*otelv1beta1.Pipeline) tenants := []string{} for tenant := range cfgInput.TenantSubscriptionMap { - namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(tenant) + namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant) tenants = append(tenants, tenant) } @@ -153,7 +163,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b for _, tenant := range tenants { // Generate a pipeline for the tenant tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant) - namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(tenant) + namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant) connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges) processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants) @@ -255,3 +265,69 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be }, } } + +func validateTenants(tenants *[]v1alpha1.Tenant) error { + var result *multierror.Error + + if len(*tenants) == 0 { + return errors.New("no tenants provided, at least one tenant must be provided") + } + + for _, tenant := range *tenants { + if len(tenant.Spec.SubscriptionNamespaceSelectors) == 0 && len(tenant.Spec.LogSourceNamespaceSelectors) == 0 { + result = multierror.Append(result, fmt.Errorf("tenant must have at least one subscription or logsource namespace selector, tenant: %s has neither", tenant.Name)) + } + } + + return result.ErrorOrNil() +} + +func validateSubscriptionsAndBridges(tenants *[]v1alpha1.Tenant, subscriptions *map[v1alpha1.NamespacedName]v1alpha1.Subscription, bridges *[]v1alpha1.Bridge) error { + var result *multierror.Error + + hasSubs := len(*subscriptions) > 0 + hasBridges := len(*bridges) > 0 + if !hasSubs && !hasBridges { + return errors.New("no subscriptions or bridges provided, at least one subscription or bridge must be provided") + } + + if hasSubs { + for _, subscription := range *subscriptions { + if len(subscription.Spec.Outputs) == 0 { + result = multierror.Append(result, fmt.Errorf("subscription %s has no outputs", subscription.Name)) + } + } + } + + if hasBridges { + tenantMap := make(map[string]struct{}) + for _, tenant := range *tenants { + tenantMap[tenant.Name] = struct{}{} + } + + for _, bridge := range *bridges { + if _, sourceFound := tenantMap[bridge.Spec.SourceTenant]; !sourceFound { + result = multierror.Append(result, fmt.Errorf("bridge: %s has a source tenant: %s that does not exist", bridge.Name, bridge.Spec.SourceTenant)) + } + if _, targetFound := tenantMap[bridge.Spec.TargetTenant]; !targetFound { + result = multierror.Append(result, fmt.Errorf("bridge: %s has a target tenant: %s that does not exist", bridge.Name, bridge.Spec.TargetTenant)) + } + } + } + + return result.ErrorOrNil() +} + +func (cfgInput *OtelColConfigInput) ValidateConfig() error { + var result *multierror.Error + + if err := validateTenants(&cfgInput.Tenants); err != nil { + result = multierror.Append(result, err) + } + + if err := validateSubscriptionsAndBridges(&cfgInput.Tenants, &cfgInput.Subscriptions, &cfgInput.Bridges); err != nil { + result = multierror.Append(result, err) + } + + return result.ErrorOrNil() +} diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 5c875a0..c702559 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -466,7 +466,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Debug: false, }, expectedPipelines: map[string]*otelv1beta1.Pipeline{ - "logs/tenant_tenant1": pipeline.GenerateRootPipeline("tenant1"), + "logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1"), "logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline( []string{"routing/tenant_tenant1_subscriptions"}, []string{"attributes/subscription_sub1"}, @@ -485,7 +485,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { }, }, { - name: "Two tenants each with a subscription with a bridge", + name: "Three tenants two bridges", cfgInput: OtelColConfigInput{ Tenants: []v1alpha1.Tenant{ { @@ -493,114 +493,163 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Name: "tenant1", }, Spec: v1alpha1.TenantSpec{ - Transform: v1alpha1.Transform{ - Name: "transform1", - LogStatements: []v1alpha1.TransformStatement{ - { - Statements: []string{`set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))"))`}, + LogSourceNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns1", }, }, }, }, Status: v1alpha1.TenantStatus{ LogSourceNamespaces: []string{"ns1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tenant2", + }, + Spec: v1alpha1.TenantSpec{ + SubscriptionNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns2", + }, + }, + }, + }, + Status: v1alpha1.TenantStatus{ Subscriptions: []v1alpha1.NamespacedName{ { - Namespace: "ns1", - Name: "sub1", + Namespace: "ns2", + Name: "sub2", }, }, - State: v1alpha1.StateReady, }, }, { ObjectMeta: metav1.ObjectMeta{ - Name: "tenant2", + Name: "tenant3", + }, + Spec: v1alpha1.TenantSpec{ + SubscriptionNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns3", + }, + }, + }, }, - Spec: v1alpha1.TenantSpec{}, Status: v1alpha1.TenantStatus{ - LogSourceNamespaces: []string{"ns2"}, Subscriptions: []v1alpha1.NamespacedName{ { - Namespace: "ns2", - Name: "sub2", + Namespace: "ns3", + Name: "sub3", }, }, - State: v1alpha1.StateReady, }, }, }, Subscriptions: map[v1alpha1.NamespacedName]v1alpha1.Subscription{ { - Namespace: "ns1", - Name: "sub1", + Namespace: "ns2", + Name: "sub2", }: { ObjectMeta: metav1.ObjectMeta{ - Name: "sub1", - Namespace: "ns1", + Name: "sub2", + Namespace: "ns2", }, Spec: v1alpha1.SubscriptionSpec{ Condition: "true", - Outputs: []v1alpha1.NamespacedName{}, + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + }, + Status: v1alpha1.SubscriptionStatus{ + Tenant: "tenant2", + Outputs: []v1alpha1.NamespacedName{{Namespace: "xy", Name: "zq"}}, }, }, { - Namespace: "ns2", - Name: "sub2", + Namespace: "ns3", + Name: "sub3", }: { ObjectMeta: metav1.ObjectMeta{ - Name: "sub2", - Namespace: "ns2", + Name: "sub3", + Namespace: "ns3", }, Spec: v1alpha1.SubscriptionSpec{ Condition: "true", - Outputs: []v1alpha1.NamespacedName{}, + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + }, + Status: v1alpha1.SubscriptionStatus{ + Tenant: "tenant3", + Outputs: []v1alpha1.NamespacedName{{Namespace: "xy", Name: "zq"}}, }, }, }, - TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ - "tenant1": { - { - Namespace: "ns1", - Name: "sub1", + Bridges: []v1alpha1.Bridge{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge1", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant1", + TargetTenant: "tenant2", + Condition: `attributes["parsed"]["method"] == "GET"`, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge2", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant1", + TargetTenant: "tenant3", + Condition: `attributes["parsed"]["method"] == "PUT"`, + }, + }, + }, + TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ + "tenant1": {}, "tenant2": { { Namespace: "ns2", Name: "sub2", }, }, - }, - SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ - { - Namespace: "ns1", - Name: "sub1", - }: { + "tenant3": { { - Namespace: "ns1", - Name: "output1", + Namespace: "ns3", + Name: "sub3", }, }, + }, + SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ { Namespace: "ns2", Name: "sub2", }: { { - Namespace: "ns2", - Name: "output2", + Namespace: "xy", + Name: "zq", }, }, - }, - Bridges: []v1alpha1.Bridge{ { - ObjectMeta: metav1.ObjectMeta{ - Name: "bridge1", - }, - Spec: v1alpha1.BridgeSpec{ - SourceTenant: "tenant1", - TargetTenant: "tenant2", - Condition: "true", + Namespace: "ns3", + Name: "sub3", + }: { + { + Namespace: "xy", + Name: "zq", }, }, }, @@ -608,36 +657,41 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Debug: false, }, expectedPipelines: map[string]*otelv1beta1.Pipeline{ - "logs/tenant_tenant1": pipeline.GeneratePipeline( - []string{"filelog/tenant1"}, - []string{"k8sattributes", "attributes/tenant_tenant1", "transform/transform1"}, - []string{"routing/tenant_tenant1_subscriptions", "count/tenant_metrics", "routing/bridge_bridge1"}, - ), - "logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline( - []string{"routing/tenant_tenant1_subscriptions"}, - []string{"attributes/subscription_sub1"}, - []string{"routing/subscription_ns1_sub1_outputs"}, - ), - "logs/tenant_tenant2": pipeline.GeneratePipeline( - []string{"filelog/tenant2", "routing/bridge_bridge1"}, - []string{"k8sattributes", "attributes/tenant_tenant2"}, - []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"}, - ), - "logs/tenant_tenant2_subscription_ns2_sub2": pipeline.GeneratePipeline( - []string{"routing/tenant_tenant2_subscriptions"}, - []string{"attributes/subscription_sub2"}, - []string{"routing/subscription_ns2_sub2_outputs"}, - ), - "metrics/output": pipeline.GeneratePipeline( - []string{"count/output_metrics"}, - []string{"deltatocumulative", "attributes/metricattributes"}, - []string{"prometheus/message_metrics_exporter"}, - ), - "metrics/tenant": pipeline.GeneratePipeline( - []string{"count/tenant_metrics"}, - []string{"deltatocumulative", "attributes/metricattributes"}, - []string{"prometheus/message_metrics_exporter"}, - ), + "logs/tenant_tenant1": { + Receivers: []string{"filelog/tenant1"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant1"}, + Exporters: []string{"count/tenant_metrics", "routing/bridge_bridge1", "routing/bridge_bridge2"}, + }, + "logs/tenant_tenant2": { + Receivers: []string{"routing/bridge_bridge1"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant2"}, + Exporters: []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"}, + }, + "logs/tenant_tenant2_subscription_ns2_sub2": { + Receivers: []string{"routing/tenant_tenant2_subscriptions"}, + Processors: []string{"attributes/subscription_sub2"}, + Exporters: []string{"routing/subscription_ns2_sub2_outputs"}, + }, + "logs/tenant_tenant3": { + Receivers: []string{"routing/bridge_bridge2"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant3"}, + Exporters: []string{"routing/tenant_tenant3_subscriptions", "count/tenant_metrics"}, + }, + "logs/tenant_tenant3_subscription_ns3_sub3": { + Receivers: []string{"routing/tenant_tenant3_subscriptions"}, + Processors: []string{"attributes/subscription_sub3"}, + Exporters: []string{"routing/subscription_ns3_sub3_outputs"}, + }, + "metrics/output": { + Receivers: []string{"count/output_metrics"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + }, + "metrics/tenant": { + Receivers: []string{"count/tenant_metrics"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + }, }, }, } @@ -645,7 +699,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { for _, tt := range tests { ttp := tt t.Run(ttp.name, func(t *testing.T) { - assert.Equal(t, ttp.cfgInput.generateNamedPipelines(), ttp.expectedPipelines) + assert.Equal(t, ttp.expectedPipelines, ttp.cfgInput.generateNamedPipelines()) }) } } diff --git a/internal/controller/telemetry/pipeline/pipeline.go b/internal/controller/telemetry/pipeline/pipeline.go index c2b4919..956c024 100644 --- a/internal/controller/telemetry/pipeline/pipeline.go +++ b/internal/controller/telemetry/pipeline/pipeline.go @@ -17,22 +17,35 @@ package pipeline import ( "fmt" + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" ) func GeneratePipeline(receivers, processors, exporters []string) *otelv1beta1.Pipeline { - result := otelv1beta1.Pipeline{} - result.Receivers = receivers - result.Processors = processors - result.Exporters = exporters - - return &result + return &otelv1beta1.Pipeline{ + Receivers: filterEmptyPipelines(receivers), + Processors: filterEmptyPipelines(processors), + Exporters: filterEmptyPipelines(exporters), + } } -func GenerateRootPipeline(tenantName string) *otelv1beta1.Pipeline { +func GenerateRootPipeline(tenants []v1alpha1.Tenant, tenantName string) *otelv1beta1.Pipeline { tenantCountConnectorName := "count/tenant_metrics" - receiverName := fmt.Sprintf("filelog/%s", tenantName) - exporterName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName) + var receiverName string + var exporterName string + for _, tenant := range tenants { + if tenant.Name == tenantName { + // Add filelog receiver to tenant's pipeline if it has any logsource namespace selectors + if tenant.Spec.LogSourceNamespaceSelectors != nil { + receiverName = fmt.Sprintf("filelog/%s", tenantName) + } + // Add routing connector to tenant's pipeline if it has any subscription namespace selectors + if tenant.Spec.SubscriptionNamespaceSelectors != nil { + exporterName = fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName) + } + } + } + return GeneratePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName)}, []string{exporterName, tenantCountConnectorName}) } @@ -51,3 +64,13 @@ func GenerateMetricsPipelines() map[string]*otelv1beta1.Pipeline { return metricsPipelines } + +func filterEmptyPipelines(items []string) []string { + var result []string + for _, item := range items { + if item != "" { + result = append(result, item) + } + } + return result +} diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index a3f2af6..b783f8e 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -475,26 +475,20 @@ func (r *RouteReconciler) getTenants(ctx context.Context, listOpts *client.ListO } func (r *RouteReconciler) checkBridgeConnections(ctx context.Context, bridge *v1alpha1.Bridge) error { - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantNameField, bridge.Spec.SourceTenant), - } - sourceTenant, err := r.getTenants(ctx, listOpts) - if err != nil { - return err - } - if len(sourceTenant) != 1 && sourceTenant[0].Name != bridge.Spec.SourceTenant { - return errors.Errorf("bridge (%s) has invalid source tenant", bridge.Name) - } - - listOpts = &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantNameField, bridge.Spec.TargetTenant), - } - targetTenant, err := r.getTenants(ctx, listOpts) - if err != nil { - return err - } - if len(targetTenant) != 1 && targetTenant[0].Name != bridge.Spec.TargetTenant { - return errors.Errorf("bridge (%s) has invalid target tenant", bridge.Name) + for _, tenant := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(tenantNameField, tenant), + } + tenants, err := r.getTenants(ctx, listOpts) + if err != nil { + return err + } + if len(tenants) == 0 { + return errors.Errorf("tenant: %s not found for bridge: %s", tenant, bridge.Name) + } + if len(tenants) != 1 || tenants[0].Name != tenant { + return errors.Errorf("bridge: %s has invalid tenant reference: %s", bridge.Name, tenant) + } } return nil