Skip to content

Commit

Permalink
fix(conf gen): remove extra hop in routing connector
Browse files Browse the repository at this point in the history
Signed-off-by: Szilard Parrag <szilard.parrag@axoflow.com>
  • Loading branch information
OverOrion committed May 2, 2024
1 parent 54ff0cb commit f49c175
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,16 @@ connectors:
table:
- statement: route()
pipelines: [logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3]
routing/tenants_example-tenant-a:
table:
- statement: 'route() where '
pipelines: [logs/tenant_example-tenant-a]
routing/tenants_example-tenant-b:
table:
- statement: 'route() where '
pipelines: [logs/tenant_example-tenant-b]
service:
pipelines:
logs/all_example-tenant-a:
logs/tenant_example-tenant-a:
receivers: [filelog/example-tenant-a]
processors: [memory_limiter, k8sattributes]
exporters: [routing/tenants_example-tenant-a]
logs/all_example-tenant-b:
processors: [memory_limiter, k8sattributes, attributes/tenant_example-tenant-a]
exporters: [routing/tenant_example-tenant-a_subscriptions, count/tenant_metrics]
logs/tenant_example-tenant-b:
receivers: [filelog/example-tenant-b]
processors: [memory_limiter, k8sattributes]
exporters: [routing/tenants_example-tenant-b]
processors: [memory_limiter, k8sattributes, attributes/tenant_example-tenant-b]
exporters: [routing/tenant_example-tenant-b_subscriptions, count/tenant_metrics]
logs/output_example-tenant-a-ns_subscription-example-1_collector_loki-test-output:
receivers: [routing/subscription_example-tenant-a-ns_subscription-example-1_outputs]
processors: [memory_limiter, attributes/exporter_name_loki-test-output, attributes/loki_exporter_loki-test-output, resource/loki_exporter_loki-test-output]
Expand All @@ -297,10 +289,6 @@ service:
receivers: [routing/subscription_example-tenant-b-ns_subscription-example-3_outputs]
processors: [memory_limiter, attributes/exporter_name_otlp-test-output-2]
exporters: [otlp/collector_otlp-test-output-2, count/output_metrics]
logs/tenant_example-tenant-a:
receivers: [routing/tenants_example-tenant-a]
processors: [memory_limiter, attributes/tenant_example-tenant-a]
exporters: [routing/tenant_example-tenant-a_subscriptions, count/tenant_metrics]
logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-1:
receivers: [routing/tenant_example-tenant-a_subscriptions]
processors: [memory_limiter, attributes/subscription_subscription-example-1]
Expand All @@ -309,10 +297,6 @@ service:
receivers: [routing/tenant_example-tenant-a_subscriptions]
processors: [memory_limiter, attributes/subscription_subscription-example-2]
exporters: [routing/subscription_example-tenant-a-ns_subscription-example-2_outputs]
logs/tenant_example-tenant-b:
receivers: [routing/tenants_example-tenant-b]
processors: [memory_limiter, attributes/tenant_example-tenant-b]
exporters: [routing/tenant_example-tenant-b_subscriptions, count/tenant_metrics]
logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3:
receivers: [routing/tenant_example-tenant-b_subscriptions]
processors: [memory_limiter, attributes/subscription_subscription-example-3]
Expand Down
48 changes: 7 additions & 41 deletions internal/controller/telemetry/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,34 +309,6 @@ func newRoutingConnector(name string) RoutingConnector {
return result
}

func buildRoutingTableItemForTenant(tenant v1alpha1.Tenant) RoutingConnectorTableItem {
conditions := make([]string, len(tenant.Status.LogSourceNamespaces))

for i, namespace := range tenant.Status.LogSourceNamespaces {
conditions[i] = fmt.Sprintf(`IsMatch(attributes["k8s.namespace.name"], %q)`, namespace)
}

conditionString := strings.Join(conditions, " or ")

newItem := RoutingConnectorTableItem{
Statement: fmt.Sprintf(`route() where %s`, conditionString),
Pipelines: []string{fmt.Sprintf("logs/tenant_%s", tenant.Name)},
}

return newItem
}

func generateRootRoutingConnectors(tenants []v1alpha1.Tenant) []RoutingConnector {
routingConnectors := make([]RoutingConnector, 0, len(tenants))
for _, tenant := range tenants {
defaultRcForTenant := newRoutingConnector(fmt.Sprintf("routing/tenants_%s", tenant.Name))
tableItem := buildRoutingTableItemForTenant(tenant)
defaultRcForTenant.AddRoutingConnectorTableElem(tableItem)
routingConnectors = append(routingConnectors, defaultRcForTenant)
}
return routingConnectors
}

