diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml index 2b9fe17..098d722 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_collectors.yaml @@ -17,12 +17,12 @@ spec: scope: Cluster versions: - additionalPrinterColumns: - - jsonPath: .status.state - name: State - type: string - jsonPath: .status.tenants name: Tenants type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: diff --git a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml index 2b9fe17..098d722 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml @@ -17,12 +17,12 @@ spec: scope: Cluster versions: - additionalPrinterColumns: - - jsonPath: .status.state - name: State - type: string - jsonPath: .status.tenants name: Tenants type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index 620a136..48165ae 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -24,35 +24,6 @@ spec: logSourceNamespaceSelectors: - matchLabels: tenant: shared - subscriptionNamespaceSelectors: - - matchLabels: - tenant: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Subscription -metadata: - name: shared - namespace: shared -spec: - condition: "true" - outputs: - - name: openobserve-shared - namespace: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Output -metadata: - name: openobserve-shared - namespace: shared -spec: - otlp: - endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 - headers: - Authorization: "Basic " - organization: default - stream-name: shared - tls: - insecure: true --- # A tenant that consumes logs from the shared tenant using a bridge apiVersion: telemetry.kube-logging.dev/v1alpha1 @@ -62,9 +33,6 @@ metadata: collector: cluster name: database spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: database subscriptionNamespaceSelectors: - matchLabels: tenant: database @@ -112,9 +80,6 @@ metadata: collector: cluster name: web spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: web subscriptionNamespaceSelectors: - matchLabels: tenant: web diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml index 595f83f..48c5d8a 100644 --- a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -50,31 +50,6 @@ spec: logSourceNamespaceSelectors: - matchLabels: tenant: shared - subscriptionNamespaceSelectors: - - matchLabels: - tenant: shared ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Subscription -metadata: - name: shared - namespace: shared -spec: - condition: "true" - outputs: - - name: otlp-test-output-shared - namespace: collector ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Output -metadata: - name: otlp-test-output-shared - namespace: collector -spec: - otlp: - endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 - tls: - insecure: true --- # A tenant that consumes logs from the shared tenant using a bridge apiVersion: telemetry.kube-logging.dev/v1alpha1 @@ -84,9 +59,6 @@ metadata: collector: cluster name: database spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: database subscriptionNamespaceSelectors: - matchLabels: tenant: database @@ -130,9 +102,6 @@ metadata: collector: cluster name: web spec: - logSourceNamespaceSelectors: - - matchLabels: - tenant: web subscriptionNamespaceSelectors: - matchLabels: tenant: web diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 3b0f1e2..8cbda8c 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -203,6 +203,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // deploying the collector that would immediately error due to configuration errors. // Might also be a good place to add a validation webhook to validate the collector spec if err := otelConfigInput.ValidateConfig(); err != nil { + collector.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(err), "failed validating otel config input") return ctrl.Result{}, err } diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index ea703af..2ce3219 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -109,9 +109,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any { if tenantIdx := slices.IndexFunc(cfgInput.Tenants, func(t v1alpha1.Tenant) bool { return tenantName == t.Name }); tenantIdx != -1 { - k8sReceiverName := fmt.Sprintf("filelog/%s", tenantName) namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces - receivers[k8sReceiverName] = receiver.GenerateDefaultKubernetesReceiver(namespaces) + // Generate filelog receiver for the tenant if it has any logsource namespaces + if len(namespaces) > 0 { + receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces) + } } } @@ -123,13 +125,19 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { maps.Copy(connectors, connector.GenerateCountConnectors()) for _, tenant := range cfgInput.Tenants { - rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) - connectors[rc.Name] = rc + // Generate routing connector for the tenant's subscription if it has any + if len(cfgInput.TenantSubscriptionMap[tenant.Name]) > 0 { + rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) + connectors[rc.Name] = rc + } } for _, subscription := range cfgInput.Subscriptions { - rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) - connectors[rc.Name] = rc + // Generate routing connector for the subscription's outputs if it has any + if len(cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) > 0 { + rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) + connectors[rc.Name] = rc + } } for _, bridge := range cfgInput.Bridges { @@ -146,7 +154,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b var namedPipelines = make(map[string]*otelv1beta1.Pipeline) tenants := []string{} for tenant := range cfgInput.TenantSubscriptionMap { - namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(tenant) + namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant) tenants = append(tenants, tenant) } @@ -155,7 +163,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b for _, tenant := range tenants { // Generate a pipeline for the tenant tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant) - namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(tenant) + namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant) connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges) processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants) diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 5c875a0..c702559 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -466,7 +466,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Debug: false, }, expectedPipelines: map[string]*otelv1beta1.Pipeline{ - "logs/tenant_tenant1": pipeline.GenerateRootPipeline("tenant1"), + "logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1"), "logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline( []string{"routing/tenant_tenant1_subscriptions"}, []string{"attributes/subscription_sub1"}, @@ -485,7 +485,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { }, }, { - name: "Two tenants each with a subscription with a bridge", + name: "Three tenants two bridges", cfgInput: OtelColConfigInput{ Tenants: []v1alpha1.Tenant{ { @@ -493,114 +493,163 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Name: "tenant1", }, Spec: v1alpha1.TenantSpec{ - Transform: v1alpha1.Transform{ - Name: "transform1", - LogStatements: []v1alpha1.TransformStatement{ - { - Statements: []string{`set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))"))`}, + LogSourceNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns1", }, }, }, }, Status: v1alpha1.TenantStatus{ LogSourceNamespaces: []string{"ns1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tenant2", + }, + Spec: v1alpha1.TenantSpec{ + SubscriptionNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns2", + }, + }, + }, + }, + Status: v1alpha1.TenantStatus{ Subscriptions: []v1alpha1.NamespacedName{ { - Namespace: "ns1", - Name: "sub1", + Namespace: "ns2", + Name: "sub2", }, }, - State: v1alpha1.StateReady, }, }, { ObjectMeta: metav1.ObjectMeta{ - Name: "tenant2", + Name: "tenant3", + }, + Spec: v1alpha1.TenantSpec{ + SubscriptionNamespaceSelectors: []metav1.LabelSelector{ + { + MatchLabels: map[string]string{ + "nsSelector": "ns3", + }, + }, + }, }, - Spec: v1alpha1.TenantSpec{}, Status: v1alpha1.TenantStatus{ - LogSourceNamespaces: []string{"ns2"}, Subscriptions: []v1alpha1.NamespacedName{ { - Namespace: "ns2", - Name: "sub2", + Namespace: "ns3", + Name: "sub3", }, }, - State: v1alpha1.StateReady, }, }, }, Subscriptions: map[v1alpha1.NamespacedName]v1alpha1.Subscription{ { - Namespace: "ns1", - Name: "sub1", + Namespace: "ns2", + Name: "sub2", }: { ObjectMeta: metav1.ObjectMeta{ - Name: "sub1", - Namespace: "ns1", + Name: "sub2", + Namespace: "ns2", }, Spec: v1alpha1.SubscriptionSpec{ Condition: "true", - Outputs: []v1alpha1.NamespacedName{}, + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + }, + Status: v1alpha1.SubscriptionStatus{ + Tenant: "tenant2", + Outputs: []v1alpha1.NamespacedName{{Namespace: "xy", Name: "zq"}}, }, }, { - Namespace: "ns2", - Name: "sub2", + Namespace: "ns3", + Name: "sub3", }: { ObjectMeta: metav1.ObjectMeta{ - Name: "sub2", - Namespace: "ns2", + Name: "sub3", + Namespace: "ns3", }, Spec: v1alpha1.SubscriptionSpec{ Condition: "true", - Outputs: []v1alpha1.NamespacedName{}, + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + }, + Status: v1alpha1.SubscriptionStatus{ + Tenant: "tenant3", + Outputs: []v1alpha1.NamespacedName{{Namespace: "xy", Name: "zq"}}, }, }, }, - TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ - "tenant1": { - { - Namespace: "ns1", - Name: "sub1", + Bridges: []v1alpha1.Bridge{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge1", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant1", + TargetTenant: "tenant2", + Condition: `attributes["parsed"]["method"] == "GET"`, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge2", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant1", + TargetTenant: "tenant3", + Condition: `attributes["parsed"]["method"] == "PUT"`, + }, + }, + }, + TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ + "tenant1": {}, "tenant2": { { Namespace: "ns2", Name: "sub2", }, }, - }, - SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ - { - Namespace: "ns1", - Name: "sub1", - }: { + "tenant3": { { - Namespace: "ns1", - Name: "output1", + Namespace: "ns3", + Name: "sub3", }, }, + }, + SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ { Namespace: "ns2", Name: "sub2", }: { { - Namespace: "ns2", - Name: "output2", + Namespace: "xy", + Name: "zq", }, }, - }, - Bridges: []v1alpha1.Bridge{ { - ObjectMeta: metav1.ObjectMeta{ - Name: "bridge1", - }, - Spec: v1alpha1.BridgeSpec{ - SourceTenant: "tenant1", - TargetTenant: "tenant2", - Condition: "true", + Namespace: "ns3", + Name: "sub3", + }: { + { + Namespace: "xy", + Name: "zq", }, }, }, @@ -608,36 +657,41 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Debug: false, }, expectedPipelines: map[string]*otelv1beta1.Pipeline{ - "logs/tenant_tenant1": pipeline.GeneratePipeline( - []string{"filelog/tenant1"}, - []string{"k8sattributes", "attributes/tenant_tenant1", "transform/transform1"}, - []string{"routing/tenant_tenant1_subscriptions", "count/tenant_metrics", "routing/bridge_bridge1"}, - ), - "logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline( - []string{"routing/tenant_tenant1_subscriptions"}, - []string{"attributes/subscription_sub1"}, - []string{"routing/subscription_ns1_sub1_outputs"}, - ), - "logs/tenant_tenant2": pipeline.GeneratePipeline( - []string{"filelog/tenant2", "routing/bridge_bridge1"}, - []string{"k8sattributes", "attributes/tenant_tenant2"}, - []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"}, - ), - "logs/tenant_tenant2_subscription_ns2_sub2": pipeline.GeneratePipeline( - []string{"routing/tenant_tenant2_subscriptions"}, - []string{"attributes/subscription_sub2"}, - []string{"routing/subscription_ns2_sub2_outputs"}, - ), - "metrics/output": pipeline.GeneratePipeline( - []string{"count/output_metrics"}, - []string{"deltatocumulative", "attributes/metricattributes"}, - []string{"prometheus/message_metrics_exporter"}, - ), - "metrics/tenant": pipeline.GeneratePipeline( - []string{"count/tenant_metrics"}, - []string{"deltatocumulative", "attributes/metricattributes"}, - []string{"prometheus/message_metrics_exporter"}, - ), + "logs/tenant_tenant1": { + Receivers: []string{"filelog/tenant1"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant1"}, + Exporters: []string{"count/tenant_metrics", "routing/bridge_bridge1", "routing/bridge_bridge2"}, + }, + "logs/tenant_tenant2": { + Receivers: []string{"routing/bridge_bridge1"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant2"}, + Exporters: []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"}, + }, + "logs/tenant_tenant2_subscription_ns2_sub2": { + Receivers: []string{"routing/tenant_tenant2_subscriptions"}, + Processors: []string{"attributes/subscription_sub2"}, + Exporters: []string{"routing/subscription_ns2_sub2_outputs"}, + }, + "logs/tenant_tenant3": { + Receivers: []string{"routing/bridge_bridge2"}, + Processors: []string{"k8sattributes", "attributes/tenant_tenant3"}, + Exporters: []string{"routing/tenant_tenant3_subscriptions", "count/tenant_metrics"}, + }, + "logs/tenant_tenant3_subscription_ns3_sub3": { + Receivers: []string{"routing/tenant_tenant3_subscriptions"}, + Processors: []string{"attributes/subscription_sub3"}, + Exporters: []string{"routing/subscription_ns3_sub3_outputs"}, + }, + "metrics/output": { + Receivers: []string{"count/output_metrics"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + }, + "metrics/tenant": { + Receivers: []string{"count/tenant_metrics"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + }, }, }, } @@ -645,7 +699,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { for _, tt := range tests { ttp := tt t.Run(ttp.name, func(t *testing.T) { - assert.Equal(t, ttp.cfgInput.generateNamedPipelines(), ttp.expectedPipelines) + assert.Equal(t, ttp.expectedPipelines, ttp.cfgInput.generateNamedPipelines()) }) } } diff --git a/internal/controller/telemetry/pipeline/pipeline.go b/internal/controller/telemetry/pipeline/pipeline.go index c2b4919..956c024 100644 --- a/internal/controller/telemetry/pipeline/pipeline.go +++ b/internal/controller/telemetry/pipeline/pipeline.go @@ -17,22 +17,35 @@ package pipeline import ( "fmt" + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" ) func GeneratePipeline(receivers, processors, exporters []string) *otelv1beta1.Pipeline { - result := otelv1beta1.Pipeline{} - result.Receivers = receivers - result.Processors = processors - result.Exporters = exporters - - return &result + return &otelv1beta1.Pipeline{ + Receivers: filterEmptyPipelines(receivers), + Processors: filterEmptyPipelines(processors), + Exporters: filterEmptyPipelines(exporters), + } } -func GenerateRootPipeline(tenantName string) *otelv1beta1.Pipeline { +func GenerateRootPipeline(tenants []v1alpha1.Tenant, tenantName string) *otelv1beta1.Pipeline { tenantCountConnectorName := "count/tenant_metrics" - receiverName := fmt.Sprintf("filelog/%s", tenantName) - exporterName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName) + var receiverName string + var exporterName string + for _, tenant := range tenants { + if tenant.Name == tenantName { + // Add filelog receiver to tenant's pipeline if it has any logsource namespace selectors + if tenant.Spec.LogSourceNamespaceSelectors != nil { + receiverName = fmt.Sprintf("filelog/%s", tenantName) + } + // Add routing connector to tenant's pipeline if it has any subscription namespace selectors + if tenant.Spec.SubscriptionNamespaceSelectors != nil { + exporterName = fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName) + } + } + } + return GeneratePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName)}, []string{exporterName, tenantCountConnectorName}) } @@ -51,3 +64,13 @@ func GenerateMetricsPipelines() map[string]*otelv1beta1.Pipeline { return metricsPipelines } + +func filterEmptyPipelines(items []string) []string { + var result []string + for _, item := range items { + if item != "" { + result = append(result, item) + } + } + return result +}