Skip to content

Commit

Permalink
Merge pull request #286 from weka/sergey/fix-race-on-multi-delete
Browse files Browse the repository at this point in the history
fix(CSI-224,WEKAPP-417375): race condition on multiple volume deletion in parallel
  • Loading branch information
sergeyberezansky authored Jul 31, 2024
2 parents 6041fb4 + 9763acf commit 6f78a1a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 69 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ RUN apk add util-linux libselinux libselinux-utils util-linux pciutils usbutils
# Update CA certificates
RUN apk add ca-certificates
RUN update-ca-certificates
ADD https://github.com/tigrawap/locar/releases/download/0.4.0/locar_linux_amd64 /locar
RUN chmod +x /locar
COPY --from=go-builder /bin/wekafsplugin /wekafsplugin
ARG binary=/bin/wekafsplugin
ENTRYPOINT ["/wekafsplugin"]
142 changes: 73 additions & 69 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package wekafs

import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"go.opentelemetry.io/otel"
"io"
"os"
"os/exec"
"path/filepath"
"sync"
)

const garbagePath = ".__internal__wekafs-async-delete"

//const garbageCollectionMaxThreads = 32

type innerPathVolGc struct {
isRunning map[string]bool
isDeferred map[string]bool
Expand All @@ -26,42 +31,36 @@ func initInnerPathVolumeGc(mounter *wekaMounter) *innerPathVolGc {
return &gc
}

func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fs] {
gc.isDeferred[fs] = true
return
}
gc.isRunning[fs] = true
go gc.purgeLeftovers(ctx, fs, apiClient)
}

func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
fsName := volume.FilesystemName
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fsName] {
gc.isDeferred[fsName] = true
return
}
gc.isRunning[fsName] = true
gc.isDeferred[fsName] = true
go gc.purgeVolume(ctx, volume)
op := "triggerGcVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Info().Msg("Triggering garbage collection of volume")
gc.moveVolumeToTrash(ctx, volume) // always do it synchronously
}

func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
func (gc *innerPathVolGc) moveVolumeToTrash(ctx context.Context, volume *Volume) {
op := "moveVolumeToTrash"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Debug().Msg("Starting garbage collection of volume")
fsName := volume.FilesystemName
defer gc.finishGcCycle(ctx, fsName, volume.apiClient)
defer gc.initiateGarbageCollection(ctx, fsName, volume.apiClient)
path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient)
defer unmount()
if err != nil {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
volumeTrashLoc := filepath.Join(path, garbagePath)
if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil {
logger.Error().Err(err).Msg("Failed to create garbage collector directory")
logger.Error().Str("garbage_collection_path", volumeTrashLoc).Err(err).Msg("Failed to create garbage collector directory")
} else {
logger.Debug().Str("garbage_collection_path", volumeTrashLoc).Msg("Successfuly created garbage collection directory")
logger.Debug().Str("garbage_collection_path", volumeTrashLoc).Msg("Successfully created garbage collection directory")
}
fullPath := filepath.Join(path, volume.GetFullPath(ctx))
logger.Debug().Str("full_path", fullPath).Str("volume_trash_location", volumeTrashLoc).Msg("Moving volume contents to trash")
Expand All @@ -75,65 +74,70 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
// so if the volume is dir/v1/<filesystem>/this/is/a/path/to/volume, we might move only the `volume`
// but otherwise it could be risky as if we have multiple volumes we might remove other data too, e.g.
// vol1: dir/v1/<filesystem>/this/is/a/path/to/volume, vol2: dir/v1/<filesystem>/this/is/a/path/to/another_volume

logger.Trace().Str("purge_path", volumeTrashLoc).Msg("Purging deleted volume data")
if err != nil {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
if err := purgeDirectory(ctx, volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory")
return
}

logger.Debug().Msg("Volume purged")
}

func purgeDirectory(ctx context.Context, path string) error {
logger := log.Ctx(ctx).With().Str("path", path).Logger()
if !PathExists(path) {
logger.Error().Str("path", path).Msg("Failed to remove existing directory")
return nil
}
for !pathIsEmptyDir(path) { // to make sure that if new files still appeared during invocation
files, err := os.ReadDir(path)
if err != nil {
logger.Error().Err(err).Msg("GC failed to read directory contents")
return err
}
for _, f := range files {
fp := filepath.Join(path, f.Name())
if f.IsDir() {
if err := purgeDirectory(ctx, fp); err != nil {
logger.Error().Err(err).Msg("")
return err
}
} else if err := os.Remove(fp); err != nil {
logger.Error().Err(err).Msg("Failed to remove directory that was used mount point")
}
}
}
return os.Remove(path)
// 2024-07-29: apparently seems this is not a real problem since static volumes are not deleted this way
// and dynamic volumes are always created inside the /csi-volumes
logger.Debug().Str("full_path", fullPath).Str("volume_trash_location", volumeTrashLoc).Msg("Volume contents moved to trash")
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
defer gc.finishGcCycle(ctx, fs, apiClient)
op := "purgeLeftovers"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
gc.Lock()
gc.isRunning[fs] = true
gc.Unlock()
path, err, unmount := gc.mounter.Mount(ctx, fs, apiClient)
defer unmount()
if err != nil {
log.Ctx(ctx).Error().Err(err).Str("filesystem", fs).Str("path", path).Msg("Failed mounting FS for garbage collection")
return
}
}
volumeTrashLoc := filepath.Join(path, garbagePath)

func (gc *innerPathVolGc) finishGcCycle(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
if fileExists("/locar") {
logger.Debug().Msg("Using locar for fast deletion")
deleteCmd := exec.Command("bash", "-c", fmt.Sprintf("/locar --type dir %s | /usr/bin/xargs -P32 -n128 rm -rf", volumeTrashLoc))
output, err := deleteCmd.CombinedOutput()
if err != nil {
logger.Error().Err(err).Msg("Error running locar")
logger.Trace().Str("output", string(output)).Msg("Locar output")
}
} else {
logger.Debug().Msg("Using default deletion method")
if err := os.RemoveAll(volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("path", volumeTrashLoc).Msg("Failed to perform garbage collection")
}
}
logger.Debug().Msg("Garbage collection completed")
gc.Lock()
defer gc.Unlock()
gc.isRunning[fs] = false
if gc.isDeferred[fs] {
gc.isDeferred[fs] = false
go gc.triggerGc(ctx, fs, apiClient)
go gc.purgeLeftovers(ctx, fs, apiClient)
}
}

func (gc *innerPathVolGc) initiateGarbageCollection(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
op := "initiateGarbageCollection"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
logger.Trace().Msg("Initiating garbage collection")
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fs] {
logger.Trace().Msg("Garbage collection already running, deferring next run")
gc.isDeferred[fs] = true
return
}
if !gc.isDeferred[fs] {
logger.Trace().Msg("Garbage collection not running, starting")
go gc.purgeLeftovers(ctx, fs, apiClient)
}
gc.Unlock()
}

// pathIsEmptyDir is a simple check to determine if directory is empty or not.
Expand Down
11 changes: 11 additions & 0 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ func PathExists(p string) bool {
return true
}

func fileExists(filename string) bool {
_, err := os.Stat(filename)
if err == nil {
return true
}
if os.IsNotExist(err) {
return false
}
return false
}

func PathIsWekaMount(ctx context.Context, path string) bool {
file, err := os.Open("/proc/mounts")
if err != nil {
Expand Down

0 comments on commit 6f78a1a

Please sign in to comment.