diff --git a/charts/csi-wekafsplugin/templates/controllerserver-deployment.yaml b/charts/csi-wekafsplugin/templates/controllerserver-deployment.yaml
index fc6db5ad..f2b02bf7 100755
--- a/charts/csi-wekafsplugin/templates/controllerserver-deployment.yaml
+++ b/charts/csi-wekafsplugin/templates/controllerserver-deployment.yaml
@@ -115,6 +115,9 @@ spec:
             {{- if (.Values.pluginConfig.skipGarbageCollection | default false) }}
             - "--skipgarbagecollection"
             {{- end }}
+            {{- if (.Values.pluginConfig.waitForObjectDeletion | default false) }}
+            - "--waitforobjectdeletion"
+            {{- end }}
           ports:
             - containerPort: 9898
               name: healthz
diff --git a/charts/csi-wekafsplugin/values.yaml b/charts/csi-wekafsplugin/values.yaml
index 91b440c9..f2fd6418 100644
--- a/charts/csi-wekafsplugin/values.yaml
+++ b/charts/csi-wekafsplugin/values.yaml
@@ -174,3 +174,5 @@ pluginConfig:
   nfsProtocolVersion: "4.1"
   # -- Skip garbage collection of deleted directory-backed volume contents and only move them to trash. Default false
   skipGarbageCollection: false
+  # -- Wait for WEKA filesystem / snapshot deletion before acknowledging the corresponding CSI volume deletion. Default false
+  waitForObjectDeletion: false
diff --git a/cmd/wekafsplugin/main.go b/cmd/wekafsplugin/main.go
index 99edf20b..4e7faa97 100644
--- a/cmd/wekafsplugin/main.go
+++ b/cmd/wekafsplugin/main.go
@@ -97,6 +97,7 @@ var (
 	clientGroupName = flag.String("clientgroupname", "", "Name of the NFS client group to use for managing NFS permissions")
 	nfsProtocolVersion = flag.String("nfsprotocolversion", "4.1", "NFS protocol version to use for mounting volumes")
 	skipGarbageCollection = flag.Bool("skipgarbagecollection", false, "Skip garbage collection of directory volumes data, only move to trash")
+	waitForObjectDeletion = flag.Bool("waitforobjectdeletion", false, "Wait for object deletion before returning from DeleteVolume")
 	// Set by the build process
 	version = ""
 )
@@ -230,6 +231,7 @@ func handle() {
 		*nfsProtocolVersion,
 		version,
 		*skipGarbageCollection,
+		*waitForObjectDeletion,
 	)
 	driver, err := wekafs.NewWekaFsDriver(
 		*driverName, *nodeID, *endpoint, *maxVolumesPerNode, version, *debugPath, csiMode, *selinuxSupport, config)
diff --git a/pkg/wekafs/driverconfig.go b/pkg/wekafs/driverconfig.go
index 6d59438f..f4cd9370 100644
--- a/pkg/wekafs/driverconfig.go
+++ b/pkg/wekafs/driverconfig.go
@@ -40,6 +40,7 @@ type DriverConfig struct {
 	nfsProtocolVersion string
 	csiVersion string
 	skipGarbageCollection bool
+	waitForObjectDeletion bool
 }
 
 func (dc *DriverConfig) Log() {
@@ -62,6 +63,8 @@ func (dc *DriverConfig) Log() {
 		Bool("use_nfs", dc.useNfs).
 		Str("interface_group_name", dc.interfaceGroupName).
 		Str("client_group_name", dc.clientGroupName).
+		Bool("skip_garbage_collection", dc.skipGarbageCollection).
+		Bool("wait_for_object_deletion", dc.waitForObjectDeletion).
 		Msg("Starting driver with the following configuration")
 }
 
@@ -75,7 +78,7 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
 	allowNfsFailback, useNfs bool,
 	interfaceGroupName, clientGroupName, nfsProtocolVersion string,
 	version string,
-	skipGarbageCollection bool,
+	skipGarbageCollection, waitForObjectDeletion bool,
 ) *DriverConfig {
 
 	var MutuallyExclusiveMountOptions []mutuallyExclusiveMountOptionSet
@@ -122,6 +125,7 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
 		nfsProtocolVersion: nfsProtocolVersion,
 		csiVersion: version,
 		skipGarbageCollection: skipGarbageCollection,
+		waitForObjectDeletion: waitForObjectDeletion,
 	}
 }
 
diff --git a/pkg/wekafs/volume.go b/pkg/wekafs/volume.go
index 81bccd14..fcf6b0e9 100644
--- a/pkg/wekafs/volume.go
+++ b/pkg/wekafs/volume.go
@@ -1295,7 +1295,7 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
 	fsObj, err := v.getFilesystemObj(ctx)
 	if err != nil {
 		logger.Error().Err(err).Str("filesystem", v.FilesystemName).Msg("Failed to fetch filesystem for deletion")
-		return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion %s", v.FilesystemName)
+		return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion: %s, %v", v.FilesystemName, err)
 	}
 	if fsObj == nil || fsObj.Uid == uuid.Nil {
 		logger.Warn().Str("filesystem", v.FilesystemName).Msg("Apparently filesystem not exists, returning OK")
@@ -1333,14 +1333,16 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
 		}
 	}
 	fsUid := fsObj.Uid
-	err, done := v.waitForFilesystemDeletion(ctx, logger, fsUid)
-	if done {
-		return err
+	if v.server.getConfig().waitForObjectDeletion {
+		return v.waitForFilesystemDeletion(ctx, logger, fsUid)
 	}
+
+	// Watch the deletion in the background on a detached context: ctx may be cancelled as soon as this call returns.
+	go func() { _ = v.waitForFilesystemDeletion(context.WithoutCancel(ctx), logger, fsUid) }()
 	return nil
 }
 
