Skip to content

Commit

Permalink
fix: enable subscriptionless routing
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Nov 8, 2024
1 parent 290cb5b commit 17a4bee
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .status.state
name: State
type: string
- jsonPath: .status.tenants
name: Tenants
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
6 changes: 3 additions & 3 deletions config/crd/bases/telemetry.kube-logging.dev_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .status.state
name: State
type: string
- jsonPath: .status.tenants
name: Tenants
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
35 changes: 0 additions & 35 deletions docs/examples/tenant-to-tenant-routing/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,6 @@ spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: shared
subscriptionNamespaceSelectors:
- matchLabels:
tenant: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Subscription
metadata:
name: shared
namespace: shared
spec:
condition: "true"
outputs:
- name: openobserve-shared
namespace: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Output
metadata:
name: openobserve-shared
namespace: shared
spec:
otlp:
endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081
headers:
Authorization: "Basic <TOKEN>"
organization: default
stream-name: shared
tls:
insecure: true
---
# A tenant that consumes logs from the shared tenant using a bridge
apiVersion: telemetry.kube-logging.dev/v1alpha1
Expand All @@ -62,9 +33,6 @@ metadata:
collector: cluster
name: database
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: database
subscriptionNamespaceSelectors:
- matchLabels:
tenant: database
Expand Down Expand Up @@ -112,9 +80,6 @@ metadata:
collector: cluster
name: web
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: web
subscriptionNamespaceSelectors:
- matchLabels:
tenant: web
Expand Down
31 changes: 0 additions & 31 deletions e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,6 @@ spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: shared
subscriptionNamespaceSelectors:
- matchLabels:
tenant: shared
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Subscription
metadata:
name: shared
namespace: shared
spec:
condition: "true"
outputs:
- name: otlp-test-output-shared
namespace: collector
---
apiVersion: telemetry.kube-logging.dev/v1alpha1
kind: Output
metadata:
name: otlp-test-output-shared
namespace: collector
spec:
otlp:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
tls:
insecure: true
---
# A tenant that consumes logs from the shared tenant using a bridge
apiVersion: telemetry.kube-logging.dev/v1alpha1
Expand All @@ -84,9 +59,6 @@ metadata:
collector: cluster
name: database
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: database
subscriptionNamespaceSelectors:
- matchLabels:
tenant: database
Expand Down Expand Up @@ -130,9 +102,6 @@ metadata:
collector: cluster
name: web
spec:
logSourceNamespaceSelectors:
- matchLabels:
tenant: web
subscriptionNamespaceSelectors:
- matchLabels:
tenant: web
Expand Down
1 change: 1 addition & 0 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// deploying the collector that would immediately error due to configuration errors.
// Might also be a good place to add a validation webhook to validate the collector spec
if err := otelConfigInput.ValidateConfig(); err != nil {
collector.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "failed validating otel config input")
return ctrl.Result{}, err
}
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {
if tenantIdx := slices.IndexFunc(cfgInput.Tenants, func(t v1alpha1.Tenant) bool {
return tenantName == t.Name
}); tenantIdx != -1 {
k8sReceiverName := fmt.Sprintf("filelog/%s", tenantName)
namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces
receivers[k8sReceiverName] = receiver.GenerateDefaultKubernetesReceiver(namespaces)
// Generate filelog receiver for the tenant if it has any logsource namespaces
if len(namespaces) > 0 {
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces)
}
}
}

Expand All @@ -123,13 +125,19 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any {
maps.Copy(connectors, connector.GenerateCountConnectors())

for _, tenant := range cfgInput.Tenants {
rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions)
connectors[rc.Name] = rc
// Generate routing connector for the tenant's subscription if it has any
if len(cfgInput.TenantSubscriptionMap[tenant.Name]) > 0 {
rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions)
connectors[rc.Name] = rc
}
}

for _, subscription := range cfgInput.Subscriptions {
rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()])
connectors[rc.Name] = rc
// Generate routing connector for the subscription's outputs if it has any
if len(cfgInput.SubscriptionOutputMap[subscription.NamespacedName()]) > 0 {
rc := connector.GenerateRoutingConnectorForSubscriptionsOutputs(subscription.NamespacedName(), cfgInput.SubscriptionOutputMap[subscription.NamespacedName()])
connectors[rc.Name] = rc
}
}

for _, bridge := range cfgInput.Bridges {
Expand All @@ -146,7 +154,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
var namedPipelines = make(map[string]*otelv1beta1.Pipeline)
tenants := []string{}
for tenant := range cfgInput.TenantSubscriptionMap {
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(tenant)
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
tenants = append(tenants, tenant)
}

Expand All @@ -155,7 +163,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
for _, tenant := range tenants {
// Generate a pipeline for the tenant
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(tenant)
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)

connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges)
processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants)
Expand Down
Loading

0 comments on commit 17a4bee

Please sign in to comment.