Skip to content

Commit

Permalink
fix: resolve bridge tenant status conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Nov 11, 2024
1 parent 744f859 commit 96ddc6f
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 92 deletions.
2 changes: 0 additions & 2 deletions api/telemetry/v1alpha1/bridge_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ type BridgeSpec struct {

// BridgeStatus defines the observed state of Bridge
type BridgeStatus struct {
State State `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster,categories=telemetry-all
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Source Tenant",type=string,JSONPath=`.spec.sourceTenant`
//+kubebuilder:printcolumn:name="Target Tenant",type=string,JSONPath=`.spec.targetTenant`
//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state`

// Bridge is the Schema for the Bridges API
type Bridge struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
6 changes: 0 additions & 6 deletions config/crd/bases/telemetry.kube-logging.dev_bridges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
35 changes: 13 additions & 22 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"reflect"
"slices"
"sort"
"time"

"emperror.dev/errors"
Expand Down Expand Up @@ -67,6 +66,7 @@ type BasicAuthClientAuthConfig struct {
func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, collector *v1alpha1.Collector) (otelcolconfgen.OtelColConfigInput, error) {
logger := log.FromContext(ctx)
tenantSubscriptionMap := make(map[string][]v1alpha1.NamespacedName)
var bridgesReferencedByTenant []v1alpha1.Bridge
subscriptionOutputMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName)

tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector)
Expand Down Expand Up @@ -94,6 +94,17 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}
subscriptions[subsName] = *queriedSubs
}

bridgeNames := tenant.Status.ConnectedBridges
for _, bridgeName := range bridgeNames {
queriedBridge := &v1alpha1.Bridge{}
if err = r.Client.Get(ctx, types.NamespacedName{Name: bridgeName}, queriedBridge); err != nil {
logger.Error(errors.WithStack(err), "failed getting bridges for tenant", "tenant", tenant.Name)
return otelcolconfgen.OtelColConfigInput{}, err
}

bridgesReferencedByTenant = append(bridgesReferencedByTenant, *queriedBridge)
}
}

for _, subscription := range subscriptions {
Expand All @@ -116,16 +127,10 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}
}

bridges, err := r.getBridges(ctx, client.ListOptions{})
if err != nil {
logger.Error(errors.WithStack(err), "failed listing bridges")
return otelcolconfgen.OtelColConfigInput{}, err
}

return otelcolconfgen.OtelColConfigInput{
Tenants: tenants,
Subscriptions: subscriptions,
Bridges: bridges,
Bridges: bridgesReferencedByTenant,
OutputsWithSecretData: outputs,
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
Expand Down Expand Up @@ -175,8 +180,6 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
logger.Info(fmt.Sprintf("getting collector: %q", req.Name))

if err := r.Get(ctx, req.NamespacedName, collector); err != nil {
logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error",
"collector", req.Name)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand Down Expand Up @@ -527,18 +530,6 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l
return tenantsForSelector.Items, nil
}

func (r *CollectorReconciler) getBridges(ctx context.Context, listOpts client.ListOptions) ([]v1alpha1.Bridge, error) {
var bridges v1alpha1.BridgeList
if err := r.Client.List(ctx, &bridges, &listOpts); client.IgnoreNotFound(err) != nil {
return nil, err
}
sort.Slice(bridges.Items, func(i, j int) bool {
return bridges.Items[i].Name < bridges.Items[j].Name
})

return bridges.Items, nil
}

