Skip to content

Commit

Permalink
feat: further stabilize operator
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Nov 11, 2024
1 parent cd5d00e commit 14bf7f4
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 128 deletions.
2 changes: 0 additions & 2 deletions api/telemetry/v1alpha1/bridge_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ type BridgeSpec struct {

// BridgeStatus defines the observed state of Bridge
type BridgeStatus struct {
State State `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster,categories=telemetry-all
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Source Tenant",type=string,JSONPath=`.spec.sourceTenant`
//+kubebuilder:printcolumn:name="Target Tenant",type=string,JSONPath=`.spec.targetTenant`
//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state`

// Bridge is the Schema for the Bridges API
type Bridge struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
6 changes: 0 additions & 6 deletions config/crd/bases/telemetry.kube-logging.dev_bridges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ spec:
- jsonPath: .spec.targetTenant
name: Target Tenant
type: string
- jsonPath: .status.state
name: State
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -67,9 +64,6 @@ spec:
type: object
status:
description: BridgeStatus defines the observed state of Bridge
properties:
state:
type: string
type: object
type: object
served: true
Expand Down
77 changes: 27 additions & 50 deletions internal/controller/telemetry/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -44,6 +43,10 @@ import (
"github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components"
)

// ErrTenantFailed is the sentinel error reported by
// buildConfigInputForCollector when one of the selected tenants is in the
// failed state. Reconcile matches it with errors.Is to schedule a delayed
// requeue.
var ErrTenantFailed = errors.New("tenant failed")

const (
requeueDelayOnFailedTenant = 20 * time.Second
axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0"
Expand All @@ -55,20 +58,15 @@ type CollectorReconciler struct {
Scheme *runtime.Scheme
}

// TenantFailedError is an error value that carries a fixed message.
type TenantFailedError struct {
	msg string
}

// Error implements the error interface by returning the stored message
// unchanged.
func (e *TenantFailedError) Error() string {
	return e.msg
}

// BasicAuthClientAuthConfig holds a Username and Password pair.
type BasicAuthClientAuthConfig struct {
	Username string
	Password string
}

func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, collector *v1alpha1.Collector) (otelcolconfgen.OtelColConfigInput, error) {
logger := log.FromContext(ctx)
tenantSubscriptionMap := make(map[string][]v1alpha1.NamespacedName)
var bridgesReferencedByTenant []v1alpha1.Bridge
subscriptionOutputMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName)

tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector)
Expand All @@ -81,15 +79,13 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}

for _, tenant := range tenants {

if tenant.Status.State == v1alpha1.StateFailed {
logger.Info("tenant %q is in failed state, retrying later", tenant.Name)
return otelcolconfgen.OtelColConfigInput{}, &TenantFailedError{msg: "tenant failed"}
return otelcolconfgen.OtelColConfigInput{}, ErrTenantFailed
}

subscriptionNames := tenant.Status.Subscriptions
tenantSubscriptionMap[tenant.Name] = subscriptionNames

for _, subsName := range subscriptionNames {
queriedSubs := &v1alpha1.Subscription{}
if err = r.Client.Get(ctx, types.NamespacedName(subsName), queriedSubs); err != nil {
Expand All @@ -98,12 +94,22 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
}
subscriptions[subsName] = *queriedSubs
}

bridgeNames := tenant.Status.ConnectedBridges
for _, bridgeName := range bridgeNames {
queriedBridge := &v1alpha1.Bridge{}
if err = r.Client.Get(ctx, types.NamespacedName{Name: bridgeName}, queriedBridge); err != nil {
logger.Error(errors.WithStack(err), "failed getting bridges for tenant", "tenant", tenant.Name)
return otelcolconfgen.OtelColConfigInput{}, err
}

bridgesReferencedByTenant = append(bridgesReferencedByTenant, *queriedBridge)
}
}

for _, subscription := range subscriptions {
outputNames := subscription.Status.Outputs
subscriptionOutputMap[subscription.NamespacedName()] = outputNames

for _, outputName := range outputNames {
outputWithSecretData := components.OutputWithSecretData{}

Expand All @@ -117,29 +123,20 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context,
if err := r.populateSecretForOutput(ctx, queriedOutput, &outputWithSecretData); err != nil {
return otelcolconfgen.OtelColConfigInput{}, err
}

outputs = append(outputs, outputWithSecretData)
}
}