func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) RoutingConnectorTableItem {

pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name)
Expand Down Expand Up @@ -398,14 +370,8 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any {
var connectors = make(map[string]any)

countConnectors := generateCountConnectors()

maps.Copy(connectors, countConnectors)

rootRoutingConnectors := generateRootRoutingConnectors(cfgInput.Tenants)
for _, rc := range rootRoutingConnectors {
connectors[rc.Name] = rc
}

for _, tenant := range cfgInput.Tenants {
rc := cfgInput.generateRoutingConnectorForTenantsSubscriptions(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.Name])
connectors[rc.Name] = rc
Expand Down Expand Up @@ -620,19 +586,19 @@ func generateLokiExporterResourceProcessor() ResourceProcessor {
}

func generateRootPipeline(tenantName string) Pipeline {
tenantCountConnectorName := "count/tenant_metrics"
receiverName := fmt.Sprintf("filelog/%s", tenantName)
exporterName := fmt.Sprintf("routing/tenants_%s", tenantName)
return generatePipeline([]string{receiverName}, []string{"k8sattributes"}, []string{exporterName})
exporterName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName)
return generatePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName)}, []string{exporterName, tenantCountConnectorName})
}

func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline {
tenantCountConnectorName := "count/tenant_metrics"
outputCountConnectorName := "count/output_metrics"
var namedPipelines = make(map[string]Pipeline)

tenants := []string{}
for tenant := range cfgInput.TenantSubscriptionMap {
tenantRootPipeline := fmt.Sprintf("logs/all_%s", tenant)
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
namedPipelines[tenantRootPipeline] = generateRootPipeline(tenant)
tenants = append(tenants, tenant)
}
Expand All @@ -642,13 +608,13 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline

for _, tenant := range tenants {
// Generate a pipeline for the tenant
tenantPipelineName := fmt.Sprintf("logs/tenant_%s", tenant)
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
tenantRoutingName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenant)
namedPipelines[tenantPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/tenants_%s", tenant)}, []string{fmt.Sprintf("attributes/tenant_%s", tenant)}, []string{tenantRoutingName, tenantCountConnectorName})
namedPipelines[tenantRootPipeline] = generateRootPipeline(tenant)

// Generate pipelines for the subscriptions for the tenant
for _, subscription := range cfgInput.TenantSubscriptionMap[tenant] {
tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantPipelineName, subscription.Namespace, subscription.Name)
tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantRootPipeline, subscription.Namespace, subscription.Name)

namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)})

Expand Down
62 changes: 0 additions & 62 deletions internal/controller/telemetry/otel_conf_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,68 +280,6 @@ actual=
---`, otelColTargetYaml, actualYAML)
}
}

func Test_generateRootRoutingConnector(t *testing.T) {
type args struct {
tenants []v1alpha1.Tenant
}
tests := []struct {
name string
args args
want []RoutingConnector
}{
{
name: "two_tenants",
args: args{
tenants: []v1alpha1.Tenant{
{
ObjectMeta: metav1.ObjectMeta{
Name: "tenantA",
},
Status: v1alpha1.TenantStatus{
LogSourceNamespaces: []string{"a", "b"},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "tenantB",
},
Status: v1alpha1.TenantStatus{
LogSourceNamespaces: []string{"c", "d"},
},
},
},
},
want: []RoutingConnector{
{
Name: "routing/tenants_tenantA",
Table: []RoutingConnectorTableItem{
{
Statement: `route() where IsMatch(attributes["k8s.namespace.name"], "a") or IsMatch(attributes["k8s.namespace.name"], "b")`,
Pipelines: []string{"logs/tenant_tenantA"},
},
},
},
{
Name: "routing/tenants_tenantB",
Table: []RoutingConnectorTableItem{
{
Statement: `route() where IsMatch(attributes["k8s.namespace.name"], "c") or IsMatch(attributes["k8s.namespace.name"], "d")`,
Pipelines: []string{"logs/tenant_tenantB"},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := generateRootRoutingConnectors(tt.args.tenants)
assert.Equal(t, got, tt.want)
})
}
}

func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *testing.T) {
type fields struct {
Tenants []v1alpha1.Tenant
Expand Down

0 comments on commit f49c175

Please sign in to comment.