func normalizeStringSlice(inputList []string) []string {
allKeys := make(map[string]bool)
uniqueList := []string{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ func GenerateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector
return rc
}

func hasPipelineReceiverOrExporter(pipeline *otelv1beta1.Pipeline, receiverName string) bool {
for _, receiver := range pipeline.Receivers {
if receiver == receiverName {
return true
}
}

for _, exporter := range pipeline.Exporters {
if exporter == receiverName {
return true
}
}

return false
}

func addConnectorToPipeline(pipeline *otelv1beta1.Pipeline, connectorName string, needsReceiver, needsExporter bool) {
if !hasPipelineReceiverOrExporter(pipeline, connectorName) {
if needsReceiver {
pipeline.Receivers = append(pipeline.Receivers, connectorName)
}
if needsExporter {
pipeline.Exporters = append(pipeline.Exporters, connectorName)
}
}
}

func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (needsReceiver bool, needsExporter bool, bridgeName string) {
if bridge.Spec.SourceTenant == tenantName {
needsExporter = true
Expand All @@ -118,11 +145,7 @@ func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (n
func GenerateRoutingConnectorForBridgesTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, bridges []v1alpha1.Bridge) {
for _, bridge := range bridges {
needsReceiver, needsExporter, bridgeName := checkBridgeConnectorForTenant(tenantName, bridge)
if needsReceiver {
pipeline.Receivers = append(pipeline.Receivers, fmt.Sprintf("routing/bridge_%s", bridgeName))
}
if needsExporter {
pipeline.Exporters = append(pipeline.Exporters, fmt.Sprintf("routing/bridge_%s", bridgeName))
}
connectorName := fmt.Sprintf("routing/bridge_%s", bridgeName)
addConnectorToPipeline(pipeline, connectorName, needsReceiver, needsExporter)
}
}
54 changes: 4 additions & 50 deletions internal/controller/telemetry/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"slices"
"sort"
"strings"

"emperror.dev/errors"
Expand Down Expand Up @@ -68,8 +69,6 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name))

if err := r.Get(ctx, req.NamespacedName, tenant); err != nil {
logger.Error(errors.New("failed getting tenant, possible API server error"), "failed getting tenant, possible API server error",
"tenant", req.NamespacedName.String())
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand Down Expand Up @@ -135,25 +134,9 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}
return ctrl.Result{}, err
}
tenant.Status.ConnectedBridges = getBridgeNamesFromBridges(bridgesForTenant)

for _, bridge := range bridgesForTenant {
originalBridgeStatus := bridge.Status.DeepCopy()
if err := r.checkBridgeConnections(ctx, &bridge); err != nil {
bridge.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "failed bridge connection verification", "bridge", bridge.Name)
return ctrl.Result{}, err
}

bridge.Status.State = v1alpha1.StateReady
if !reflect.DeepEqual(originalBridgeStatus, bridge.Status) {
if updateErr := r.Status().Update(ctx, &bridge); updateErr != nil {
bridge.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(updateErr), "failed update bridge status", "bridge", bridge.Name)
return ctrl.Result{}, err
}
}
}
bridgesForTenantNames := getBridgeNamesFromBridges(bridgesForTenant)
sort.Strings(bridgesForTenantNames)
tenant.Status.ConnectedBridges = bridgesForTenantNames

logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, tenant)
if err != nil {
Expand Down Expand Up @@ -469,35 +452,6 @@ func (r *RouteReconciler) getBridgesForTenant(ctx context.Context, tenantName st
return
}

func (r *RouteReconciler) getTenants(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Tenant, error) {
var tenants v1alpha1.TenantList
if err := r.Client.List(ctx, &tenants, listOpts); client.IgnoreNotFound(err) != nil {
return nil, err
}

return tenants.Items, nil
}

func (r *RouteReconciler) checkBridgeConnections(ctx context.Context, bridge *v1alpha1.Bridge) error {
for _, tenant := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} {
listOpts := &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(tenantNameField, tenant),
}
tenants, err := r.getTenants(ctx, listOpts)
if err != nil {
return err
}
if len(tenants) == 0 {
return errors.Errorf("tenant: %s not found for bridge: %s", tenant, bridge.Name)
}
if len(tenants) != 1 || tenants[0].Name != tenant {
return errors.Errorf("bridge: %s has invalid tenant reference: %s", bridge.Name, tenant)
}
}

return nil
}

func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace {
allKeys := make(map[string]bool)
uniqueList := []apiv1.Namespace{}
Expand Down

0 comments on commit 96ddc6f

Please sign in to comment.