Skip to content

Commit

Permalink
add missing subscription namespace everywhere
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Wilcsinszky <peter.wilcsinszky@axoflow.com>
  • Loading branch information
pepov committed Mar 21, 2024
1 parent 876f580 commit ff789bb
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions internal/controller/telemetry/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"slices"
"strings"

"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
"golang.org/x/exp/maps"
"gopkg.in/yaml.v3"

"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
)

type OtelColConfigInput struct {
Expand Down Expand Up @@ -206,7 +207,7 @@ func generateRootRoutingConnector(tenants []v1alpha1.Tenant) RoutingConnector {

func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) RoutingConnectorTableItem {

pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s", tenantName, subscription.Name)
pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name)

appendedSpaces := strings.Repeat(" ", index)

Expand Down Expand Up @@ -238,7 +239,7 @@ func (cfgInput *OtelColConfigInput) generateRoutingConnectorForTenantsSubscripti
}

func (cfgInput *OtelColConfigInput) generateRoutingConnectorForSubscriptionsOutputs(subscriptionRef v1alpha1.NamespacedName, outputNames []v1alpha1.NamespacedName) RoutingConnector {
rcName := fmt.Sprintf("routing/subscription_%s_outputs", subscriptionRef.Name)
rcName := fmt.Sprintf("routing/subscription_%s_%s_outputs", subscriptionRef.Namespace, subscriptionRef.Name)
rc := newRoutingConnector(rcName)

slices.SortFunc(outputNames, func(a, b v1alpha1.NamespacedName) int {
Expand Down Expand Up @@ -386,9 +387,9 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline

// Generate pipelines for the subscriptions for the tenant
for _, subscription := range cfgInput.TenantSubscriptionMap[tenant] {
tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s", tenantPipelineName, subscription.Name)
tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantPipelineName, subscription.Namespace, subscription.Name)

namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, []string{fmt.Sprintf("routing/subscription_%s_outputs", subscription.Name)})
namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)})

for _, outputRef := range cfgInput.SubscriptionOutputMap[subscription] {
outputPipelineName := fmt.Sprintf("logs/output_%s_%s_%s_%s", subscription.Namespace, subscription.Name, outputRef.Namespace, outputRef.Name)
Expand All @@ -401,11 +402,11 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline
output := cfgInput.Outputs[idx]

if output.Spec.Loki != nil {
namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_outputs", subscription.Name)}, []string{fmt.Sprintf("attributes/loki_exporter_%s", output.Name), fmt.Sprintf("resource/loki_exporter_%s", output.Name)}, []string{fmt.Sprintf("loki/%s_%s", output.Namespace, output.Name)})
namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}, []string{fmt.Sprintf("attributes/loki_exporter_%s", output.Name), fmt.Sprintf("resource/loki_exporter_%s", output.Name)}, []string{fmt.Sprintf("loki/%s_%s", output.Namespace, output.Name)})
}

if output.Spec.OTLP != nil {
namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_outputs", subscription.Name)}, []string{}, []string{fmt.Sprintf("otlp/%s_%s", output.Namespace, output.Name)})
namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}, []string{}, []string{fmt.Sprintf("otlp/%s_%s", output.Namespace, output.Name)})
}
}
}
Expand Down Expand Up @@ -574,7 +575,7 @@ func (cfgInput *OtelColConfigInput) ToIntermediateRepresentation() *OtelColConfi
result.Processors = cfgInput.generateProcessors()

result.Receivers = make(map[string]any)
k8sReceiverName := "filelog/kubernetes" //only one instance for now
k8sReceiverName := "filelog/kubernetes" // only one instance for now
result.Receivers[k8sReceiverName] = cfgInput.generateDefaultKubernetesReceiver()

result.Connectors = cfgInput.generateConnectors()
Expand Down

0 comments on commit ff789bb

Please sign in to comment.