Skip to content

Commit

Permalink
refactor(CSI-318): add configurable wait for filesystem / snapshot de…
Browse files Browse the repository at this point in the history
…letion
  • Loading branch information
sergeyberezansky committed Jan 28, 2025
1 parent 3cfa26a commit 553f939
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ spec:
{{- if (.Values.pluginConfig.skipGarbageCollection | default false) }}
- "--skipgarbagecollection"
{{- end }}
{{- if (.Values.pluginConfig.waitForObjectDeletion | default false) }}
- "--waitforobjectdeletion"
{{- end }}
ports:
- containerPort: 9898
name: healthz
Expand Down
2 changes: 2 additions & 0 deletions charts/csi-wekafsplugin/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,5 @@ pluginConfig:
nfsProtocolVersion: "4.1"
# -- Skip garbage collection of deleted directory-backed volume contents and only move them to trash. Default false
skipGarbageCollection: false
# -- Wait for WEKA filesystem / snapshot deletion before acknowledging the corresponding CSI volume deletion. Default false
waitForObjectDeletion: false
2 changes: 2 additions & 0 deletions cmd/wekafsplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
clientGroupName = flag.String("clientgroupname", "", "Name of the NFS client group to use for managing NFS permissions")
nfsProtocolVersion = flag.String("nfsprotocolversion", "4.1", "NFS protocol version to use for mounting volumes")
skipGarbageCollection = flag.Bool("skipgarbagecollection", false, "Skip garbage collection of directory volumes data, only move to trash")
waitForObjectDeletion = flag.Bool("waitforobjectdeletion", false, "Wait for object deletion before returning from DeleteVolume")
// Set by the build process
version = ""
)
Expand Down Expand Up @@ -230,6 +231,7 @@ func handle() {
*nfsProtocolVersion,
version,
*skipGarbageCollection,
*waitForObjectDeletion,
)
driver, err := wekafs.NewWekaFsDriver(
*driverName, *nodeID, *endpoint, *maxVolumesPerNode, version, *debugPath, csiMode, *selinuxSupport, config)
Expand Down
6 changes: 5 additions & 1 deletion pkg/wekafs/driverconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DriverConfig struct {
nfsProtocolVersion string
csiVersion string
skipGarbageCollection bool
waitForObjectDeletion bool
}

func (dc *DriverConfig) Log() {
Expand All @@ -62,6 +63,8 @@ func (dc *DriverConfig) Log() {
Bool("use_nfs", dc.useNfs).
Str("interface_group_name", dc.interfaceGroupName).
Str("client_group_name", dc.clientGroupName).
Bool("skip_garbage_collection", dc.skipGarbageCollection).
Bool("wait_for_object_deletion", dc.waitForObjectDeletion).
Msg("Starting driver with the following configuration")

}
Expand All @@ -75,7 +78,7 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
allowNfsFailback, useNfs bool,
interfaceGroupName, clientGroupName, nfsProtocolVersion string,
version string,
skipGarbageCollection bool,
skipGarbageCollection, waitForObjectDeletion bool,
) *DriverConfig {

var MutuallyExclusiveMountOptions []mutuallyExclusiveMountOptionSet
Expand Down Expand Up @@ -122,6 +125,7 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
nfsProtocolVersion: nfsProtocolVersion,
csiVersion: version,
skipGarbageCollection: skipGarbageCollection,
waitForObjectDeletion: waitForObjectDeletion,
}
}

Expand Down
53 changes: 27 additions & 26 deletions pkg/wekafs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
fsObj, err := v.getFilesystemObj(ctx)
if err != nil {
logger.Error().Err(err).Str("filesystem", v.FilesystemName).Msg("Failed to fetch filesystem for deletion")
return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion %s", v.FilesystemName)
return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion: %s, %e", v.FilesystemName, err)
}
if fsObj == nil || fsObj.Uid == uuid.Nil {
logger.Warn().Str("filesystem", v.FilesystemName).Msg("Apparently filesystem not exists, returning OK")
Expand Down Expand Up @@ -1333,37 +1333,38 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
}
}
fsUid := fsObj.Uid
err, done := v.waitForFilesystemDeletion(ctx, logger, fsUid)
if done {
return err
if v.server.getConfig().waitForObjectDeletion {
return v.waitForFilesystemDeletion(ctx, logger, fsUid)
}

go func() { _ = v.waitForFilesystemDeletion(ctx, logger, fsUid) }()
return nil
}