-func (v *Volume) waitForFilesystemDeletion(ctx context.Context, logger zerolog.Logger, fsUid uuid.UUID) (error, bool) {
+func (v *Volume) waitForFilesystemDeletion(ctx context.Context, logger zerolog.Logger, fsUid uuid.UUID) error {
 	logger.Trace().Msg("Waiting for filesystem deletion to complete")
 	for start := time.Now(); time.Since(start) < MaxSnapshotDeletionDuration; {
 		fsObj := &apiclient.FileSystem{}
@@ -1348,22 +1350,26 @@ func (v *Volume) waitForFilesystemDeletion(ctx context.Context, logger zerolog.L
 		if err != nil {
 			if err == apiclient.ObjectNotFoundError {
 				logger.Trace().Str("filesystem", v.FilesystemName).Msg("Filesystem was removed successfully")
-				return nil, true
+				return nil
 			}
-			return err, true
+			if ctx.Err() != nil {
+				// stop polling once ctx is cancelled or past its deadline
+				return ctx.Err()
+			}
 		}
 		if fsObj.Uid != uuid.Nil {
 			if fsObj.IsRemoving {
 				logger.Trace().Str("filesystem", v.FilesystemName).Msg("Filesystem is still being removed")
 			} else {
-				return errors.New(fmt.Sprintf("FilesystemName %s not marked for deletion but it should", v.FilesystemName)), true
+				logger.Error().Str("filesystem", v.FilesystemName).Msg("Filesystem not marked for deletion but it should")
+				return fmt.Errorf("FilesystemName %s not marked for deletion but it should", v.FilesystemName)
 			}
 		}
 		time.Sleep(time.Second)
 	}
 
-	logger.Error().Str("filesystem", v.FilesystemName).Msg("Timeout deleting volume")
-	return nil, false
+	logger.Error().Str("filesystem", v.FilesystemName).Str("volume_id", v.GetId()).Msg("Timeout deleting filesystem associated with volume")
+	return errors.New("Timeout deleting volume")
 }
 
 func (v *Volume) deleteSnapshot(ctx context.Context) error {
@@ -1375,8 +1381,8 @@ func (v *Volume) deleteSnapshot(ctx context.Context) error {
 	logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Logger()
 	snapObj, err := v.getSnapshotObj(ctx)
 	if err != nil {
-		logger.Error().Err(err).Str("snapshot", v.SnapshotName).Msg("Failed to delete snapshot")
-		return status.Errorf(codes.Internal, "Failed to delete snapshot %s", v.SnapshotName)
+		logger.Error().Err(err).Str("snapshot", v.SnapshotName).Msg("Failed to fetch snapshot for deletion")
+		return status.Errorf(codes.Internal, "Failed to fetch snapshot for deletion: %s: %v", v.SnapshotName, err)
 	}
 	if snapObj == nil || snapObj.Uid == uuid.Nil {
 		logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot not found, assuming repeating request")
@@ -1404,41 +1410,45 @@ func (v *Volume) deleteSnapshot(ctx context.Context) error {
 			Msg("Failed to delete snapshot")
 		return status.Errorf(codes.Internal, "Failed to delete filesystem %s: %s", v.FilesystemName, err)
 	}
-	err2, done := v.waitForSnapshotDeletion(ctx, logger, snapUid)
-	if done {
-		return err2
+	if v.server.getConfig().waitForObjectDeletion {
+		return v.waitForSnapshotDeletion(ctx, logger, snapUid)
 	}
+	// Watch the deletion in the background on a detached context: ctx may be cancelled as soon as this call returns.
+	go func() { _ = v.waitForSnapshotDeletion(context.WithoutCancel(ctx), logger, snapUid) }()
 	return nil
 }
 
-func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Logger, snapUid uuid.UUID) (error, bool) {
+func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Logger, snapUid uuid.UUID) error {
 	logger.Trace().Msg("Waiting for snapshot deletion to complete")
 	for start := time.Now(); time.Since(start) < MaxSnapshotDeletionDuration; {
 		snapObj := &apiclient.Snapshot{}
 		err := v.apiClient.GetSnapshotByUid(ctx, snapUid, snapObj)
 		if err != nil {
 			if err == apiclient.ObjectNotFoundError {
-				logger.Trace().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
-				return nil, true
+				logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
+				return nil
 			}
 			if _, ok := err.(*apiclient.ApiNotFoundError); ok {
 				logger.Debug().Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
-				return nil, true
+				return nil
 			}
-			return err, true
+			if ctx.Err() != nil {
+				// stop polling once ctx is cancelled or past its deadline
+				return ctx.Err()
+			}
 		}
 		if snapObj.Uid != uuid.Nil {
 			if snapObj.IsRemoving {
 				logger.Trace().Msg("Snapshot is still being removed")
 			} else {
-				return errors.New(fmt.Sprintf("Snapshot %s not marked for deletion but it should", v.SnapshotUuid.String())), true
+				logger.Error().Str("snapshot", v.SnapshotName).Msg("Snapshot not marked for deletion but it should")
+				return fmt.Errorf("Snapshot %s not marked for deletion but it should", v.SnapshotUuid.String())
 			}
 		}
 		time.Sleep(time.Second)
 	}
-
-	logger.Info().Str("filesystem", v.FilesystemName).Str("snapshot", v.SnapshotName).Msg("Snapshot deleted successfully")
-	return nil, false
+	logger.Error().Str("filesystem", v.FilesystemName).Str("snapshot", v.SnapshotName).Str("volume_id", v.GetId()).Msg("Timeout deleting snapshot associated with volume")
+	return errors.New("Timeout deleting snapshot")
 }
 
 // ObtainRequestParams takes additional optional params from storage class params and applies them to Volume object