Skip to content

Commit

Permalink
[transporturl] watch for named rabbit cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
stuggi committed Dec 4, 2023
1 parent c677ca3 commit d112a58
Showing 1 changed file with 92 additions and 29 deletions.
121 changes: 92 additions & 29 deletions controllers/rabbitmq/transporturl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

rabbitmqv1beta1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1"
rabbitmqv1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1"
condition "github.com/openstack-k8s-operators/lib-common/modules/common/condition"
helper "github.com/openstack-k8s-operators/lib-common/modules/common/helper"
oko_secret "github.com/openstack-k8s-operators/lib-common/modules/common/secret"
rabbitmqv1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
rabbitmqclusterv1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -82,7 +88,7 @@ func (r *TransportURLReconciler) GetLogger(ctx context.Context) logr.Logger {
func (r *TransportURLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
Log := r.GetLogger(ctx)
// Fetch the TransportURL instance
instance := &rabbitmqv1beta1.TransportURL{}
instance := &rabbitmqv1.TransportURL{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
if err != nil {
if k8s_errors.IsNotFound(err) {
Expand All @@ -101,7 +107,7 @@ func (r *TransportURLReconciler) Reconcile(ctx context.Context, req ctrl.Request
if instance.Status.Conditions == nil {
instance.Status.Conditions = condition.Conditions{}

cl := condition.CreateList(condition.UnknownCondition(rabbitmqv1beta1.TransportURLReadyCondition, condition.InitReason, rabbitmqv1beta1.TransportURLReadyInitMessage))
cl := condition.CreateList(condition.UnknownCondition(rabbitmqv1.TransportURLReadyCondition, condition.InitReason, rabbitmqv1.TransportURLReadyInitMessage))

instance.Status.Conditions.Init(&cl)

Expand Down Expand Up @@ -147,7 +153,7 @@ func (r *TransportURLReconciler) Reconcile(ctx context.Context, req ctrl.Request

}

func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *rabbitmqv1beta1.TransportURL, helper *helper.Helper) (ctrl.Result, error) {
func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *rabbitmqv1.TransportURL, helper *helper.Helper) (ctrl.Result, error) {
Log := r.GetLogger(ctx)
Log.Info("Reconciling Service")

Expand All @@ -167,10 +173,10 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
}
if !rabbitReady {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1beta1.TransportURLInProgressMessage))
rabbitmqv1.TransportURLInProgressMessage))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}

Expand All @@ -180,17 +186,17 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
if err != nil {
if k8s_errors.IsNotFound(err) {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1beta1.TransportURLInProgressMessage))
rabbitmqv1.TransportURLInProgressMessage))
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, nil
}
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
Expand All @@ -201,10 +207,10 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
} else {
err := fmt.Errorf("username does not exist in rabbitmq secret %s", rabbitSecret.Name)
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
Expand All @@ -215,10 +221,10 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
} else {
err := fmt.Errorf("password does not exist in rabbitmq secret %s", rabbitSecret.Name)
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
Expand All @@ -229,10 +235,10 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
} else {
err := fmt.Errorf("host does not exist in rabbitmq secret %s", rabbitSecret.Name)
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
Expand All @@ -243,10 +249,10 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
} else {
err := fmt.Errorf("port does not exist in rabbitmq secret %s", rabbitSecret.Name)
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
Expand All @@ -262,34 +268,34 @@ func (r *TransportURLReconciler) reconcileNormal(ctx context.Context, instance *
_, op, err := oko_secret.CreateOrPatchSecret(ctx, helper, instance, secret)
if err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
rabbitmqv1beta1.TransportURLReadyErrorMessage,
rabbitmqv1.TransportURLReadyErrorMessage,
err.Error()))
return ctrl.Result{}, err
}
if op != controllerutil.OperationResultNone {
instance.Status.Conditions.Set(condition.FalseCondition(
rabbitmqv1beta1.TransportURLReadyCondition,
rabbitmqv1.TransportURLReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
rabbitmqv1beta1.TransportURLReadyInitMessage))
rabbitmqv1.TransportURLReadyInitMessage))
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// Update the CR and return
instance.Status.SecretName = secret.Name

instance.Status.Conditions.MarkTrue(rabbitmqv1beta1.TransportURLReadyCondition, rabbitmqv1beta1.TransportURLReadyMessage)
instance.Status.Conditions.MarkTrue(rabbitmqv1.TransportURLReadyCondition, rabbitmqv1.TransportURLReadyMessage)

Log.Info("Reconciled Service successfully")
return ctrl.Result{}, nil
}

// Create k8s secret with transport URL
func (r *TransportURLReconciler) createTransportURLSecret(
instance *rabbitmqv1beta1.TransportURL,
instance *rabbitmqv1.TransportURL,
username string,
password string,
host string,
Expand All @@ -315,21 +321,78 @@ func (r *TransportURLReconciler) createTransportURLSecret(
}
}

// fields to index to reconcile when change
const (
rabbitmqClusterNameField = ".spec.rabbitmqClusterName"
)

var (
allWatchFields = []string{
rabbitmqClusterNameField,
}
)

// SetupWithManager sets up the controller with the Manager.
func (r *TransportURLReconciler) SetupWithManager(mgr ctrl.Manager) error {
// index caSecretName
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &rabbitmqv1.TransportURL{}, rabbitmqClusterNameField, func(rawObj client.Object) []string {
// Extract the secret name from the spec, if one is provided
cr := rawObj.(*rabbitmqv1.TransportURL)
if cr.Spec.RabbitmqClusterName == "" {
return nil
}
return []string{cr.Spec.RabbitmqClusterName}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&rabbitmqv1beta1.TransportURL{}).
For(&rabbitmqv1.TransportURL{}).
Owns(&corev1.Secret{}).
Watches(
&source.Kind{Type: &rabbitmqclusterv1.RabbitmqCluster{}},
handler.EnqueueRequestsFromMapFunc(r.findObjectsForSrc),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
Complete(r)
}

func (r *TransportURLReconciler) findObjectsForSrc(src client.Object) []reconcile.Request {
requests := []reconcile.Request{}

for _, field := range allWatchFields {
crList := &rabbitmqv1.TransportURLList{}
listOps := &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(field, src.GetName()),
Namespace: src.GetNamespace(),
}
err := r.List(context.TODO(), crList, listOps)
if err != nil {
return []reconcile.Request{}
}

for _, item := range crList.Items {
requests = append(requests,
reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
},
)
}
}

return requests
}

// GetRabbitmqCluster - get RabbitmqCluster object in namespace
func getRabbitmqCluster(
ctx context.Context,
h *helper.Helper,
instance *rabbitmqv1beta1.TransportURL,
) (*rabbitmqv1.RabbitmqCluster, error) {
rabbitMqCluster := &rabbitmqv1.RabbitmqCluster{}
instance *rabbitmqv1.TransportURL,
) (*rabbitmqclusterv1.RabbitmqCluster, error) {
rabbitMqCluster := &rabbitmqclusterv1.RabbitmqCluster{}

err := h.GetClient().Get(ctx, types.NamespacedName{Name: instance.Spec.RabbitmqClusterName, Namespace: instance.Namespace}, rabbitMqCluster)

Expand Down

0 comments on commit d112a58

Please sign in to comment.