Skip to content

Commit

Permalink
fix(status): fetch volume status using controller podIP (#112) (#113)
Browse files Browse the repository at this point in the history
Signed-off-by: shubham <shubham.bajpai@mayadata.io>
  • Loading branch information
shubham14bajpai authored Jul 13, 2021
1 parent 3f7d491 commit 43c61c0
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 35 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ jobs:

- name: Running tests
run: |
kubectl apply -f https://raw.githubusercontent.com/openebs/openebs/master/k8s/openebs-operator.yaml
kubectl apply -f https://openebs.github.io/charts/hostpath-operator.yaml
kubectl apply -f deploy/hostpath-sc.yaml
kubectl apply -f deploy/operator.yaml
kubectl apply -f deploy/jiva-csi.yaml
./ci/ci.sh
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ jobs:

- name: Running tests
run: |
kubectl apply -f https://raw.githubusercontent.com/openebs/openebs/master/k8s/openebs-operator.yaml
kubectl apply -f https://openebs.github.io/charts/hostpath-operator.yaml
kubectl apply -f deploy/hostpath-sc.yaml
kubectl apply -f deploy/operator.yaml
kubectl apply -f deploy/jiva-csi.yaml
./ci/ci.sh
Expand Down
2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
os.Exit(1)
}

duration := 5 * time.Second
duration := 30 * time.Second

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Expand Down
24 changes: 24 additions & 0 deletions deploy/hostpath-sc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#Sample storage classes for OpenEBS Local PV
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: openebs-hostpath
annotations:
openebs.io/cas-type: local
cas.openebs.io/config: |
# hostpath type will create a PV by
# creating a sub-directory under the
# BASEPATH provided below.
- name: StorageType
value: "hostpath"
# Specify the location (directory) where
# where PV(volume) data will be saved.
# A sub-directory with pv-name will be
# created. When the volume is deleted,
# the PV sub-directory will be deleted.
#Default value is /var/openebs/local
- name: BasePath
value: "/var/openebs/local/"
provisioner: openebs.io/local
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Delete
150 changes: 118 additions & 32 deletions pkg/controllers/jivavolume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -72,6 +73,7 @@ type upgradeFunc func(u *upgradeParams) (*openebsiov1alpha1.JivaVolume, error)

var (
upgradeMap = map[string]upgradeFunc{}
podIPMap = map[string]string{}
)

const (
Expand All @@ -91,7 +93,7 @@ var (
createReplicaPodDisruptionBudget,
}

updateErrMsg = "Failed to update JivaVolume with service info"
updateErrMsg = "failed to update JivaVolume with service info"

defaultServiceAccountName = os.Getenv("OPENEBS_SERVICEACCOUNT_NAME")
)
Expand Down Expand Up @@ -135,6 +137,16 @@ func (r *JivaVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, err
}

if _, ok := podIPMap[instance.Name]; !ok {
err = r.updatePodIPMap(instance)
if err != nil {
// log err only, as controller must be in container creating state
// don't return err as it will dump an unnecessary stack trace
logrus.Infof("not able to get controller pod ip for volume %s: %s", instance.Name, err.Error())
time.Sleep(1 * time.Second)
}
}

// initially Phase will be "", so it will skip switch case
// Once it has started bootstrapping it will set the Phase to Pending/Failed
// depending upon the error. If bootstrap is successful it will set the Phase
Expand All @@ -156,14 +168,15 @@ func (r *JivaVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, fmt.Errorf("failed to scaleup volume %s: %s",
instance.Name, err.Error())
}
return reconcile.Result{}, r.getAndUpdateVolumeStatus(instance)
}
if err := r.moveReplicasForMissingNodes(instance); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning,
"ReplicaMovement", "failed to move replica, due to error: %v", err)
return reconcile.Result{}, fmt.Errorf("failed to move replica %s: %s",
instance.Name, err.Error())
}
return reconcile.Result{}, r.getAndUpdateVolumeStatus(instance)
return reconcile.Result{}, nil
case openebsiov1alpha1.JivaVolumePhaseSyncing, openebsiov1alpha1.JivaVolumePhaseUnkown:
return reconcile.Result{}, r.getAndUpdateVolumeStatus(instance)
case openebsiov1alpha1.JivaVolumePhaseDeleting:
Expand All @@ -179,6 +192,55 @@ func (r *JivaVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, nil
}

