diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index 54f3c628..1158144a 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -26,12 +26,14 @@ type SubscriptionSpec struct { // SubscriptionStatus defines the observed state of Subscription type SubscriptionStatus struct { - Tenant string `json:"tenant,omitempty"` + Tenant string `json:"tenant,omitempty"` + Outputs []NamespacedName `json:"outputs,omitempty"` } // +kubebuilder:object:root=true // +kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` +//+kubebuilder:printcolumn:name="Outputs",type=string,JSONPath=`.status.outputs` //+kubebuilder:resource:categories=telemetry-all // Subscription is the Schema for the subscriptions API diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index b905fa31..d8eec6ab 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -24,11 +24,16 @@ type TenantSpec struct { LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"` } +const ( + StateReady = "ready" + StateFailed = "failed" +) + // TenantStatus defines the observed state of Tenant type TenantStatus struct { - Subscriptions []string `json:"subscriptions,omitempty"` - LogSourceNamespaces []string `json:"logSourceNamespaces,omitempty"` - Collector string `json:"collector"` + Subscriptions []NamespacedName `json:"subscriptions,omitempty"` + LogSourceNamespaces []string `json:"logSourceNamespaces,omitempty"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true @@ -36,7 +41,7 @@ type TenantStatus struct { //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Subscriptions",type=string,JSONPath=`.status.subscriptions` //+kubebuilder:printcolumn:name="Logsource 
namespaces",type=string,JSONPath=`.status.logSourceNamespaces` -//+kubebuilder:printcolumn:name="Collector",type=string,JSONPath=`.status.collector` +//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // Tenant is the Schema for the tenants API type Tenant struct { @@ -59,7 +64,3 @@ type TenantList struct { func init() { SchemeBuilder.Register(&Tenant{}, &TenantList{}) } - -func (t *Tenant) NamespacedName() NamespacedName { - return NamespacedName{Namespace: t.Namespace, Name: t.Name} -} diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index 3dd59244..2105098f 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -336,7 +336,7 @@ func (in *Subscription) DeepCopyInto(out *Subscription) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Subscription. @@ -412,6 +412,11 @@ func (in *SubscriptionSpec) DeepCopy() *SubscriptionSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { *out = *in + if in.Outputs != nil { + in, out := &in.Outputs, &out.Outputs + *out = make([]NamespacedName, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionStatus. 
@@ -548,7 +553,7 @@ func (in *TenantStatus) DeepCopyInto(out *TenantStatus) { *out = *in if in.Subscriptions != nil { in, out := &in.Subscriptions, &out.Subscriptions - *out = make([]string, len(*in)) + *out = make([]NamespacedName, len(*in)) copy(*out, *in) } if in.LogSourceNamespaces != nil { diff --git a/cmd/main.go b/cmd/main.go index 7d3295b6..6eb9c325 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -96,6 +96,13 @@ func main() { os.Exit(1) } + if err = (&controller.RouteReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Route") + os.Exit(1) + } if err = (&controller.CollectorReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index 83910b87..f112b51f 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -20,6 +20,9 @@ spec: - jsonPath: .status.tenant name: Tenant type: string + - jsonPath: .status.outputs + name: Outputs + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -63,6 +66,18 @@ spec: status: description: SubscriptionStatus defines the observed state of Subscription properties: + outputs: + items: + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + type: array tenant: type: string type: object diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index 1524595c..ac03c66a 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -23,8 +23,8 @@ spec: - jsonPath: .status.logSourceNamespaces name: Logsource namespaces type: string - - jsonPath: .status.collector - name: 
Collector + - jsonPath: .status.state + name: State type: string name: v1alpha1 schema: @@ -153,18 +153,24 @@ spec: status: description: TenantStatus defines the observed state of Tenant properties: - collector: - type: string logSourceNamespaces: items: type: string type: array + state: + type: string subscriptions: items: - type: string + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object type: array - required: - - collector type: object type: object served: true diff --git a/go.mod b/go.mod index 0ed355d2..1d2183a0 100644 --- a/go.mod +++ b/go.mod @@ -27,10 +27,11 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/siliconbrain/go-mapseqs v0.2.0 // indirect + github.com/siliconbrain/go-seqs v0.1.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect go.opentelemetry.io/collector/featuregate v0.77.0 // indirect - logur.dev/logur v0.17.0 // indirect ) require ( @@ -72,7 +73,7 @@ require ( go.opentelemetry.io/collector/config/configcompression v0.91.0 go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/exp v0.0.0-20231006140011-7918f672742d + golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.14.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -84,7 +85,7 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.4 k8s.io/apiextensions-apiserver v0.28.4 // indirect @@ -92,8 +93,7 @@ require ( k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect 
k8s.io/utils v0.0.0-20231127182322-b307cd553661 // indirect - logur.dev/integration/logr v0.5.0 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect + sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index af640f18..b0b0e1ef 100644 --- a/go.sum +++ b/go.sum @@ -34,7 +34,6 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -140,6 +139,10 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/siliconbrain/go-mapseqs v0.2.0 h1:TQrWhpFMZG6kr8KoNDJAqT83h9fM3DG1C0HsfkAfBNU= +github.com/siliconbrain/go-mapseqs v0.2.0/go.mod h1:IE8lGocwpiXDwm7hWXKz4keUvgYIjApJY+ZAiFwpy8U= +github.com/siliconbrain/go-seqs v0.1.0 h1:6esr7kbXi+zZ+NKM3cdSPBPRoPpxbQw7DBt4g/RGpA4= +github.com/siliconbrain/go-seqs v0.1.0/go.mod h1:C5Kz1fQpLUN6aSGKh4uFFyRSMJBxZtii/3+iRf5MaAk= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= 
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -273,10 +276,6 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/A k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI= k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -logur.dev/integration/logr v0.5.0 h1:Xy4/PIBbGKKsKGt+86sBJhMa8MSX5+XsyOaoBBU+tnw= -logur.dev/integration/logr v0.5.0/go.mod h1:Kp0WOJ6LRV7ewobPZddHHoLmLOpDmi+phZFhpUWD1Qg= -logur.dev/logur v0.17.0 h1:lwFZk349ZBY7KhonJFLshP/VhfFa6BxOjHxNnPHnEyc= -logur.dev/logur v0.17.0/go.mod h1:DyA5B+b6WjjCcnpE1+HGtTLh2lXooxRq+JmAwXMRK08= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 0f3f9e62..acc533a1 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -18,14 +18,16 @@ import ( "context" "fmt" "reflect" - "strings" + "slices" + "time" "emperror.dev/errors" - "golang.org/x/exp/slices" + "github.com/cisco-open/operator-tools/pkg/reconciler" + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" apiv1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -33,163 +35,111 @@ import ( 
"sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" - - "github.com/cisco-open/operator-tools/pkg/reconciler" - otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" ) +const tenantReferenceField = ".status.tenant" +const requeueDelayOnFailedTenant = 20 * time.Second + // CollectorReconciler reconciles a Collector object type CollectorReconciler struct { client.Client Scheme *runtime.Scheme } -const collectorReferenceField = ".status.collector" -const tenantReferenceField = ".status.tenant" +type TenantFailedError struct { + msg string +} -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;oteloutputs;,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;oteloutputs/status;,verbs=get;update;patch -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update -// +kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch -// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups="",resources=services;persistentvolumeclaims;serviceaccounts;pods,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=apps,resources=statefulsets;daemonsets;replicasets,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete +func (e *TenantFailedError) Error() string { return e.msg } -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster 
closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the Collector object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile -func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, collector *v1alpha1.Collector) (OtelColConfigInput, error) { logger := log.FromContext(ctx) + tenantSubscriptionMap := make(map[string][]v1alpha1.NamespacedName) + subscriptionOutputMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName) - collector := &v1alpha1.Collector{} - - logger.Info("Reconciling collector") - - if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil { - return ctrl.Result{}, err - } - - originalCollectorStatus := collector.Status.DeepCopy() - - tenantSubscriptionMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName) tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector) - if err != nil { - return ctrl.Result{}, err - } + subscriptions := make(map[v1alpha1.NamespacedName]v1alpha1.Subscription) + outputs := []v1alpha1.OtelOutput{} - tenantsToDisown, err := r.getTenantsReferencingCollectorButNotSelected(ctx, collector, tenants) if err != nil { - return ctrl.Result{}, err - } - - r.disownTenants(ctx, tenantsToDisown) - - tenantNames := []string{} - - if err := r.Status().Update(ctx, collector); err != nil { - return ctrl.Result{}, err + logger.Error(errors.WithStack(err), "failed listing tenants") + return OtelColConfigInput{}, err } - logger.Info("Setting collector status") - - allSubscriptions := []v1alpha1.Subscription{} - for _, tenant := range tenants { - originalTenantStatus 
:= tenant.Status.DeepCopy() - - // check if tenant is owned by us, or make it ours only if orphan - // this update will connect the tenant and collector exclusively - if tenant.Status.Collector != "" && tenant.Status.Collector != collector.Name { - logger.Error(errors.Errorf("tenant (%s) is owned by another collector (%s), skipping reconciliation for this collector (%s)", tenant.Name, tenant.Status.Collector, collector.Name), - "make sure to remove tenant from the previous collector before adopting to new collector") - continue - } - - tenantNames = append(tenantNames, tenant.Name) - subscriptionsForTenant, updateList, err := r.getSubscriptionsForTenant(ctx, &tenant) - if err != nil { - return ctrl.Result{}, err + if tenant.Status.State == v1alpha1.StateFailed { + logger.Info("tenant is in failed state, retrying later", "tenant", tenant.Name) // logr takes a constant message plus key/value pairs, not a printf format + return OtelColConfigInput{}, &TenantFailedError{msg: "tenant failed"} } - // add all newly updated subscriptions here - subscriptionsForTenant = append(subscriptionsForTenant, r.updateSubscriptionsForTenant(ctx, tenant.Name, updateList)...) + subscriptionNames := tenant.Status.Subscriptions + tenantSubscriptionMap[tenant.Name] = subscriptionNames - subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, &tenant, subscriptionsForTenant) - - r.disownSubscriptions(ctx, subscriptionsToDisown) - - allSubscriptions = append(allSubscriptions, subscriptionsForTenant...) 
- - subscriptionNames := getSubscriptionNamesFromSubscription(subscriptionsForTenant) - - tenantSubscriptionMap[tenant.NamespacedName()] = subscriptionNames - - stringSubscriptionNames := []string{} - - for _, name := range subscriptionNames { - stringSubscriptionNames = append(stringSubscriptionNames, name.String()) + for _, subsName := range subscriptionNames { + queriedSubs := &v1alpha1.Subscription{} + if err = r.Client.Get(ctx, types.NamespacedName(subsName), queriedSubs); err != nil { + logger.Error(errors.WithStack(err), "failed getting subscriptions for tenant", "tenant", tenant.Name) + return OtelColConfigInput{}, err + } + subscriptions[subsName] = *queriedSubs } + } - slices.Sort(stringSubscriptionNames) - tenant.Status.Subscriptions = stringSubscriptionNames - - logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, &tenant) - if err != nil { - return ctrl.Result{}, err + for _, subscription := range subscriptions { + outputNames := subscription.Status.Outputs + subscriptionOutputMap[subscription.NamespacedName()] = outputNames + + for _, outputName := range outputNames { + queriedOutput := &v1alpha1.OtelOutput{} + if err = r.Client.Get(ctx, types.NamespacedName(outputName), queriedOutput); err != nil { + logger.Error(errors.WithStack(err), "failed getting outputs for subscription", "subscription", subscription.NamespacedName().String()) + return OtelColConfigInput{}, err + } + outputs = append(outputs, *queriedOutput) } + } - tenant.Status.Collector = collector.Name + otelConfigInput := OtelColConfigInput{ + Tenants: tenants, + Subscriptions: subscriptions, + Outputs: outputs, + TenantSubscriptionMap: tenantSubscriptionMap, + SubscriptionOutputMap: subscriptionOutputMap, + } - slices.Sort(logsourceNamespacesForTenant) - tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant + return otelConfigInput, nil +} - if !reflect.DeepEqual(*originalTenantStatus, tenant.Status) { - logger.Info("updating tenant tenant status") - 
if err := r.Status().Update(ctx, &tenant); err != nil { - return ctrl.Result{}, err - } - } - } +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;oteloutputs;,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;oteloutputs/status;,verbs=get;update;patch +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch +// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=services;persistentvolumeclaims;serviceaccounts;pods,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps,resources=statefulsets;daemonsets;replicasets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete - slices.Sort(tenantNames) +func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx, "collector", req.Name) - collector.Status.Tenants = tenantNames + collector := &v1alpha1.Collector{} - if !reflect.DeepEqual(*originalCollectorStatus, collector.Status) { - logger.Info("updating collector status") - err = r.Status().Update(ctx, collector) - if err != nil { - return ctrl.Result{}, err - } - } + logger.Info("Reconciling collector") - outputs, err := r.getAllOutputs(ctx) - if err != nil { + if err := r.Get(ctx, req.NamespacedName, collector); client.IgnoreNotFound(err) != nil { return ctrl.Result{}, err } - subscriptionOutputMap := map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{} + originalCollectorStatus := 
collector.Status.DeepCopy() - - for _, subscription := range allSubscriptions { - subscriptionOutputMap[subscription.NamespacedName()] = subscription.Spec.Outputs - } + otelConfigInput, err := r.buildConfigInputForCollector(ctx, collector) + if err != nil { + if errors.As(err, new(*TenantFailedError)) { // errors.Is against a fresh pointer compares identity and can never match + return ctrl.Result{RequeueAfter: requeueDelayOnFailedTenant}, nil // nil error so the fixed RequeueAfter delay is honored instead of error backoff - otelConfigInput := OtelColConfigInput{ - Tenants: tenants, - Subscriptions: allSubscriptions, - Outputs: outputs, - TenantSubscriptionMap: tenantSubscriptionMap, - SubscriptionOutputMap: subscriptionOutputMap, + } + return ctrl.Result{}, err } otelConfig, err := otelConfigInput.ToIntermediateRepresentation().ToYAML() @@ -258,31 +208,25 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { + tenantNames := []string{} - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Tenant{}, collectorReferenceField, func(rawObj client.Object) []string { - tenant := rawObj.(*v1alpha1.Tenant) - if tenant.Status.Collector == "" { - return nil - } - return []string{tenant.Status.Collector} - }); err != nil { - return err + for _, tenant := range otelConfigInput.Tenants { + tenantNames = append(tenantNames, tenant.Name) } - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, tenantReferenceField, func(rawObj client.Object) []string { - subscription := rawObj.(*v1alpha1.Subscription) - if subscription.Status.Tenant == "" { - return nil + collector.Status.Tenants = normalizeStringSlice(tenantNames) + + if !reflect.DeepEqual(*originalCollectorStatus, collector.Status) { // dereference: DeepCopy returns a pointer, and pointer vs value is never deeply equal + if err = r.Client.Status().Update(ctx, collector); err != nil { + logger.Error(errors.WithStack(err), "failed updating collector status") } - return 
[]string{subscription.Status.Tenant} - }); err != nil { - return err } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { addCollectorRequest := func(requests []reconcile.Request, collector string) []reconcile.Request { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -291,94 +235,68 @@ func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { }) return requests } - return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Collector{}). Watches(&v1alpha1.Tenant{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { - logger := log.FromContext(ctx) - tenant, _ := object.(*v1alpha1.Tenant) - - collectors := v1alpha1.CollectorList{} - err := r.List(ctx, &collectors) + collectors := &v1alpha1.CollectorList{} + err := r.List(ctx, collectors) if err != nil { - logger.Error(errors.WithStack(err), "failed listing collectors for mapping requests, unable to send requests") - return nil + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return } - CollectorLoop: - for _, collector := range collectors.Items { - tenantsForCollector, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector) - if err != nil { - logger.Error(errors.WithStack(err), "failed listing tenants for collector, notifying collector anyways") - requests = addCollectorRequest(requests, collector.Name) - continue CollectorLoop - } - - for _, t := range tenantsForCollector { - if t.Name == tenant.Name { - requests = addCollectorRequest(requests, collector.Name) - continue CollectorLoop - } - } - - tenantsToDisown, err := r.getTenantsReferencingCollectorButNotSelected(ctx, &collector, tenantsForCollector) - if err != nil { - logger.Error(errors.WithStack(err), "failed listing tenants disowned, notifying 
collector anyways") - requests = addCollectorRequest(requests, collector.Name) - continue CollectorLoop - } - - for _, t := range tenantsToDisown { - if t.Name == tenant.Name { - requests = addCollectorRequest(requests, collector.Name) - continue CollectorLoop - } - } + for _, tenant := range collectors.Items { + requests = addCollectorRequest(requests, tenant.Name) } return })). Watches(&v1alpha1.Subscription{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { - subscription, _ := object.(*v1alpha1.Subscription) + logger := log.FromContext(ctx) + + collectors := &v1alpha1.CollectorList{} + err := r.List(ctx, collectors) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range collectors.Items { + requests = addCollectorRequest(requests, tenant.Name) + } + return + })). + Watches(&v1alpha1.OtelOutput{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + collectors := &v1alpha1.CollectorList{} + err := r.List(ctx, collectors) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range collectors.Items { + requests = addCollectorRequest(requests, tenant.Name) + } + + return + })). 
+ Watches(&apiv1.Namespace{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { logger := log.FromContext(ctx) - tenants := v1alpha1.TenantList{} - err := r.List(ctx, &tenants) + collectors := &v1alpha1.CollectorList{} + err := r.List(ctx, collectors) if err != nil { logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") return } - TenantLoop: - for _, tenant := range tenants.Items { - if tenant.Status.Collector == "" { - logger.Error(errors.WithStack(err), fmt.Sprintf("tenant %s is orphan, skipping it, and its subscriptions when looking for changes", tenant.Name)) - } - subscriptionsForTenant, subscriptionsToUpdate, err := r.getSubscriptionsForTenant(ctx, &tenant) - if err != nil { - logger.Error(errors.WithStack(err), "failed listing subscriptions for collector, notifying collector anyways") - requests = addCollectorRequest(requests, tenant.Status.Collector) - continue TenantLoop - } - - for _, s := range append(subscriptionsForTenant, subscriptionsToUpdate...) 
{ - if s.Name == subscription.Name { - requests = addCollectorRequest(requests, tenant.Status.Collector) - continue TenantLoop - } - } - - subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, &tenant, subscriptionsForTenant) - - for _, s := range subscriptionsToDisown { - if s.Name == subscription.Name { - requests = addCollectorRequest(requests, tenant.Status.Collector) - continue TenantLoop - } - } + for _, tenant := range collectors.Items { + requests = addCollectorRequest(requests, tenant.Name) } return @@ -489,15 +407,6 @@ func (r *CollectorReconciler) reconcileClusterRole(ctx context.Context, collecto return err } -func getSubscriptionNamesFromSubscription(subscriptions []v1alpha1.Subscription) []v1alpha1.NamespacedName { - subscriptionNames := make([]v1alpha1.NamespacedName, len(subscriptions)) - for i, subscription := range subscriptions { - subscriptionNames[i] = subscription.NamespacedName() - } - - return subscriptionNames -} - func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, labelSelector metav1.LabelSelector) ([]v1alpha1.Tenant, error) { selector, err := metav1.LabelSelectorAsSelector(&labelSelector) @@ -517,229 +426,16 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l return tenantsForSelector.Items, nil } -func (r *CollectorReconciler) getTenantsReferencingCollectorButNotSelected(ctx context.Context, collector *v1alpha1.Collector, selectedTenants []v1alpha1.Tenant) ([]v1alpha1.Tenant, error) { - var tenantsReferencing v1alpha1.TenantList - - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(collectorReferenceField, collector.Name), - } - - if err := r.Client.List(ctx, &tenantsReferencing, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - tenantsToDisown := []v1alpha1.Tenant{} - - for _, tenantReferencing := range tenantsReferencing.Items { - selected := false - - for _, selectedTenant := range selectedTenants { 
- if tenantReferencing.Name == selectedTenant.Name { - selected = true - break - } - } - - if !selected { - tenantsToDisown = append(tenantsToDisown, tenantReferencing) - } - - } - - return tenantsToDisown, nil - -} - -func (r *CollectorReconciler) disownTenants(ctx context.Context, tenantsToDisown []v1alpha1.Tenant) { - logger := log.FromContext(ctx) - for _, tenant := range tenantsToDisown { - tenant.Status.Collector = "" - err := r.Client.Status().Update(ctx, &tenant) - if err != nil { - logger.Error(err, fmt.Sprintf("failed to detach tenant %s from collector", tenant.Name)) - } - logger.Info("Disowning tenant", "tenant", tenant.Name) - } -} - -func (r *CollectorReconciler) getSubscriptionsReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, selectedSubscriptions []v1alpha1.Subscription) []v1alpha1.Subscription { - logger := log.FromContext(ctx) - var subscriptionsReferencing v1alpha1.SubscriptionList - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantReferenceField, tenant.Name), - } - - if err := r.Client.List(ctx, &subscriptionsReferencing, listOpts); client.IgnoreNotFound(err) != nil { - logger.Error(err, "failed to list subscriptions that need to be detached from tenant") - return nil - } - - var subscriptionsToDisown []v1alpha1.Subscription - - for _, subscriptionReferencing := range subscriptionsReferencing.Items { - selected := false - - for _, selectedSubscription := range selectedSubscriptions { - if subscriptionReferencing.Name == selectedSubscription.Name { - selected = true - break - } - } - - if !selected { - subscriptionsToDisown = append(subscriptionsToDisown, subscriptionReferencing) - } - - } - - return subscriptionsToDisown - -} - -// disownSubscriptions fails internally by logging errors individually -// this is by design so that we don't fail the whole reconciliation when a single subscription update fails -func (r *CollectorReconciler) disownSubscriptions(ctx context.Context, 
subscriptionsToDisown []v1alpha1.Subscription) { - logger := log.FromContext(ctx) - for _, subscription := range subscriptionsToDisown { - subscription.Status.Tenant = "" - err := r.Client.Status().Update(ctx, &subscription) - if err != nil { - logger.Error(err, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) - } else { - logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) - } - } -} - -func (r *CollectorReconciler) getAllOutputs(ctx context.Context) ([]v1alpha1.OtelOutput, error) { - - var outputList v1alpha1.OtelOutputList - - if err := r.List(ctx, &outputList); client.IgnoreNotFound(err) != nil { - return nil, err - } - - return outputList.Items, nil -} - -// updateSubscriptionsForTenant fails internally and logs failures individually -// this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single subscription -func (r *CollectorReconciler) updateSubscriptionsForTenant(ctx context.Context, tenantName string, subscriptions []v1alpha1.Subscription) (updatedSubscriptions []v1alpha1.Subscription) { - logger := log.FromContext(ctx, "tenant", tenantName) - for _, subscription := range subscriptions { - subscription.Status.Tenant = tenantName - - logger.Info("updating subscription status for tenant ownership") - err := r.Status().Update(ctx, &subscription) - if err != nil { - logger.Error(err, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) - } else { - updatedSubscriptions = append(updatedSubscriptions, subscription) - } - } - return -} - -func (r *CollectorReconciler) getSubscriptionsForTenant(ctx context.Context, tenant *v1alpha1.Tenant) (ownedList []v1alpha1.Subscription, updateList []v1alpha1.Subscription, err error) { - logger := log.FromContext(ctx) - - namespaces, err := r.getNamespacesForSelectorSlice(ctx, 
tenant.Spec.SubscriptionNamespaceSelectors) - - if err != nil { - return nil, nil, err - } - - var selectedSubscriptions []v1alpha1.Subscription - - for _, ns := range namespaces { - var subscriptionsForNS v1alpha1.SubscriptionList - listOpts := &client.ListOptions{ - Namespace: ns.Name, - } - - if err := r.List(ctx, &subscriptionsForNS, listOpts); client.IgnoreNotFound(err) != nil { - return nil, nil, err - } - - selectedSubscriptions = append(selectedSubscriptions, subscriptionsForNS.Items...) - } - - for _, subscription := range selectedSubscriptions { - if subscription.Status.Tenant != "" && subscription.Status.Tenant != tenant.Name { - logger.Error(errors.Errorf("subscription (%s) is owned by another tenant (%s), skipping reconciliation for this tenant (%s)", subscription.Name, subscription.Status.Tenant, tenant.Name), - "make sure to remove subscription from the previous tenant before adopting to new tenant") - continue - } - - if subscription.Status.Tenant == "" { - updateList = append(updateList, subscription) - } else { - ownedList = append(ownedList, subscription) - } - } - - return -} - -func (r *CollectorReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { - - var namespaces []apiv1.Namespace - - for _, ls := range labelSelectors { - var namespacesForSelector apiv1.NamespaceList - - selector, err := metav1.LabelSelectorAsSelector(&ls) - - if err != nil { - return nil, err - } - - listOpts := &client.ListOptions{ - LabelSelector: selector, - } - - if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - namespaces = append(namespaces, namespacesForSelector.Items...) 
- } - - namespaces = normalizeNamespaceSlice(namespaces) - - return namespaces, nil -} - -func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { +func normalizeStringSlice(inputList []string) []string { allKeys := make(map[string]bool) - uniqueList := []apiv1.Namespace{} + uniqueList := []string{} for _, item := range inputList { - if _, value := allKeys[item.Name]; !value { - allKeys[item.Name] = true + if _, value := allKeys[item]; !value { + allKeys[item] = true uniqueList = append(uniqueList, item) } } + slices.Sort(uniqueList) - cmp := func(a, b apiv1.Namespace) int { - return strings.Compare(a.Name, b.Name) - } - - slices.SortFunc(uniqueList, cmp) return uniqueList } - -func (r *CollectorReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { - namespaces, err := r.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) - if err != nil { - return nil, err - } - - namespaceNames := make([]string, len(namespaces)) - - for i, namespace := range namespaces { - namespaceNames[i] = namespace.Name - } - - return namespaceNames, nil - -} diff --git a/internal/controller/telemetry/controller_integration_test.go b/internal/controller/telemetry/controller_integration_test.go index d8b2ab2e..d98b891c 100644 --- a/internal/controller/telemetry/controller_integration_test.go +++ b/internal/controller/telemetry/controller_integration_test.go @@ -133,9 +133,8 @@ var _ = Describe("Telemetry controller integration test", func() { }, }, Status: v1alpha1.TenantStatus{ - Subscriptions: []string{"asd", "bsd"}, + Subscriptions: []v1alpha1.NamespacedName{{Name: "asd", Namespace: "bsd"}}, LogSourceNamespaces: []string{}, - Collector: "asd", }, }, { @@ -159,9 +158,8 @@ var _ = Describe("Telemetry controller integration test", func() { }, }, Status: v1alpha1.TenantStatus{ - Subscriptions: []string{"asd", "bsd"}, + Subscriptions: []v1alpha1.NamespacedName{{Name: "asd", Namespace: 
"bsd"}}, LogSourceNamespaces: []string{}, - Collector: "asd", }, }, } diff --git a/internal/controller/telemetry/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen.go index db6bac69..a2133bf3 100644 --- a/internal/controller/telemetry/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen.go @@ -16,21 +16,20 @@ package telemetry import ( "fmt" - "slices" "strings" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "gopkg.in/yaml.v3" ) -// TODO move this to its appropriate place type OtelColConfigInput struct { + // These must only include resources that are selected by the collector, tenant labelselectors, and listed outputs in the subscriptions Tenants []v1alpha1.Tenant - Subscriptions []v1alpha1.Subscription + Subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription Outputs []v1alpha1.OtelOutput - // Subscriptions map, where the key is the Tenants' namespaced name, value is a slice of subscriptions' namespaced name - TenantSubscriptionMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName + // Subscriptions map, where the key is the Tenants' name, value is a slice of subscriptions' namespaced name + TenantSubscriptionMap map[string][]v1alpha1.NamespacedName SubscriptionOutputMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName } @@ -182,15 +181,7 @@ func (cfgInput *OtelColConfigInput) generateRoutingConnectorForTenantsSubscripti for index, subscriptionRef := range subscriptionNames { - subscriptionIdx := slices.IndexFunc(cfgInput.Subscriptions, func(output v1alpha1.Subscription) bool { - return output.Name == subscriptionRef.Name && output.Namespace == subscriptionRef.Namespace - }) - - if subscriptionIdx == -1 { - continue - } - - subscription := cfgInput.Subscriptions[subscriptionIdx] + subscription := cfgInput.Subscriptions[subscriptionRef] tableItem := buildRoutingTableItemForSubscription(tenantName, subscription, index) rc.AddRoutingConnectorTableElem(tableItem) @@ -206,7 +197,7 @@ func (cfgInput 
*OtelColConfigInput) generateConnectors() map[string]any { connectors[rootRoutingConnector.Name] = rootRoutingConnector for _, tenant := range cfgInput.Tenants { - rc := cfgInput.generateRoutingConnectorForTenantsSubscription(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.NamespacedName()]) + rc := cfgInput.generateRoutingConnectorForTenantsSubscription(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.Name]) connectors[rc.Name] = rc } @@ -271,38 +262,27 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline namedPipelines["logs/all"] = generateRootPipeline() - tenants := []v1alpha1.NamespacedName{} + tenants := []string{} for tenant := range cfgInput.TenantSubscriptionMap { tenants = append(tenants, tenant) } for _, tenant := range tenants { // Generate a pipeline for the tenant - tenantPipelineName := fmt.Sprintf("logs/tenant_%s", tenant.Name) - tenantRoutingName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenant.Name) - namedPipelines[tenantPipelineName] = generatePipeline([]string{"routing/tenants"}, []string{fmt.Sprintf("attributes/tenant_%s", tenant.Name)}, []string{tenantRoutingName}) + tenantPipelineName := fmt.Sprintf("logs/tenant_%s", tenant) + tenantRoutingName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenant) + namedPipelines[tenantPipelineName] = generatePipeline([]string{"routing/tenants"}, []string{fmt.Sprintf("attributes/tenant_%s", tenant)}, []string{tenantRoutingName}) // Generate pipelines for the subscriptions for the tenant for _, subscription := range cfgInput.TenantSubscriptionMap[tenant] { tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s", tenantPipelineName, subscription.Name) - targetOutputNames := []string{} + targetExporterNames := []string{} for _, outputRef := range cfgInput.SubscriptionOutputMap[subscription] { - outputIdx := slices.IndexFunc(cfgInput.Outputs, func(output v1alpha1.OtelOutput) bool { - return output.Name == outputRef.Name && output.Namespace == 
outputRef.Namespace - }) - - if outputIdx == -1 { - continue - } - - targetOutputName := fmt.Sprintf("otlp/%s_%s", outputRef.Namespace, outputRef.Name) - - targetOutputNames = append(targetOutputNames, targetOutputName) - + targetExporterName := fmt.Sprintf("otlp/%s_%s", outputRef.Namespace, outputRef.Name) + targetExporterNames = append(targetExporterNames, targetExporterName) } - namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, targetOutputNames) - + namedPipelines[tenantSubscriptionPipelineName] = generatePipeline([]string{tenantRoutingName}, []string{fmt.Sprintf("attributes/subscription_%s", subscription.Name)}, targetExporterNames) } // Add default (catch all) pipelines diff --git a/internal/controller/telemetry/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen_test.go index 16beca52..9fe54652 100644 --- a/internal/controller/telemetry/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen_test.go @@ -20,10 +20,11 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v2" - "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/siliconbrain/go-mapseqs/mapseqs" + "github.com/siliconbrain/go-seqs/seqs" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -32,8 +33,8 @@ var otelColTargetYaml string func TestOtelColConfComplex(t *testing.T) { // Required inputs - var subscriptions = []v1alpha1.Subscription{ - { + var subscriptions = map[v1alpha1.NamespacedName]v1alpha1.Subscription{ + {Name: "subscription-example-1", Namespace: "example-tenant-ns"}: { ObjectMeta: metav1.ObjectMeta{ Name: "subscription-example-1", Namespace: "example-tenant-ns", @@ -48,7 +49,7 @@ func TestOtelColConfComplex(t *testing.T) { }, }, }, - { + {Name: "subscription-example-2", Namespace: 
"example-tenant-ns"}: { ObjectMeta: metav1.ObjectMeta{ Name: "subscription-example-2", Namespace: "example-tenant-ns", @@ -89,7 +90,6 @@ func TestOtelColConfComplex(t *testing.T) { }, }, }, - Outputs: []v1alpha1.OtelOutput{ { ObjectMeta: metav1.ObjectMeta{ @@ -134,9 +134,10 @@ func TestOtelColConfComplex(t *testing.T) { } inputCfg.SubscriptionOutputMap = subscriptionOutputMap - inputCfg.TenantSubscriptionMap = map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{} + inputCfg.TenantSubscriptionMap = map[string][]v1alpha1.NamespacedName{} tenant := inputCfg.Tenants[0] - inputCfg.TenantSubscriptionMap[tenant.NamespacedName()] = getSubscriptionNamesFromSubscription(inputCfg.Subscriptions) + + inputCfg.TenantSubscriptionMap[tenant.Name] = seqs.ToSlice(mapseqs.KeysOf(inputCfg.Subscriptions)) // IR generatedIR := inputCfg.ToIntermediateRepresentation() @@ -209,8 +210,7 @@ func Test_generateRootRoutingConnector(t *testing.T) { tenants: []v1alpha1.Tenant{ { ObjectMeta: metav1.ObjectMeta{ - Name: "tenantA", - Namespace: "nsA", + Name: "tenantA", }, Status: v1alpha1.TenantStatus{ LogSourceNamespaces: []string{"a", "b"}, @@ -218,8 +218,7 @@ func Test_generateRootRoutingConnector(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "tenantB", - Namespace: "nsB", + Name: "tenantB", }, Status: v1alpha1.TenantStatus{ LogSourceNamespaces: []string{"c", "d"}, @@ -254,9 +253,9 @@ func Test_generateRootRoutingConnector(t *testing.T) { func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *testing.T) { type fields struct { Tenants []v1alpha1.Tenant - Subscriptions []v1alpha1.Subscription + Subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription Outputs []v1alpha1.OtelOutput - TenantSubscriptionMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName + TenantSubscriptionMap map[string][]v1alpha1.NamespacedName SubscriptionOutputMap map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName } type args struct { @@ -272,18 +271,21 @@ func 
TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te { name: "two_subscriptions", fields: fields{ - Tenants: []v1alpha1.Tenant{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "tenantA", - Namespace: "nsA", - }, - Spec: v1alpha1.TenantSpec{}, - Status: v1alpha1.TenantStatus{ - LogSourceNamespaces: []string{"a", "b"}, - }, - }}, - Subscriptions: []v1alpha1.Subscription{ + Tenants: []v1alpha1.Tenant{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tenantA", + }, + Spec: v1alpha1.TenantSpec{}, + Status: v1alpha1.TenantStatus{ + LogSourceNamespaces: []string{"a", "b"}, + }, + }}, + Subscriptions: map[v1alpha1.NamespacedName]v1alpha1.Subscription{ { + Name: "subsA", + Namespace: "nsA", + }: { ObjectMeta: metav1.ObjectMeta{ Name: "subsA", Namespace: "nsA", @@ -296,6 +298,9 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te }, OTTL: `set(attributes["subscription"], "subscriptionA")`}, }, { + Name: "subsB", + Namespace: "nsA", + }: { ObjectMeta: metav1.ObjectMeta{ Name: "subsB", Namespace: "nsA", @@ -309,11 +314,8 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te }, }, Outputs: []v1alpha1.OtelOutput{}, - TenantSubscriptionMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ - { - Namespace: "nsA", - Name: "tenantA", - }: { + TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ + "tenantA": { { Namespace: "nsA", Name: "subsA", diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go new file mode 100644 index 00000000..3560cdc4 --- /dev/null +++ b/internal/controller/telemetry/route_controller.go @@ -0,0 +1,395 @@ +// Copyright © 2023 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "context" + "fmt" + "reflect" + "slices" + "strings" + + "emperror.dev/errors" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" +) + +// CollectorReconciler reconciles a Collector object +type RouteReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;oteloutputs;,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;oteloutputs/status;,verbs=get;update;patch +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch +// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=services;persistentvolumeclaims;serviceaccounts;pods,verbs=get;list;watch;create;update;patch;delete +// 
+kubebuilder:rbac:groups=apps,resources=statefulsets;daemonsets;replicasets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete + +func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + tenant := &v1alpha1.Tenant{} + + logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name)) + + if err := r.Get(ctx, req.NamespacedName, tenant); client.IgnoreNotFound(err) != nil { + logger.Error(errors.New("failed getting tenant, possible API server error"), "failed getting tenant, possible API server error", + "tenant", req.NamespacedName.String()) + return ctrl.Result{}, err + } + + logger.Info(fmt.Sprintf("reconciling tenant: %q", tenant.Name)) + + originalTenantStatus := tenant.Status + + subscriptionsForTenant, updateList, err := r.getSubscriptionsForTenant(ctx, tenant) + if err != nil { + + tenant.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(err), "failed to get subscriptions for tenant", "tenant", tenant.Name) + if updateErr := r.Status().Update(ctx, tenant); err != nil { + logger.Error(errors.WithStack(updateErr), "failed update tenant status", "tenant", tenant.Name) + return ctrl.Result{}, err + } + return ctrl.Result{}, err + } + + // add all newly updated subscriptions here + subscriptionsForTenant = append(subscriptionsForTenant, r.updateSubscriptionsForTenant(ctx, tenant.Name, updateList)...) 
+ + subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, tenant, subscriptionsForTenant) + + r.disownSubscriptions(ctx, subscriptionsToDisown) + + subscriptionNames := getSubscriptionNamesFromSubscription(subscriptionsForTenant) + + cmp := func(a, b v1alpha1.NamespacedName) int { + return strings.Compare(a.String(), b.String()) + } + + slices.SortFunc(subscriptionNames, cmp) + tenant.Status.Subscriptions = subscriptionNames + + for _, subscription := range subscriptionsForTenant { + originalSubscriptionStatus := subscription.Status.DeepCopy() + validOutputs := []v1alpha1.NamespacedName{} + for _, outputRef := range subscription.Spec.Outputs { + checkedOutput := &v1alpha1.OtelOutput{} + if err := r.Client.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil { + logger.Error(err, "referred output invalid", "output", outputRef.String()) + } else { + validOutputs = append(validOutputs, outputRef) + } + + } + subscription.Status.Outputs = validOutputs + + if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { + if updateErr := r.Status().Update(ctx, &subscription); err != nil { + logger.Error(errors.WithStack(updateErr), "failed update subscription status", "subscription", subscription.NamespacedName().String()) + return ctrl.Result{}, err + } + } + } + + logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, tenant) + if err != nil { + tenant.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(err), "failed to get logsource namespaces for tenant", "tenant", tenant.Name) + if updateErr := r.Status().Update(ctx, tenant); err != nil { + logger.Error(errors.WithStack(updateErr), "failed update tenant status", "tenant", tenant.Name) + return ctrl.Result{}, err + } + return ctrl.Result{}, err + } + + slices.Sort(logsourceNamespacesForTenant) + tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant + + tenant.Status.State = v1alpha1.StateReady + + if 
!reflect.DeepEqual(originalTenantStatus, tenant.Status) { + logger.Info("tenant status changed") + if err := r.Status().Update(ctx, tenant); err != nil { + logger.Error(errors.New("failed update tenant status"), "failed update tenant status", "tenant", tenant.Name) + return ctrl.Result{}, err + } + } + + logger.Info("tenant reconciliation complete", "tenant", tenant.Name) + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, tenantReferenceField, func(rawObj client.Object) []string { + subscription := rawObj.(*v1alpha1.Subscription) + if subscription.Status.Tenant == "" { + return nil + } + return []string{subscription.Status.Tenant} + }); err != nil { + return err + } + addTenantRequest := func(requests []reconcile.Request, tenant string) []reconcile.Request { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: tenant, + }, + }) + return requests + } + + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.Tenant{}). + Watches(&v1alpha1.Subscription{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + tenants := &v1alpha1.TenantList{} + err := r.List(ctx, tenants) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range tenants.Items { + requests = addTenantRequest(requests, tenant.Name) + } + + return + })). 
+ Watches(&v1alpha1.OtelOutput{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + tenants := &v1alpha1.TenantList{} + err := r.List(ctx, tenants) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range tenants.Items { + requests = addTenantRequest(requests, tenant.Name) + } + + return + })). + Watches(&apiv1.Namespace{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + tenants := &v1alpha1.TenantList{} + err := r.List(ctx, tenants) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range tenants.Items { + requests = addTenantRequest(requests, tenant.Name) + } + + return + })). + Complete(r) +} + +func (r *RouteReconciler) getSubscriptionsForTenant(ctx context.Context, tenant *v1alpha1.Tenant) (ownedList []v1alpha1.Subscription, updateList []v1alpha1.Subscription, err error) { + logger := log.FromContext(ctx) + + namespaces, err := r.getNamespacesForSelectorSlice(ctx, tenant.Spec.SubscriptionNamespaceSelectors) + + if err != nil { + return nil, nil, err + } + + var selectedSubscriptions []v1alpha1.Subscription + + for _, ns := range namespaces { + var subscriptionsForNS v1alpha1.SubscriptionList + listOpts := &client.ListOptions{ + Namespace: ns.Name, + } + + if err := r.List(ctx, &subscriptionsForNS, listOpts); client.IgnoreNotFound(err) != nil { + return nil, nil, err + } + + selectedSubscriptions = append(selectedSubscriptions, subscriptionsForNS.Items...) 
+ } + + for _, subscription := range selectedSubscriptions { + if subscription.Status.Tenant != "" && subscription.Status.Tenant != tenant.Name { + logger.Error(errors.Errorf("subscription (%s) is owned by another tenant (%s), skipping reconciliation for this tenant (%s)", subscription.Name, subscription.Status.Tenant, tenant.Name), + "make sure to remove subscription from the previous tenant before adopting to new tenant") + continue + } + + if subscription.Status.Tenant == "" { + updateList = append(updateList, subscription) + } else { + ownedList = append(ownedList, subscription) + } + } + + return +} +func (r *RouteReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { + + var namespaces []apiv1.Namespace + + for _, ls := range labelSelectors { + var namespacesForSelector apiv1.NamespaceList + + selector, err := metav1.LabelSelectorAsSelector(&ls) + + if err != nil { + return nil, err + } + + listOpts := &client.ListOptions{ + LabelSelector: selector, + } + + if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + namespaces = append(namespaces, namespacesForSelector.Items...) 
+ } + + namespaces = normalizeNamespaceSlice(namespaces) + + return namespaces, nil +} + +// disownSubscriptions fails internally by logging errors individually +// this is by design so that we don't fail the whole reconciliation when a single subscription update fails +func (r *RouteReconciler) disownSubscriptions(ctx context.Context, subscriptionsToDisown []v1alpha1.Subscription) { + logger := log.FromContext(ctx) + for _, subscription := range subscriptionsToDisown { + subscription.Status.Tenant = "" + err := r.Client.Status().Update(ctx, &subscription) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) + } else { + logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) + } + } +} + +// updateSubscriptionsForTenant fails internally and logs failures individually +// this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single subscription +func (r *RouteReconciler) updateSubscriptionsForTenant(ctx context.Context, tenantName string, subscriptions []v1alpha1.Subscription) (updatedSubscriptions []v1alpha1.Subscription) { + logger := log.FromContext(ctx, "tenant", tenantName) + for _, subscription := range subscriptions { + subscription.Status.Tenant = tenantName + + logger.Info("updating subscription status for tenant ownership") + err := r.Status().Update(ctx, &subscription) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) + } else { + updatedSubscriptions = append(updatedSubscriptions, subscription) + } + } + return +} + +func (r *RouteReconciler) getSubscriptionsReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, selectedSubscriptions []v1alpha1.Subscription) []v1alpha1.Subscription { + logger := log.FromContext(ctx) 
+ var subscriptionsReferencing v1alpha1.SubscriptionList + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(tenantReferenceField, tenant.Name), + } + + if err := r.Client.List(ctx, &subscriptionsReferencing, listOpts); client.IgnoreNotFound(err) != nil { + logger.Error(err, "failed to list subscriptions that need to be detached from tenant") + return nil + } + + var subscriptionsToDisown []v1alpha1.Subscription + + for _, subscriptionReferencing := range subscriptionsReferencing.Items { + + idx := slices.IndexFunc(selectedSubscriptions, func(selected v1alpha1.Subscription) bool { + return reflect.DeepEqual(subscriptionReferencing.NamespacedName(), selected.NamespacedName()) + }) + + if idx == -1 { + subscriptionsToDisown = append(subscriptionsToDisown, subscriptionReferencing) + } + + } + + return subscriptionsToDisown + +} + +func (r *RouteReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { + namespaces, err := r.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) + if err != nil { + return nil, err + } + + namespaceNames := make([]string, len(namespaces)) + + for i, namespace := range namespaces { + namespaceNames[i] = namespace.Name + } + + return namespaceNames, nil + +} + +func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { + allKeys := make(map[string]bool) + uniqueList := []apiv1.Namespace{} + for _, item := range inputList { + if _, value := allKeys[item.Name]; !value { + allKeys[item.Name] = true + uniqueList = append(uniqueList, item) + } + } + + cmp := func(a, b apiv1.Namespace) int { + return strings.Compare(a.Name, b.Name) + } + + slices.SortFunc(uniqueList, cmp) + return uniqueList +} + +func getSubscriptionNamesFromSubscription(subscriptions []v1alpha1.Subscription) []v1alpha1.NamespacedName { + subscriptionNames := make([]v1alpha1.NamespacedName, len(subscriptions)) + for i, subscription := range 
subscriptions { + subscriptionNames[i] = subscription.NamespacedName() + } + + return subscriptionNames +} diff --git a/internal/controller/telemetry/suite_test.go b/internal/controller/telemetry/suite_test.go index 95e78a2f..7d138770 100644 --- a/internal/controller/telemetry/suite_test.go +++ b/internal/controller/telemetry/suite_test.go @@ -106,9 +106,17 @@ var _ = BeforeSuite(func() { Scheme: scheme.Scheme, } + routeReconciler := &RouteReconciler{ + Client: k8sClient, + Scheme: scheme.Scheme, + } + err = collectorReconciler.SetupWithManager(k8sManager) Expect(err).NotTo(HaveOccurred()) + err = routeReconciler.SetupWithManager(k8sManager) + Expect(err).NotTo(HaveOccurred()) + go func() { defer GinkgoRecover() err = k8sManager.Start(ctx)