diff --git a/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml index 9942273d..cbb8d63c 100644 --- a/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_col_conf_test_fixtures/complex.yaml @@ -263,24 +263,16 @@ connectors: table: - statement: route() pipelines: [logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3] - routing/tenants_example-tenant-a: - table: - - statement: 'route() where ' - pipelines: [logs/tenant_example-tenant-a] - routing/tenants_example-tenant-b: - table: - - statement: 'route() where ' - pipelines: [logs/tenant_example-tenant-b] service: pipelines: - logs/all_example-tenant-a: + logs/tenant_example-tenant-a: receivers: [filelog/example-tenant-a] - processors: [memory_limiter, k8sattributes] - exporters: [routing/tenants_example-tenant-a] - logs/all_example-tenant-b: + processors: [memory_limiter, k8sattributes, attributes/tenant_example-tenant-a] + exporters: [routing/tenant_example-tenant-a_subscriptions, count/tenant_metrics] + logs/tenant_example-tenant-b: receivers: [filelog/example-tenant-b] - processors: [memory_limiter, k8sattributes] - exporters: [routing/tenants_example-tenant-b] + processors: [memory_limiter, k8sattributes, attributes/tenant_example-tenant-b] + exporters: [routing/tenant_example-tenant-b_subscriptions, count/tenant_metrics] logs/output_example-tenant-a-ns_subscription-example-1_collector_loki-test-output: receivers: [routing/subscription_example-tenant-a-ns_subscription-example-1_outputs] processors: [memory_limiter, attributes/exporter_name_loki-test-output, attributes/loki_exporter_loki-test-output, resource/loki_exporter_loki-test-output] @@ -297,10 +289,6 @@ service: receivers: [routing/subscription_example-tenant-b-ns_subscription-example-3_outputs] processors: [memory_limiter, attributes/exporter_name_otlp-test-output-2] exporters: [otlp/collector_otlp-test-output-2, count/output_metrics] - logs/tenant_example-tenant-a: - receivers: [routing/tenants_example-tenant-a] - processors: [memory_limiter, attributes/tenant_example-tenant-a] - exporters: [routing/tenant_example-tenant-a_subscriptions, count/tenant_metrics] logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-1: receivers: [routing/tenant_example-tenant-a_subscriptions] processors: [memory_limiter, attributes/subscription_subscription-example-1] @@ -309,10 +297,6 @@ service: receivers: [routing/tenant_example-tenant-a_subscriptions] processors: [memory_limiter, attributes/subscription_subscription-example-2] exporters: [routing/subscription_example-tenant-a-ns_subscription-example-2_outputs] - logs/tenant_example-tenant-b: - receivers: [routing/tenants_example-tenant-b] - processors: [memory_limiter, attributes/tenant_example-tenant-b] - exporters: [routing/tenant_example-tenant-b_subscriptions, count/tenant_metrics] logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3: receivers: [routing/tenant_example-tenant-b_subscriptions] processors: [memory_limiter, attributes/subscription_subscription-example-3] diff --git a/internal/controller/telemetry/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen.go index c62742da..2a6d30ca 100644 --- a/internal/controller/telemetry/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen.go @@ -309,34 +309,6 @@ func newRoutingConnector(name string) RoutingConnector { return result } -func buildRoutingTableItemForTenant(tenant v1alpha1.Tenant) RoutingConnectorTableItem { - conditions := make([]string, len(tenant.Status.LogSourceNamespaces)) - - for i, namespace := range tenant.Status.LogSourceNamespaces { - conditions[i] = fmt.Sprintf(`IsMatch(attributes["k8s.namespace.name"], %q)`, namespace) - } - - conditionString := strings.Join(conditions, " or ") - - newItem := RoutingConnectorTableItem{ - Statement: fmt.Sprintf(`route() where %s`, conditionString), - Pipelines: []string{fmt.Sprintf("logs/tenant_%s", tenant.Name)}, - } - - return newItem -} - -func generateRootRoutingConnectors(tenants []v1alpha1.Tenant) []RoutingConnector { - routingConnectors := make([]RoutingConnector, 0, len(tenants)) - for _, tenant := range tenants { - defaultRcForTenant := newRoutingConnector(fmt.Sprintf("routing/tenants_%s", tenant.Name)) - tableItem := buildRoutingTableItemForTenant(tenant) - defaultRcForTenant.AddRoutingConnectorTableElem(tableItem) - routingConnectors = append(routingConnectors, defaultRcForTenant) - } - return routingConnectors -} - func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) RoutingConnectorTableItem { pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name) @@ -398,14 +370,8 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { var connectors = make(map[string]any) countConnectors := generateCountConnectors() - maps.Copy(connectors, countConnectors) - rootRoutingConnectors := generateRootRoutingConnectors(cfgInput.Tenants) - for _, rc := range rootRoutingConnectors { - connectors[rc.Name] = rc - } - for _, tenant := range cfgInput.Tenants { rc := cfgInput.generateRoutingConnectorForTenantsSubscriptions(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.Name]) connectors[rc.Name] = rc @@ -620,19 +586,19 @@ func generateLokiExporterResourceProcessor() ResourceProcessor { } func generateRootPipeline(tenantName string) Pipeline { + tenantCountConnectorName := "count/tenant_metrics" receiverName := fmt.Sprintf("filelog/%s", tenantName) - exporterName := fmt.Sprintf("routing/tenants_%s", tenantName) - return generatePipeline([]string{receiverName}, []string{"k8sattributes"}, []string{exporterName}) + exporterName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName) + return generatePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName)}, []string{exporterName, tenantCountConnectorName}) } func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline { - tenantCountConnectorName := "count/tenant_metrics" outputCountConnectorName := "count/output_metrics" var namedPipelines = make(map[string]Pipeline) tenants := []string{} for tenant := range cfgInput.TenantSubscriptionMap { - tenantRootPipeline := fmt.Sprintf("logs/all_%s", tenant) + tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant) namedPipelines[tenantRootPipeline] = generateRootPipeline(tenant) tenants = append(tenants, tenant) } @@ -642,13 +608,13 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline for _, tenant := range tenants { // Generate a pipeline for the tenant - tenantPipelineName := fmt.Sprintf("logs/tenant_%s", tenant) + tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant) tenantRoutingName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenant) - namedPipelines[tenantPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/tenants_%s", tenant)}, []string{fmt.Sprintf("attributes/tenant_%s", tenant)}, []string{tenantRoutingName, tenantCountConnectorName}) + namedPipelines[tenantRootPipeline] = generateRootPipeline(tenant) // Generate pipelines for the subscriptions for the tenant for _, subscription := range cfgInput.TenantSubscriptionMap[tenant] { - tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantPipelineName, subscription.Namespace, subscription.Name) + tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantRootPipeline, subscription.Namespace, subscription.Name) namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}) diff --git a/internal/controller/telemetry/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen_test.go index 244becd3..232e0f61 100644 --- a/internal/controller/telemetry/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen_test.go @@ -280,68 +280,6 @@ actual= ---`, otelColTargetYaml, actualYAML) } } - -func Test_generateRootRoutingConnector(t *testing.T) { - type args struct { - tenants []v1alpha1.Tenant - } - tests := []struct { - name string - args args - want []RoutingConnector - }{ - { - name: "two_tenants", - args: args{ - tenants: []v1alpha1.Tenant{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "tenantA", - }, - Status: v1alpha1.TenantStatus{ - LogSourceNamespaces: []string{"a", "b"}, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "tenantB", - }, - Status: v1alpha1.TenantStatus{ - LogSourceNamespaces: []string{"c", "d"}, - }, - }, - }, - }, - want: []RoutingConnector{ - { - Name: "routing/tenants_tenantA", - Table: []RoutingConnectorTableItem{ - { - Statement: `route() where IsMatch(attributes["k8s.namespace.name"], "a") or IsMatch(attributes["k8s.namespace.name"], "b")`, - Pipelines: []string{"logs/tenant_tenantA"}, - }, - }, - }, - { - Name: "routing/tenants_tenantB", - Table: []RoutingConnectorTableItem{ - { - Statement: `route() where IsMatch(attributes["k8s.namespace.name"], "c") or IsMatch(attributes["k8s.namespace.name"], "d")`, - Pipelines: []string{"logs/tenant_tenantB"}, - }, - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := generateRootRoutingConnectors(tt.args.tenants) - assert.Equal(t, got, tt.want) - }) - } -} - func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *testing.T) { type fields struct { Tenants []v1alpha1.Tenant