// updatePodIPMap looks up the jiva controller pod that belongs to the given
// JivaVolume and caches its pod IP in podIPMap, keyed by the volume name.
//
// Pods are listed in the volume's namespace with the jiva-controller component
// label plus the persistent-volume label set to the volume name, restricted to
// the Running phase through a "status.phase" field selector. A pod only counts
// when it already has a pod IP and when its node can be fetched and reports a
// Ready condition.
//
// An error is returned when the label selector cannot be parsed, when the list
// call fails, or when the number of qualifying pods is not exactly one. In all
// of those cases podIPMap is left untouched.
func (r *JivaVolumeReconciler) updatePodIPMap(cr *openebsiov1alpha1.JivaVolume) error {
	const controllerLabel = "openebs.io/component=jiva-controller,openebs.io/persistent-volume="

	// Surface a malformed selector instead of listing with a nil selector.
	labelSelector, err := labels.Parse(controllerLabel + cr.Name)
	if err != nil {
		return fmt.Errorf("failed to build controller pod selector for volume %s: %w", cr.Name, err)
	}

	pods := corev1.PodList{}
	err = r.List(context.TODO(), &pods, &client.ListOptions{
		Namespace:     cr.Namespace,
		LabelSelector: labelSelector,
		FieldSelector: fields.SelectorFromSet(fields.Set{"status.phase": "Running"}),
	})
	if err != nil {
		return err
	}

	var runningPodIPs []string

	for i := range pods.Items {
		pod := &pods.Items[i]
		if pod.Status.PodIP == "" {
			// An empty IP would later produce the unusable address ":9501".
			continue
		}

		// A pod may still be reported as Running while its node is gone or
		// NotReady; only trust pods whose node is reachable and Ready.
		node := &corev1.Node{}
		getErr := r.Get(context.TODO(), types.NamespacedName{
			Name: pod.Spec.NodeName,
		}, node)
		if getErr == nil && isNodeReady(node) {
			runningPodIPs = append(runningPodIPs, pod.Status.PodIP)
		}
	}

	if len(runningPodIPs) != 1 {
		// Report the number of pods that actually qualified, not the raw list
		// size, so the message matches the condition being checked.
		return fmt.Errorf("expected 1 running controller pod on a ready node, got %d (out of %d running pods)",
			len(runningPodIPs), len(pods.Items))
	}
	podIPMap[cr.Name] = runningPodIPs[0]

	return nil
}

