diff --git a/.github/workflows/artifacts.yaml b/.github/workflows/artifacts.yaml index 49bdbcb8..c72cf713 100644 --- a/.github/workflows/artifacts.yaml +++ b/.github/workflows/artifacts.yaml @@ -103,12 +103,12 @@ jobs: uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825 # v4.1.1 with: context: . - platforms: linux/amd64,linux/arm64,linux/arm/v7 + platforms: linux/amd64,linux/arm64 tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} cache-from: type=gha cache-to: type=gha,mode=max - outputs: ${{ steps.build-output.outputs.value }},name=target,annotation-index.org.opencontainers.image.description=${{ fromJSON(steps.meta.outputs.json).labels['org.opencontainers.image.description'] }} + outputs: ${{ steps.build-output.outputs.value }},name=target # push: ${{ inputs.publish }} - name: Set image ref diff --git a/.golangci.yml b/.golangci.yml index aed8644d..7901da5b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -31,7 +31,6 @@ linters: - ineffassign - lll - misspell - - nakedret - prealloc - staticcheck - typecheck diff --git a/api/telemetry/v1alpha1/collector_types.go b/api/telemetry/v1alpha1/collector_types.go index 933fccee..c64bce34 100644 --- a/api/telemetry/v1alpha1/collector_types.go +++ b/api/telemetry/v1alpha1/collector_types.go @@ -26,7 +26,7 @@ type CollectorSpec struct { // CollectorStatus defines the observed state of Collector type CollectorStatus struct { - Tenants []string `json:"tenants"` + Tenants []string `json:"tenants,omitempty"` } //+kubebuilder:object:root=true @@ -43,7 +43,7 @@ type Collector struct { Status CollectorStatus `json:"status,omitempty"` } -//+kubebuilder:object:root=true +// +kubebuilder:object:root=true // CollectorList contains a list of Collector type CollectorList struct { diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index d60acf5f..d7202ad0 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -18,26 +18,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! -// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. 
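// Editor's note (illustration, not part of the patch): controller-gen only omits a status
// field from a generated CRD's `required` list when the field is optional, e.g. tagged with
// `omitempty` or marked `// +optional`. That is why the tag changes in these API types show
// up again in the CRD YAML hunks further down. A minimal sketch using the type changed above:
//
//	type CollectorStatus struct {
//		Tenants []string `json:"tenants,omitempty"` // optional -> dropped from the CRD's required list
//	}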
- // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { - // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster - // Important: Run "make" to regenerate code after modifying this file - Outputs []NamespacedName `json:"outputs,omitempty"` OTTL string `json:"ottl,omitempty"` } // SubscriptionStatus defines the observed state of Subscription type SubscriptionStatus struct { - // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster - // Important: Run "make" to regenerate code after modifying this file + Tenant string `json:"tenant,omitempty"` } -//+kubebuilder:object:root=true -//+kubebuilder:subresource:status +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +//+kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` // Subscription is the Schema for the subscriptions API type Subscription struct { @@ -48,7 +42,7 @@ type Subscription struct { Status SubscriptionStatus `json:"status,omitempty"` } -//+kubebuilder:object:root=true +// +kubebuilder:object:root=true // SubscriptionList contains a list of Subscription type SubscriptionList struct { diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index 51f339fb..b7015405 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -26,8 +26,9 @@ type TenantSpec struct { // TenantStatus defines the observed state of Tenant type TenantStatus struct { - Subscriptions []string `json:"subscriptions"` - LogSourceNamespaces []string `json:"logSourceNamespaces"` + Subscriptions []string `json:"subscriptions,omitempty"` + LogSourceNamespaces []string `json:"logSourceNamespaces,omitempty"` + Collector string `json:"collector"` } //+kubebuilder:object:root=true @@ -35,6 +36,7 @@ type TenantStatus struct { //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Subscriptions",type=string,JSONPath=`.status.subscriptions` //+kubebuilder:printcolumn:name="Logsource namespaces",type=string,JSONPath=`.status.logSourceNamespaces` +//+kubebuilder:printcolumn:name="Collector",type=string,JSONPath=`.status.collector` // Tenant is the Schema for the tenants API type Tenant struct { @@ -45,7 +47,7 @@ type Tenant struct { Status TenantStatus `json:"status,omitempty"` } -//+kubebuilder:object:root=true +// +kubebuilder:object:root=true // TenantList contains a list of Tenant type TenantList struct { diff --git a/cmd/main.go b/cmd/main.go index 3294704b..c280d560 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,11 +30,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + rbacv1 "k8s.io/api/rbac/v1" + telemetryv1alpha1 "github.com/kube-logging/subscription-operator/api/telemetry/v1alpha1" controller "github.com/kube-logging/subscription-operator/internal/controller/telemetry" - rbacv1 "k8s.io/api/rbac/v1" - //+kubebuilder:scaffold:imports + // +kubebuilder:scaffold:imports otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" ) @@ -51,7 +52,7 @@ func init() { utilruntime.Must(rbacv1.AddToScheme(scheme)) utilruntime.Must(telemetryv1alpha1.AddToScheme(scheme)) - //+kubebuilder:scaffold:scheme + // +kubebuilder:scaffold:scheme } func main() { @@ -77,6 +78,7 @@ func main() { HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "87a80094.kube-logging.dev", + // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily // when the 
Manager ends. This requires the binary to immediately end when the // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly @@ -101,7 +103,7 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Collector") os.Exit(1) } - //+kubebuilder:scaffold:builder + // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") diff --git a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml index 8c6ab426..c537a40a 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_collectors.yaml @@ -102,8 +102,6 @@ spec: items: type: string type: array - required: - - tenants type: object type: object served: true diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index efcbee7c..671fd156 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -14,7 +14,11 @@ spec: singular: subscription scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - jsonPath: .status.tenant + name: Tenant + type: string + name: v1alpha1 schema: openAPIV3Schema: description: Subscription is the Schema for the subscriptions API @@ -56,6 +60,9 @@ spec: type: object status: description: SubscriptionStatus defines the observed state of Subscription + properties: + tenant: + type: string type: object type: object served: true diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index bb4482db..84999a79 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -21,6 +21,9 @@ spec: - jsonPath: .status.logSourceNamespaces name: Logsource namespaces type: string + - jsonPath: .status.collector + name: Collector + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -148,6 +151,8 @@ spec: status: description: TenantStatus defines the observed state of Tenant properties: + collector: + type: string logSourceNamespaces: items: type: string @@ -157,8 +162,7 @@ spec: type: string type: array required: - - logSourceNamespaces - - subscriptions + - collector type: object type: object served: true diff --git a/docs/examples/two_tenants_one_subscription_each.yaml b/docs/examples/two_tenants_one_subscription_each.yaml new file mode 100644 index 00000000..e7c9bf47 --- /dev/null +++ b/docs/examples/two_tenants_one_subscription_each.yaml @@ -0,0 +1,110 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: collector +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + nsSelector: example-tenant-1 + name: example-tenant-ns-1 +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + nsSelector: example-tenant-2 + name: example-tenant-ns-2 +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Collector +metadata: + name: example-collector-1 +spec: + controlNamespace: collector + tenantSelector: + matchLabels: + collectorLabel: example-collector-1 +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Collector +metadata: + name: example-collector-2 +spec: + controlNamespace: collector + tenantSelector: + matchLabels: + collectorLabel: example-collector-2 +--- +apiVersion: 
telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collectorLabel: example-collector-1 + name: example-tenant-1 +spec: + subscriptionNamespaceSelectors: + - matchLabels: + nsSelector: example-tenant-1 + logSourceNamespaceSelectors: + - matchLabels: + nsSelector: example-tenant-1 +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collectorLabel: example-collector-2 + name: example-tenant-2 +spec: + subscriptionNamespaceSelectors: + - matchLabels: + nsSelector: example-tenant-2 + logSourceNamespaceSelectors: + - matchLabels: + nsSelector: example-tenant-2 +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: subscription-sample-1 + namespace: example-tenant-ns-1 +spec: + ottl: 'route()' + outputs: + - name: otlp-test-output-1 + namespace: collector +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: subscription-sample-2 + namespace: example-tenant-ns-2 +spec: + ottl: 'route()' + outputs: + - name: otlp-test-output-2 + namespace: collector +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: OtelOutput +metadata: + name: otlp-test-output-1 + namespace: collector +spec: + otlp: + endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317 + tls: + insecure: true +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: OtelOutput +metadata: + name: otlp-test-output-2 + namespace: collector +spec: + otlp: + endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317 + tls: + insecure: true diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 4173e594..cf4900dd 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -17,6 +17,7 @@ package telemetry import ( "context" "fmt" + "reflect" "strings" "emperror.dev/errors" @@ -24,10 +25,14 @@ import ( apiv1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kube-logging/subscription-operator/api/telemetry/v1alpha1" @@ -41,14 +46,17 @@ type CollectorReconciler struct { Scheme *runtime.Scheme } -//+kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;oteloutputs;,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;oteloutputs/status;,verbs=get;update;patch -//+kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update -//+kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch -//+kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups="",resources=services;persistentvolumeclaims;serviceaccounts;pods,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=apps,resources=statefulsets;daemonsets;replicasets,verbs=get;list;watch;create;update;patch;delete 
-//+kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete +const collectorReferenceField = ".status.collector" +const tenantReferenceField = ".status.tenant" + +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;oteloutputs;,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;oteloutputs/status;,verbs=get;update;patch +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch +// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=services;persistentvolumeclaims;serviceaccounts;pods,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps,resources=statefulsets;daemonsets;replicasets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -70,16 +78,22 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + originalCollectorStatus := collector.Status.DeepCopy() + tenantSubscriptionMap := make(map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName) tenants, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector) if err != nil { return ctrl.Result{}, err } - tenantNames := getTenantNamesFromTenants(tenants) - slices.Sort(tenantNames) + tenantsToDisown, err := r.getTenantsReferencingCollectorButNotSelected(ctx, collector, tenants) + if err != nil { + return ctrl.Result{}, err + } - collector.Status.Tenants = tenantNames + r.disownTenants(ctx, tenantsToDisown) + + tenantNames := []string{} if err := r.Status().Update(ctx, collector); err != nil { return ctrl.Result{}, err @@ -87,17 +101,34 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( logger.Info("Setting collector status") - subscriptions := []v1alpha1.Subscription{} + allSubscriptions := []v1alpha1.Subscription{} for _, tenant := range tenants { + originalTenantStatus := tenant.Status.DeepCopy() + + // check if tenant is owned by us, or make it ours only if orphan + // this update will connect the tenant and collector exclusively + if tenant.Status.Collector != "" && tenant.Status.Collector != collector.Name { + logger.Error(errors.Errorf("tenant (%s) is owned by another collector (%s), skipping reconciliation for this collector (%s)", tenant.Name, tenant.Status.Collector, collector.Name), + "make sure to remove tenant from the previous collector before adopting to new collector") + continue + } - subscriptionsForTenant, err := r.getSubscriptionsForTenant(ctx, &tenant) + tenantNames = append(tenantNames, tenant.Name) + subscriptionsForTenant, updateList, err := r.getSubscriptionsForTenant(ctx, &tenant) if err != nil { return ctrl.Result{}, err } - subscriptions = append(subscriptions, subscriptionsForTenant...) 
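// Editor's sketch (not part of the patch): the ownership rule applied in the loop above,
// written out as a predicate. A tenant may only be reconciled by this collector when it is
// unowned (an orphan) or already owned by this collector:
//
//	canAdopt := tenant.Status.Collector == "" || tenant.Status.Collector == collector.Name
//
// Tenants owned by another collector are skipped and logged; orphans are adopted by setting
// tenant.Status.Collector below.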
+ // add all newly updated subscriptions here + subscriptionsForTenant = append(subscriptionsForTenant, r.updateSubscriptionsForTenant(ctx, tenant.Name, updateList)...) + + subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, &tenant, subscriptionsForTenant) + + r.disownSubscriptions(ctx, subscriptionsToDisown) + + allSubscriptions = append(allSubscriptions, subscriptionsForTenant...) subscriptionNames := getSubscriptionNamesFromSubscription(subscriptionsForTenant) @@ -117,15 +148,29 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + tenant.Status.Collector = collector.Name + slices.Sort(logsourceNamespacesForTenant) tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant - if err := r.Status().Update(ctx, &tenant); err != nil { - return ctrl.Result{}, err + if !reflect.DeepEqual(*originalTenantStatus, tenant.Status) { + logger.Info("updating tenant status") + if err := r.Status().Update(ctx, &tenant); err != nil { + return ctrl.Result{}, err + } } + } - logger.Info("Setting tenant status") + slices.Sort(tenantNames) + collector.Status.Tenants = tenantNames + + if !reflect.DeepEqual(*originalCollectorStatus, collector.Status) { + logger.Info("updating collector status") + err = r.Status().Update(ctx, collector) + if err != nil { + return ctrl.Result{}, err + } } outputs, err := r.getAllOutputs(ctx) @@ -135,13 +180,13 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( subscriptionOutputMap := map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{} - for _, subscription := range subscriptions { + for _, subscription := range allSubscriptions { subscriptionOutputMap[subscription.NamespacedName()] = subscription.Spec.Outputs } otelConfigInput := OtelColConfigInput{ Tenants: tenants, - Subscriptions: subscriptions, + Subscriptions: allSubscriptions, Outputs: outputs, TenantSubscriptionMap: tenantSubscriptionMap, SubscriptionOutputMap: subscriptionOutputMap, @@ -217,8 +262,126 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // SetupWithManager sets up the controller with the Manager. func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { + + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Tenant{}, collectorReferenceField, func(rawObj client.Object) []string { + tenant := rawObj.(*v1alpha1.Tenant) + if tenant.Status.Collector == "" { + return nil + } + return []string{tenant.Status.Collector} + }); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, tenantReferenceField, func(rawObj client.Object) []string { + subscription := rawObj.(*v1alpha1.Subscription) + if subscription.Status.Tenant == "" { + return nil + } + return []string{subscription.Status.Tenant} + }); err != nil { + return err + } + addCollectorRequest := func(requests []reconcile.Request, collector string) []reconcile.Request { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: collector, + }, + }) + return requests + } + return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Collector{}).
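// Editor's sketch (illustration, not part of the patch): the two IndexField calls above let
// the reconciler resolve ownership with a field selector instead of scanning every object.
// For example, listing the tenants that currently claim a given collector (the same pattern
// getTenantsReferencingCollectorButNotSelected uses further down):
//
//	var owned v1alpha1.TenantList
//	_ = r.List(ctx, &owned, &client.ListOptions{
//		FieldSelector: fields.OneTermEqualSelector(collectorReferenceField, "example-collector-1"),
//	})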
+ Watches(&v1alpha1.Tenant{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + + logger := log.FromContext(ctx) + + tenant, _ := object.(*v1alpha1.Tenant) + + collectors := v1alpha1.CollectorList{} + err := r.List(ctx, &collectors) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing collectors for mapping requests, unable to send requests") + return nil + } + + CollectorLoop: + for _, collector := range collectors.Items { + tenantsForCollector, err := r.getTenantsMatchingSelectors(ctx, collector.Spec.TenantSelector) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for collector, notifying collector anyway") + requests = addCollectorRequest(requests, collector.Name) + continue CollectorLoop + } + + for _, t := range tenantsForCollector { + if t.Name == tenant.Name { + requests = addCollectorRequest(requests, collector.Name) + continue CollectorLoop + } + } + + tenantsToDisown, err := r.getTenantsReferencingCollectorButNotSelected(ctx, &collector, tenantsForCollector) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants to disown, notifying collector anyway") + requests = addCollectorRequest(requests, collector.Name) + continue CollectorLoop + } + + for _, t := range tenantsToDisown { + if t.Name == tenant.Name { + requests = addCollectorRequest(requests, collector.Name) + continue CollectorLoop + } + } + } + + return + })). + Watches(&v1alpha1.Subscription{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + subscription, _ := object.(*v1alpha1.Subscription) + + logger := log.FromContext(ctx) + + tenants := v1alpha1.TenantList{} + err := r.List(ctx, &tenants) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + TenantLoop: + for _, tenant := range tenants.Items { + if tenant.Status.Collector == "" { + logger.Info(fmt.Sprintf("tenant %s is an orphan, skipping it and its subscriptions when looking for changes", tenant.Name)) + continue TenantLoop + } + subscriptionsForTenant, subscriptionsToUpdate, err := r.getSubscriptionsForTenant(ctx, &tenant) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing subscriptions for tenant, notifying collector anyway") + requests = addCollectorRequest(requests, tenant.Status.Collector) + continue TenantLoop + } + + for _, s := range append(subscriptionsForTenant, subscriptionsToUpdate...) { + if s.Name == subscription.Name { + requests = addCollectorRequest(requests, tenant.Status.Collector) + continue TenantLoop + } + } + + subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, &tenant, subscriptionsForTenant) + + for _, s := range subscriptionsToDisown { + if s.Name == subscription.Name { + requests = addCollectorRequest(requests, tenant.Status.Collector) + continue TenantLoop + } + } + } + + return + })).
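// Editor's sketch (an alternative, not what the patch implements): for tenants that already
// carry an owner reference in status, the map function above could enqueue the owning
// collector directly and keep the selector walk only for orphans:
//
//	handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
//		if t, ok := o.(*v1alpha1.Tenant); ok && t.Status.Collector != "" {
//			return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: t.Status.Collector}}}
//		}
//		return nil // orphans would still need the selector-based fan-out used above
//	})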
Complete(r) } @@ -261,6 +424,7 @@ func (r *CollectorReconciler) reconcileServiceAccount(ctx context.Context, colle return v1alpha1.NamespacedName{Namespace: serviceAccount.Namespace, Name: serviceAccount.Name}, nil } + func (r *CollectorReconciler) reconcileClusterRoleBinding(ctx context.Context, collector *v1alpha1.Collector) error { logger := log.FromContext(ctx) @@ -323,15 +487,6 @@ func (r *CollectorReconciler) reconcileClusterRole(ctx context.Context, collecto return err } -func getTenantNamesFromTenants(tenants []v1alpha1.Tenant) []string { - tenantNames := make([]string, len(tenants)) - for i, tenant := range tenants { - tenantNames[i] = tenant.Name - } - - return tenantNames -} - func getSubscriptionNamesFromSubscription(subscriptions []v1alpha1.Subscription) []v1alpha1.NamespacedName { subscriptionNames := make([]v1alpha1.NamespacedName, len(subscriptions)) for i, subscription := range subscriptions { @@ -360,6 +515,100 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l return tenantsForSelector.Items, nil } +func (r *CollectorReconciler) getTenantsReferencingCollectorButNotSelected(ctx context.Context, collector *v1alpha1.Collector, selectedTenants []v1alpha1.Tenant) ([]v1alpha1.Tenant, error) { + var tenantsReferencing v1alpha1.TenantList + + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(collectorReferenceField, collector.Name), + } + + if err := r.Client.List(ctx, &tenantsReferencing, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + tenantsToDisown := []v1alpha1.Tenant{} + + for _, tenantReferencing := range tenantsReferencing.Items { + selected := false + + for _, selectedTenant := range selectedTenants { + if tenantReferencing.Name == selectedTenant.Name { + selected = true + break + } + } + + if !selected { + tenantsToDisown = append(tenantsToDisown, tenantReferencing) + } + + } + + return tenantsToDisown, nil + +} + +func (r *CollectorReconciler) disownTenants(ctx context.Context, tenantsToDisown []v1alpha1.Tenant) { + logger := log.FromContext(ctx) + for _, tenant := range tenantsToDisown { + tenant.Status.Collector = "" + err := r.Client.Status().Update(ctx, &tenant) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to detach tenant %s from collector", tenant.Name)) + } + logger.Info("Disowning tenant", "tenant", tenant.Name) + } +} + +func (r *CollectorReconciler) getSubscriptionsReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, selectedSubscriptions []v1alpha1.Subscription) []v1alpha1.Subscription { + logger := log.FromContext(ctx) + var subscriptionsReferencing v1alpha1.SubscriptionList + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(tenantReferenceField, tenant.Name), + } + + if err := r.Client.List(ctx, &subscriptionsReferencing, listOpts); client.IgnoreNotFound(err) != nil { + logger.Error(err, "failed to list subscriptions that need to be detached from tenant") + return nil + } + + var subscriptionsToDisown []v1alpha1.Subscription + + for _, subscriptionReferencing := range subscriptionsReferencing.Items { + selected := false + + for _, selectedSubscription := range selectedSubscriptions { + if subscriptionReferencing.Name == selectedSubscription.Name { + selected = true + break + } + } + + if !selected { + subscriptionsToDisown = append(subscriptionsToDisown, subscriptionReferencing) + } + + } + + return subscriptionsToDisown + +} + +// disownSubscriptions fails internally by logging errors individually +// 
this is by design so that we don't fail the whole reconciliation when a single subscription update fails +func (r *CollectorReconciler) disownSubscriptions(ctx context.Context, subscriptionsToDisown []v1alpha1.Subscription) { + logger := log.FromContext(ctx) + for _, subscription := range subscriptionsToDisown { + subscription.Status.Tenant = "" + err := r.Client.Status().Update(ctx, &subscription) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) + } else { + logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) + } + } +} + func (r *CollectorReconciler) getAllOutputs(ctx context.Context) ([]v1alpha1.OtelOutput, error) { var outputList v1alpha1.OtelOutputList @@ -371,32 +620,63 @@ func (r *CollectorReconciler) getAllOutputs(ctx context.Context) ([]v1alpha1.Ote return outputList.Items, nil } -func (r *CollectorReconciler) getSubscriptionsForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]v1alpha1.Subscription, error) { +// updateSubscriptionsForTenant fails internally and logs failures individually +// this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single subscription +func (r *CollectorReconciler) updateSubscriptionsForTenant(ctx context.Context, tenantName string, subscriptions []v1alpha1.Subscription) (updatedSubscriptions []v1alpha1.Subscription) { + logger := log.FromContext(ctx, "tenant", tenantName) + for _, subscription := range subscriptions { + subscription.Status.Tenant = tenantName - namespaces, err := r.getNamespacesForSelectorSlice(ctx, tentant.Spec.SubscriptionNamespaceSelectors) + logger.Info("updating subscription status for tenant ownership") + err := r.Status().Update(ctx, &subscription) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) + } else { + updatedSubscriptions = append(updatedSubscriptions, subscription) + } + } + return +} + +func (r *CollectorReconciler) getSubscriptionsForTenant(ctx context.Context, tenant *v1alpha1.Tenant) (ownedList []v1alpha1.Subscription, updateList []v1alpha1.Subscription, err error) { + logger := log.FromContext(ctx) + + namespaces, err := r.getNamespacesForSelectorSlice(ctx, tenant.Spec.SubscriptionNamespaceSelectors) if err != nil { - return nil, err + return nil, nil, err } - var subscriptions []v1alpha1.Subscription + var selectedSubscriptions []v1alpha1.Subscription for _, ns := range namespaces { - var subscriptionsForNS v1alpha1.SubscriptionList listOpts := &client.ListOptions{ Namespace: ns.Name, } if err := r.List(ctx, &subscriptionsForNS, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err + return nil, nil, err } - subscriptions = append(subscriptions, subscriptionsForNS.Items...) + selectedSubscriptions = append(selectedSubscriptions, subscriptionsForNS.Items...) 
+ } + + for _, subscription := range selectedSubscriptions { + if subscription.Status.Tenant != "" && subscription.Status.Tenant != tenant.Name { + logger.Error(errors.Errorf("subscription (%s) is owned by another tenant (%s), skipping reconciliation for this tenant (%s)", subscription.Name, subscription.Status.Tenant, tenant.Name), + "make sure to remove subscription from the previous tenant before adopting to new tenant") + continue + } + if subscription.Status.Tenant == "" { + updateList = append(updateList, subscription) + } else { + ownedList = append(ownedList, subscription) + } } - return subscriptions, nil + return } func (r *CollectorReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { @@ -423,7 +703,7 @@ func (r *CollectorReconciler) getNamespacesForSelectorSlice(ctx context.Context, namespaces = append(namespaces, namespacesForSelector.Items...) } - normalizeNamespaceSlice(namespaces) + namespaces = normalizeNamespaceSlice(namespaces) return namespaces, nil } @@ -432,7 +712,7 @@ func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { allKeys := make(map[string]bool) uniqueList := []apiv1.Namespace{} for _, item := range inputList { - if allKeys[item.Name] { + if _, value := allKeys[item.Name]; !value { allKeys[item.Name] = true uniqueList = append(uniqueList, item) } @@ -447,7 +727,6 @@ func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { } func (r *CollectorReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { - namespaces, err := r.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) if err != nil { return nil, err
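Two easy-to-miss fixes close out this hunk: normalizeNamespaceSlice builds and returns a new slice, so its result now has to be assigned back (namespaces = normalizeNamespaceSlice(namespaces)), and the membership check inside it was inverted, so namespace de-duplication previously never took effect. A standalone sketch of the corrected de-duplication pattern (the helper name is illustrative, not part of the patch):

func dedupeNamespacesByName(in []apiv1.Namespace) []apiv1.Namespace {
	seen := make(map[string]bool, len(in))
	out := make([]apiv1.Namespace, 0, len(in))
	for _, ns := range in {
		// keep only the first occurrence of each namespace name
		if !seen[ns.Name] {
			seen[ns.Name] = true
			out = append(out, ns)
		}
	}
	return out
}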