func (v *Volume) waitForFilesystemDeletion(ctx context.Context, logger zerolog.Logger, fsUid uuid.UUID) (error, bool) {
func (v *Volume) waitForFilesystemDeletion(ctx context.Context, logger zerolog.Logger, fsUid uuid.UUID) error {
logger.Trace().Msg("Waiting for filesystem deletion to complete")
for start := time.Now(); time.Since(start) < MaxSnapshotDeletionDuration; {
fsObj := &apiclient.FileSystem{}
err := v.apiClient.GetFileSystemByUid(ctx, fsUid, fsObj)
if err != nil {
if err == apiclient.ObjectNotFoundError {
logger.Trace().Str("filesystem", v.FilesystemName).Msg("Filesystem was removed successfully")
return nil, true
return nil
}
return err, true
}
if fsObj.Uid != uuid.Nil {
if fsObj.IsRemoving {
logger.Trace().Str("filesystem", v.FilesystemName).Msg("Filesystem is still being removed")
time.Sleep(time.Second)
} else {
return errors.New(fmt.Sprintf("FilesystemName %s not marked for deletion but it should", v.FilesystemName)), true
logger.Error().Str("filesystem", v.FilesystemName).Msg("Filesystem not marked for deletion but it should")
return errors.New(fmt.Sprintf("FilesystemName %s not marked for deletion but it should", v.FilesystemName))
}
}
time.Sleep(time.Second)
}

logger.Error().Str("filesystem", v.FilesystemName).Msg("Timeout deleting volume")
return nil, false
logger.Error().Str("filesystem", v.FilesystemName).Str("volume_id", v.GetId()).Msg("Timeout deleting filesystem associated with volume")
return errors.New("Timeout deleting volume")
}

func (v *Volume) deleteSnapshot(ctx context.Context) error {
Expand All @@ -1375,8 +1376,8 @@ func (v *Volume) deleteSnapshot(ctx context.Context) error {
logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Logger()
snapObj, err := v.getSnapshotObj(ctx)
if err != nil {
logger.Error().Err(err).Str("snapshot", v.SnapshotName).Msg("Failed to delete snapshot")
return status.Errorf(codes.Internal, "Failed to delete snapshot %s", v.SnapshotName)
logger.Error().Err(err).Str("snapshot", v.SnapshotName).Msg("Failed to fetch snapshot for deletion")
return status.Errorf(codes.Internal, "Failed to fetch snapshot for deletion: %s: %e", v.SnapshotName, err)
}
if snapObj == nil || snapObj.Uid == uuid.Nil {
logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot not found, assuming repeating request")
Expand Down Expand Up @@ -1404,41 +1405,41 @@ func (v *Volume) deleteSnapshot(ctx context.Context) error {
Msg("Failed to delete snapshot")
return status.Errorf(codes.Internal, "Failed to delete filesystem %s: %s", v.FilesystemName, err)
}
err2, done := v.waitForSnapshotDeletion(ctx, logger, snapUid)
if done {
return err2
if v.server.getConfig().waitForObjectDeletion {
return v.waitForSnapshotDeletion(ctx, logger, snapUid)
}
go func() { _ = v.waitForSnapshotDeletion(ctx, logger, snapUid) }()
return nil
}

func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Logger, snapUid uuid.UUID) (error, bool) {
func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Logger, snapUid uuid.UUID) error {
logger.Trace().Msg("Waiting for snapshot deletion to complete")
for start := time.Now(); time.Since(start) < MaxSnapshotDeletionDuration; {
snapObj := &apiclient.Snapshot{}
err := v.apiClient.GetSnapshotByUid(ctx, snapUid, snapObj)
if err != nil {
if err == apiclient.ObjectNotFoundError {
logger.Trace().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
return nil, true
logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
return nil
}
if _, ok := err.(*apiclient.ApiNotFoundError); ok {
logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
return nil, true
return nil
}
return err, true
}
if snapObj.Uid != uuid.Nil {
if snapObj.IsRemoving {
logger.Trace().Msg("Snapshot is still being removed")
time.Sleep(time.Second)
continue
} else {
return errors.New(fmt.Sprintf("Snapshot %s not marked for deletion but it should", v.SnapshotUuid.String())), true
logger.Error().Str("filesystem", v.FilesystemName).Msg("Filesystem not marked for deletion but it should")
return errors.New(fmt.Sprintf("Snapshot %s not marked for deletion but it should", v.SnapshotUuid.String()))
}
}
time.Sleep(time.Second)
}

logger.Info().Str("filesystem", v.FilesystemName).Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
return nil, false
logger.Error().Str("filesystem", v.FilesystemName).Str("snapshot", v.SnapshotName).Str("volume_id", v.GetId()).Msg("Timeout deleting snapshot associated with volume")
return nil
}

// ObtainRequestParams takes additional optional params from storage class params and applies them to Volume object
Expand Down

0 comments on commit 553f939

Please sign in to comment.