bridges, err := r.getBridges(ctx, client.ListOptions{})
if err != nil {
logger.Error(errors.WithStack(err), "failed listing bridges")
return otelcolconfgen.OtelColConfigInput{}, err
}

otelConfigInput := otelcolconfgen.OtelColConfigInput{
return otelcolconfgen.OtelColConfigInput{
Tenants: tenants,
Subscriptions: subscriptions,
Bridges: bridges,
Bridges: bridgesReferencedByTenant,
OutputsWithSecretData: outputs,
TenantSubscriptionMap: tenantSubscriptionMap,
SubscriptionOutputMap: subscriptionOutputMap,
Debug: collector.Spec.Debug,
MemoryLimiter: *collector.Spec.MemoryLimiter,
}

return otelConfigInput, nil
}, nil
}

func (r *CollectorReconciler) populateSecretForOutput(ctx context.Context, queriedOutput *v1alpha1.Output, outputWithSecret *components.OutputWithSecretData) error {
Expand Down Expand Up @@ -182,10 +179,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
collector := &v1alpha1.Collector{}
logger.Info(fmt.Sprintf("getting collector: %q", req.Name))

if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil {
logger.Error(errors.New("failed getting collector, possible API server error"), "failed getting collector, possible API server error",
"collector", req.Name)
return ctrl.Result{}, err
if err := r.Get(ctx, req.NamespacedName, collector); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

collector.Spec.SetDefaults()
Expand All @@ -194,13 +189,13 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector)
if err != nil {
if errors.Is(err, &TenantFailedError{}) {
if errors.Is(err, ErrTenantFailed) {
return ctrl.Result{RequeueAfter: requeueDelayOnFailedTenant}, err
}
return ctrl.Result{}, err
}

if err := otelConfigInput.ValidateConfig(); ignoreNoResourcesError(err) != nil {
if err := otelConfigInput.ValidateConfig(); err != nil {
collector.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "invalid otel config input")
return ctrl.Result{}, err
Expand Down Expand Up @@ -281,10 +276,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
_, err = resourceReconciler.ReconcileResource(&otelCollector, reconciler.StatePresent)
if err != nil {
collector.Status.State = v1alpha1.StateFailed
if apierrors.IsConflict(err) {
logger.Info("conflict while creating otel collector, retrying")
return ctrl.Result{Requeue: true}, nil
}
logger.Error(errors.WithStack(err), "failed reconciling otel collector")
return ctrl.Result{}, err
}

tenantNames := []string{}
Expand All @@ -297,6 +290,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if !reflect.DeepEqual(originalCollectorStatus, collector.Status) {
logger.Info("collector status changed")
if err = r.Client.Status().Update(ctx, collector); err != nil {
collector.Status.State = v1alpha1.StateFailed
logger.Error(errors.WithStack(err), "failed updating collector status")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -536,15 +530,6 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l
return tenantsForSelector.Items, nil
}

// getBridges lists Bridge objects using the supplied list options and
// returns the listed items. A NotFound error from the list call is ignored,
// in which case the items of the (empty) list are returned; any other error
// is returned to the caller together with a nil slice.
func (r *CollectorReconciler) getBridges(ctx context.Context, listOpts client.ListOptions) ([]v1alpha1.Bridge, error) {
	bridgeList := v1alpha1.BridgeList{}

	listErr := r.Client.List(ctx, &bridgeList, &listOpts)
	if client.IgnoreNotFound(listErr) != nil {
		return nil, listErr
	}

	return bridgeList.Items, nil
}

func normalizeStringSlice(inputList []string) []string {
allKeys := make(map[string]bool)
uniqueList := []string{}
Expand All @@ -558,11 +543,3 @@ func normalizeStringSlice(inputList []string) []string {

return uniqueList
}

// ignoreNoResourcesError returns nil when err is, or wraps,
// otelcolconfgen.ErrNoResources, and returns err unchanged otherwise
// (including when err is nil).
//
// errors.Is is used instead of a direct == comparison so the sentinel is
// still recognised after it has been wrapped with additional context.
func ignoreNoResourcesError(err error) error {
	if errors.Is(err, otelcolconfgen.ErrNoResources) {
		return nil
	}

	return err
}
31 changes: 25 additions & 6 deletions internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"slices"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -49,6 +50,21 @@ type OtelColConfigInput struct {
Debug bool
}

// IsEmpty reports whether the config input carries no resources: it returns
// true when every slice- or map-typed field of the struct has length zero.
// Fields of any other kind (for example booleans) are not considered.
//
// A nil receiver is reported as empty instead of being dereferenced, so
// callers that use IsEmpty as their "nothing to do" check do not panic.
func (cfgInput *OtelColConfigInput) IsEmpty() bool {
	if cfgInput == nil {
		return true
	}

	v := reflect.ValueOf(*cfgInput)
	for i := 0; i < v.NumField(); i++ {
		field := v.Field(i)
		switch field.Kind() {
		case reflect.Slice, reflect.Map:
			// Len is zero for both nil and allocated-but-empty collections.
			if field.Len() != 0 {
				return false
			}
		}
	}

	return true
}

func (cfgInput *OtelColConfigInput) generateExporters(ctx context.Context) map[string]any {
exporters := map[string]any{}
maps.Copy(exporters, exporter.GenerateMetricsExporters())
Expand Down Expand Up @@ -248,9 +264,14 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be
}
}

extensionNames := make([]string, 0, len(extensions))
for k := range extensions {
extensionNames = append(extensionNames, k)
var extensionNames []string
if len(extensions) > 0 {
extensionNames = make([]string, 0, len(extensions))
for k := range extensions {
extensionNames = append(extensionNames, k)
}
} else {
extensionNames = nil
}

return otelv1beta1.Config{
Expand Down Expand Up @@ -320,16 +341,14 @@ func validateSubscriptionsAndBridges(tenants *[]v1alpha1.Tenant, subscriptions *
}

func (cfgInput *OtelColConfigInput) ValidateConfig() error {
if cfgInput == nil {
if cfgInput.IsEmpty() {
return ErrNoResources
}

var result *multierror.Error

if err := validateTenants(&cfgInput.Tenants); err != nil {
result = multierror.Append(result, err)
}

if err := validateSubscriptionsAndBridges(&cfgInput.Tenants, &cfgInput.Subscriptions, &cfgInput.Bridges); err != nil {
result = multierror.Append(result, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ func GenerateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector
return rc
}

// hasPipelineReceiverOrExporter reports whether receiverName is already
// listed among the pipeline's receivers or among its exporters.
func hasPipelineReceiverOrExporter(pipeline *otelv1beta1.Pipeline, receiverName string) bool {
	listed := func(names []string) bool {
		for idx := range names {
			if names[idx] == receiverName {
				return true
			}
		}
		return false
	}

	// Receivers are checked first; exporters only when no receiver matched.
	return listed(pipeline.Receivers) || listed(pipeline.Exporters)
}

// addConnectorToPipeline appends connectorName to the pipeline's receivers
// (when needsReceiver is set) and/or exporters (when needsExporter is set).
// Nothing is appended if the name is already present in either list.
func addConnectorToPipeline(pipeline *otelv1beta1.Pipeline, connectorName string, needsReceiver, needsExporter bool) {
	// Already wired into this pipeline: leave both lists untouched.
	if hasPipelineReceiverOrExporter(pipeline, connectorName) {
		return
	}

	if needsReceiver {
		pipeline.Receivers = append(pipeline.Receivers, connectorName)
	}

	if needsExporter {
		pipeline.Exporters = append(pipeline.Exporters, connectorName)
	}
}

func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (needsReceiver bool, needsExporter bool, bridgeName string) {
if bridge.Spec.SourceTenant == tenantName {
needsExporter = true
Expand All @@ -118,11 +145,7 @@ func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (n
// GenerateRoutingConnectorForBridgesTenantPipeline wires the routing
// connector of each bridge into the given tenant pipeline.
//
// For every bridge, checkBridgeConnectorForTenant decides whether the tenant
// needs the connector as a receiver and/or as an exporter; the connector is
// named "routing/bridge_<bridgeName>". The connector is added exclusively
// through addConnectorToPipeline so that a name already present in the
// pipeline is not appended a second time.
func GenerateRoutingConnectorForBridgesTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, bridges []v1alpha1.Bridge) {
	for _, bridge := range bridges {
		needsReceiver, needsExporter, bridgeName := checkBridgeConnectorForTenant(tenantName, bridge)
		connectorName := fmt.Sprintf("routing/bridge_%s", bridgeName)
		addConnectorToPipeline(pipeline, connectorName, needsReceiver, needsExporter)
	}
}
Loading

0 comments on commit 14bf7f4

Please sign in to comment.