// isNodeReady reports whether the node carries a NodeReady condition whose
// status is ConditionTrue. A node with no such condition is treated as not
// ready.
func isNodeReady(node *corev1.Node) bool {
	conditions := node.Status.Conditions
	for i := range conditions {
		// Skip every condition type other than NodeReady.
		if conditions[i].Type != corev1.NodeReady {
			continue
		}
		if conditions[i].Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

func (r *JivaVolumeReconciler) isScaleup(cr *openebsiov1alpha1.JivaVolume) bool {
if cr.Spec.DesiredReplicationFactor > cr.Spec.Policy.Target.ReplicationFactor {
if cr.Spec.Policy.Target.ReplicationFactor != cr.Status.ReplicaCount {
Expand Down Expand Up @@ -272,38 +334,44 @@ func (r *JivaVolumeReconciler) moveReplicasForMissingNodes(cr *openebsiov1alpha1
err = r.Delete(context.TODO(), &pod)
// wait for pod to get deleted and
// recreated
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
if err != nil && !errors.IsNotFound(err) {
return err
}
continue
}
return err
}

err = r.Get(context.TODO(), types.NamespacedName{
Name: pvc.GetAnnotations()[nodeAnnotation],
}, &corev1.Node{})
if err != nil {
if errors.IsNotFound(err) {
err = r.removeSTSVolume(pvc)
if err != nil {
return err
}
err = r.Delete(context.TODO(), &pod)
if err != nil {
nodeName := pvc.GetAnnotations()[nodeAnnotation]
// if a pvc and its pod were deleted, then in the next iteration the nodeName
// will be empty, which would end up in a not-found error.
// This can result in a race between the pvc getting bound and the operator
// deleting the pending pvc, so perform these steps only if nodeName is present
if nodeName != "" {
err = r.Get(context.TODO(), types.NamespacedName{
Name: nodeName,
}, &corev1.Node{})
if err != nil {
if errors.IsNotFound(err) {
err = r.removeSTSVolume(pvc)
if err != nil {
return err
}
err = r.Delete(context.TODO(), &pod)
if err != nil {
return err
}
// wait for pod to get deleted and
// recreated
time.Sleep(10 * time.Second)
r.Recorder.Eventf(cr, corev1.EventTypeWarning,
"ReplicaMovement",
"replica %s and it's corresponding PVC & PV deleted",
pod.Name,
)
} else {
return err
}
// wait for pod to get deleted and
// recreated
time.Sleep(5 * time.Second)
r.Recorder.Eventf(cr, corev1.EventTypeWarning,
"ReplicaMovement",
"replica %s and it's corresponding PVC & PV deleted",
pod.Name,
)
} else {
return err
}
}
}
Expand Down Expand Up @@ -353,6 +421,12 @@ deletepvc:

// SetupWithManager sets up the controller with the Manager.
func (r *JivaVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "status.phase", func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{string(pod.Status.Phase)}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&openebsiov1alpha1.JivaVolume{}).
Owns(&appsv1.Deployment{}).
Expand Down Expand Up @@ -1158,8 +1232,8 @@ func setdefaults(cr *openebsiov1alpha1.JivaVolume) {
}
}

func (r *JivaVolumeReconciler) updateStatus(err error, cr *openebsiov1alpha1.JivaVolume) {
if err != nil {
func (r *JivaVolumeReconciler) updateStatus(err *error, cr *openebsiov1alpha1.JivaVolume) {
if *err != nil {
setdefaults(cr)
}
if err := r.updateJivaVolume(cr); err != nil {
Expand All @@ -1170,27 +1244,39 @@ func (r *JivaVolumeReconciler) updateStatus(err error, cr *openebsiov1alpha1.Jiv
}
}

func (r *JivaVolumeReconciler) getAndUpdateVolumeStatus(cr *openebsiov1alpha1.JivaVolume) (err error) {
func (r *JivaVolumeReconciler) getAndUpdateVolumeStatus(cr *openebsiov1alpha1.JivaVolume) error {
var (
cli *jiva.ControllerClient
err error
)

defer r.updateStatus(&err, cr)

if err = r.getJivaVolume(cr); err != nil {
return fmt.Errorf("Failed to getAndUpdateVolumeStatus, err: %v", err)
return fmt.Errorf("failed to getAndUpdateVolumeStatus, err: %v", err)
}

defer r.updateStatus(err, cr)
addr := cr.Spec.ISCSISpec.TargetIP + ":9501"
if podIP, ok := podIPMap[cr.Name]; ok {
addr = podIP + ":9501"
}

if len(addr) == 0 {
return fmt.Errorf("Failed to get volume stats: target address is empty")
return fmt.Errorf("failed to get volume stats: target address is empty")
}

cli = jiva.NewControllerClient(addr)
stats := &volume.Stats{}
err = cli.Get("/stats", stats)
if err != nil {
// log err only, as controller must be in container creating state
// don't return err as it will dump an unnecessary stack trace
logrus.Info("Failed to get volume stats ", "err", err)
logrus.Info("failed to get volume stats ", "err", err)
err = r.updatePodIPMap(cr)
if err != nil {
logrus.Infof("failed to get controller pod ip for volume %s: %s", cr.Name, err.Error())
time.Sleep(1 * time.Second)
}
}

cr.Status.Status = stats.TargetStatus
Expand Down

0 comments on commit 43c61c0

Please sign in to comment.