Skip to content

Commit

Permalink
fix(controller): prevent zfs volume cr deletion if snapshot exists
Browse files Browse the repository at this point in the history
Signed-off-by: sinhaashish <ashi.sinha.87@gmail.com>
  • Loading branch information
sinhaashish committed Jan 30, 2025
1 parent da8b7ad commit 7e8f590
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 120 deletions.
2 changes: 1 addition & 1 deletion ci/ci-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ SNAP_CLASS=deploy/sample/zfssnapclass.yaml
export OPENEBS_NAMESPACE=${OPENEBS_NAMESPACE:-openebs}
TEST_DIR="$SCRIPT_DIR"/../tests

CRDS_TO_DELETE_ON_CLEANUP="zfsrestores.zfs.openebs.io zfssnapshots.zfs.openebs.io zfsvolumes.zfs.openebs.io zfsbackups.zfs.openebs.io zfsnodes.zfs.openebs.io"
CRDS_TO_DELETE_ON_CLEANUP="zfsrestores.zfs.openebs.io zfssnapshots.zfs.openebs.io zfsvolumes.zfs.openebs.io zfsbackups.zfs.openebs.io zfsnodes.zfs.openebs.io volumesnapshotclasses.snapshot.storage.k8s.io volumesnapshotcontents.snapshot.storage.k8s.io volumesnapshots.snapshot.storage.k8s.io"

help() {
cat <<EOF >&2
Expand Down
11 changes: 11 additions & 0 deletions pkg/builder/volbuilder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
)

const MarkForDeletionAnnotation string = "openebs.io/mark-for-deletion"

// Builder is the builder object for ZFSVolume
type Builder struct {
volume *ZFSVolume
Expand Down Expand Up @@ -154,6 +156,15 @@ func (b *Builder) WithVolBlockSize(bs string) *Builder {
return b
}

// WithAnnotation sets the annotation of ZFSVolume
func (b *Builder) WithAnnotation() *Builder {
if b.volume.Object.Annotations == nil {
b.volume.Object.Annotations = make(map[string]string)
}
b.volume.Object.Annotations[MarkForDeletionAnnotation] = "true"
return b
}

// WithVolumeType sets if ZFSVolume needs to be thin provisioned
func (b *Builder) WithVolumeType(vtype string) *Builder {
b.volume.Object.Spec.VolumeType = vtype
Expand Down
105 changes: 94 additions & 11 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,14 @@ func (cs *controller) DeleteVolume(
defer unlock()

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
vol, err := zfs.GetZFSVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
Expand All @@ -524,19 +524,45 @@ func (cs *controller) DeleteVolume(
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
}

// Delete the corresponding ZV CR
err = zfs.DeleteVolume(volumeID)
// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.VolumeId,
err.Error(),
)
}

// Delete the corresponding ZV CR only if there are no snapshots present for the volume
if len(snapList.Items) == 0 {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
} else {
// add annotation to the volume to indicate that it is eligible for deletion
// once all the snapshots are deleted and the reclaim policy is not Retain
// this volume will be deleted
err = zfs.MarkForDeletion(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to annotate volume on deletion request for {%s}",
volumeID,
)
}

}

sendEventOrIgnore("", volumeID, vol.Spec.Capacity, analytics.VolumeDeprovision)

deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

Expand Down Expand Up @@ -815,8 +841,39 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
volumeID := snapshotID[0]
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()

// verify if the snapshot has already been deleted
snap, err := zfs.GetZFSSnapshot(snapshotID[1])
if snap != nil && snap.DeletionTimestamp != nil {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
"failed to get snapshot for {%s}",
snapshotID[1],
)
}

// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshot list for volume. Error: %s",
volumeID,
err.Error(),
)
}

if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -825,7 +882,33 @@ func (cs *controller) DeleteSnapshot(
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil

eligibleForDeletion, err := zfs.IsVolumeEligibleForDeletion(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for eligible for deletion. Error: %s",
volumeID,
err.Error(),
)
}

// Delete the corresponding ZV CR only if this is the last snapshot
// for the volume and the corresponding pvc is deleted
if len(snapList.Items) == 1 && eligibleForDeletion {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
klog.Infof("volume %s deleted after the deletion of last snapshot %s ", volumeID, snapshotID[1])
}

return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

// ListSnapshots lists all snapshots for the
Expand Down
20 changes: 20 additions & 0 deletions pkg/response/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,23 @@ func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

// DeleteSnapshotResponseBuilder helps building an
// instance of csi DeleteSnapshotResponse
type DeleteSnapshotResponseBuilder struct {
response *csi.DeleteSnapshotResponse
}

// NewDeleteSnapshotResponseBuilder returns a new
// instance of DeleteSnapshotResponseBuilder
func NewDeleteSnapshotResponseBuilder() *DeleteSnapshotResponseBuilder {
return &DeleteSnapshotResponseBuilder{
response: &csi.DeleteSnapshotResponse{},
}
}

// Build returns the constructed instance
// of csi DeleteSnapshotResponse
func (b *DeleteSnapshotResponseBuilder) Build() *csi.DeleteSnapshotResponse {
return b.response
}
66 changes: 59 additions & 7 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -62,6 +64,8 @@ const (
OpenEBSCasTypeKey string = "openebs.io/cas-type"
// ZFSCasTypeName for the name of the cas-type
ZFSCasTypeName string = "localpv-zfs"
// MarkForDeletionAnnotation is the annotation key to mark the volume for deletion
MarkForDeletionAnnotation string = "openebs.io/marked-for-deletion"
)

var (
Expand Down Expand Up @@ -213,13 +217,6 @@ func DeleteSnapshot(snapname string) (err error) {
return
}

// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}

// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
Expand Down Expand Up @@ -251,6 +248,18 @@ func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
return vol, err
}

// UpdateZFSVolumeAnnotation updtates the ZFSVolume CR with the given annotation
func UpdateZFSVolumeAnnotation(vol *apis.ZFSVolume) error {
newVol, err := volbuilder.BuildFrom(vol).
WithAnnotation().Build()

if err != nil {
return err
}
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}

// GetZFSVolumeState returns ZFSVolume OwnerNode and State for
// the given volume. CreateVolume request may call it again and
// again until volume is "Ready".
Expand Down Expand Up @@ -441,3 +450,46 @@ func IsVolumeReady(vol *apis.ZFSVolume) bool {

return false
}

// GetSnapshotForVolume fetches all the snapshots for the given volume
func GetSnapshotForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// MarkForDeletion marks the volume for deletion by adding the annotation
func MarkForDeletion(volumeName string) error {
zv, err := GetZFSVolume(volumeName)
if err != nil {
klog.Errorf("failed to get ZV %s: %v", volumeName, err)
return err
}

err = UpdateZFSVolumeAnnotation(zv)
if err != nil {
klog.Errorf("Failed to annotate the ZV with marked for deletion %s: %v", volumeName, err)
return err
}
return nil
}

// IsVolumeEligibleForDeletion checks if the volume can be deleted or not
func IsVolumeEligibleForDeletion(volumeName string) (bool, error) {

zfsVol, err := GetZFSVolume(volumeName)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to get ZFSVolume %s: %v",
volumeName,
err,
)
}
if zfsVol.Annotations[MarkForDeletionAnnotation] == "true" {
return true, nil
}
return false, nil
}
Loading

0 comments on commit 7e8f590

Please sign in to comment.