Skip to content

Commit

Permalink
refactor(CSI-305): change mount Map logic for WEKAFS to align with NF…
Browse files Browse the repository at this point in the history
…S and support same fs name on SCMC (#383)

### TL;DR
Refactored mount handling to improve container name management and mount reference counting.

### What changed?
- Added container name caching to ApiClient
- Introduced `EnsureLocalContainer` method to handle container name resolution
- Simplified mount reference counting using a flat map structure
- Added utility function to extract container names from mount points
- Fixed duplicate reference count increment in NFS mounting
- Added comprehensive test coverage for container name extraction
- Improved logging for mount operations and debugging

### How to test?
1. Test mounting filesystems with different container configurations
2. Verify container name extraction from mount points using the new test cases
3. Check mount reference counting behavior with multiple mount/unmount operations
4. Validate logging output for mount operations
5. Test compatibility with both legacy and new mounting scenarios

### Why make this change?
- Improves reliability of container name handling in multi-cluster environments
- Simplifies mount reference counting logic to prevent memory leaks
- Enhances debugging capabilities through better logging
- Reduces code duplication and potential race conditions
- Makes the codebase more maintainable and testable
  • Loading branch information
sergeyberezansky authored Nov 7, 2024
2 parents 9ea969b + dbdd772 commit 4851adb
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 137 deletions.
1 change: 1 addition & 0 deletions pkg/wekafs/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ApiClient struct {
NfsInterfaceGroups map[string]*InterfaceGroup
ApiUserRole ApiUserRole
ApiOrgId int
containerName string
}

type ApiEndPoint struct {
Expand Down
31 changes: 30 additions & 1 deletion pkg/wekafs/apiclient/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,31 @@ func (a *ApiClient) GetLocalContainer(ctx context.Context, allowProtocolContaine
}
}

func (a *ApiClient) EnsureLocalContainer(ctx context.Context, allowProtocolContainers bool) (string, error) {
// already have the container name set either via secret or via API call
if a.containerName != "" {
return a.containerName, nil
}
// if having a local container name set in secrets
if a.Credentials.LocalContainerName != "" {
a.containerName = a.Credentials.LocalContainerName
return a.containerName, nil
}

// if the cluster does not support multiple clusters, we must omit the container name since we can't pass it as a mount option
if !a.SupportsMultipleClusters() {
return a.containerName, nil
}

// fetch the container name from the API
container, err := a.GetLocalContainer(ctx, allowProtocolContainers)
if err != nil {
return "", err
}
a.containerName = container.ContainerName
return a.containerName, nil
}

func filterFrontendContainers(ctx context.Context, hostname string, containerList []Container, allowProtocolContainers bool) []Container {
logger := log.Ctx(ctx)
var ret []Container
Expand All @@ -233,7 +258,11 @@ func filterFrontendContainers(ctx context.Context, hostname string, containerLis
continue
}
if container.State != "ACTIVE" || container.Status != "UP" {
logger.Trace().Str("container_hostname", container.Hostname).Msg("Skipping an INACTIVE container")
logger.Trace().Str("container_hostname", container.Hostname).
Str("container_state", container.State).
Str("container_status", container.Status).
Str("container_id", container.Id).
Msg("Skipping an INACTIVE container")
continue
}
logger.Debug().Str("container_hostname", container.Hostname).Str("container_name", container.ContainerName).Msg("Found a valid container")
Expand Down
3 changes: 1 addition & 2 deletions pkg/wekafs/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type AnyMounter interface {
getTransport() DataTransport
}

type mountsMapPerFs map[string]AnyMount
type mountsMap map[string]mountsMapPerFs
type nfsMountsMap map[string]int // we only follow the mountPath and number of references
type wekafsMountsMap map[string]int
type DataTransport string
type UnmountFunc func()

Expand Down
1 change: 0 additions & 1 deletion pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) e
}
refCount++
m.mounter.mountMap[m.getRefcountIdx()] = refCount
m.mounter.mountMap[m.getRefcountIdx()] = refCount

logger.Trace().
Int("refcount", refCount).
Expand Down
3 changes: 1 addition & 2 deletions pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func (m *nfsMounter) Mount(ctx context.Context, fs string, apiClient *apiclient.
}

func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error {
opts := options
options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs)
options = options.AsNfs()
options.Merge(options, m.exclusiveMountOptions)
log.Ctx(ctx).Trace().Strs("mount_options", options.Strings()).Str("filesystem", fsName).Msg("Received an unmount request")
mnt := m.NewMount(fsName, options).(*nfsMount)
// since we are not aware of the IP address of the mount, we need to find the mount point by listing the mounts
err := mnt.locateMountIP()
Expand All @@ -112,7 +112,6 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti
return err
}

log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request")
return mnt.decRef(ctx)
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

const SnapshotTypeUnifiedSnap = "wekasnap/v2"

var ProcMountsPath = "/proc/mounts"

func generateInnerPathForDirBasedVol(dynamicVolPath, csiVolName string) string {
requestedNameHash := getStringSha1(csiVolName)
asciiPart := getAsciiPart(csiVolName, 64)
Expand Down Expand Up @@ -321,6 +323,24 @@ func GetMountIpFromActualMountPoint(mountPointBase string) (string, error) {
}
return "", errors.New("mount point not found")
}
func GetMountContainerNameFromActualMountPoint(mountPointBase string) (string, error) {
file, err := os.Open(ProcMountsPath)
if err != nil {
return "", errors.New("failed to open /proc/mounts")
}
defer func() { _ = file.Close() }()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) >= 4 && fields[2] == "wekafs" && strings.HasPrefix(fields[1], mountPointBase) {
optionsString := fields[3]
mountOptions := NewMountOptionsFromString(optionsString)
containerName := mountOptions.getOptionValue("container_name")
return containerName, nil
}
}
return "", errors.New(fmt.Sprintf("mount point not found: %s", mountPointBase))
}

func validateVolumeId(volumeId string) error {
// Volume New format:
Expand Down
73 changes: 73 additions & 0 deletions pkg/wekafs/utilities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package wekafs

import (
"github.com/stretchr/testify/assert"
"os"
"testing"
)

func TestGetMountContainerNameFromActualMountPoint(t *testing.T) {
// Create a temporary file to mock /proc/mounts
tmpFile, err := os.CreateTemp("", "mounts")
assert.NoError(t, err)
defer os.Remove(tmpFile.Name())

// Write mock data to the temporary file
mockData := `
dev/sdc1 /boot/efi vfat rw,relatime,fmask=0077,dmask=0077,codepage=437,iocharset=ascii,shortname=winnt,errors=remount-ro 0 0
fusectl /sys/fs/fuse/connections fusectl rw,relatime 0 0
binfmt_misc /proc/sys/fs/binfmt_misc binfmt_misc rw,relatime 0 0
/etc/auto.misc /misc autofs rw,relatime,fd=7,pgrp=2304,timeout=300,minproto=5,maxproto=5,indirect,pipe_ino=34520 0 0
-hosts /net autofs rw,relatime,fd=13,pgrp=2304,timeout=300,minproto=5,maxproto=5,indirect,pipe_ino=36373 0 0
/etc/auto.weka-smb /wekasmb autofs rw,relatime,fd=19,pgrp=2304,timeout=300,minproto=5,maxproto=5,indirect,pipe_ino=34528 0 0
/etc/auto.weka-smb /wekasmb-persistent autofs rw,relatime,fd=25,pgrp=2304,timeout=0,minproto=5,maxproto=5,indirect,pipe_ino=34531 0 0
/etc/auto.weka-kw /wekakwfs autofs rw,relatime,fd=31,pgrp=2304,timeout=300,minproto=5,maxproto=5,indirect,pipe_ino=34535 0 0
/etc/auto.weka-kw /wekakwfs-persistent autofs rw,relatime,fd=37,pgrp=2304,timeout=0,minproto=5,maxproto=5,indirect,pipe_ino=35191 0 0
tmpfs /run/user/0 tmpfs rw,nosuid,nodev,relatime,size=2238124k,mode=700 0 0
10.108.97.126/default /mnt/weka wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0,container_name=client 0 0
10.108.97.126/default /mnt/weka wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0,container_name=client 0 0
default /run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKST-42b24381dc12client wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0,container_name=42b24381dc12client 0 0
default /run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKKK wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0 0 0
default /run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKSS wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0,container_name=containername 0 0
default /run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKAA-mystrangeclient wekafs rw,relatime,writecache,inode_bits=auto,readahead_kb=32768,dentry_max_age_positive=1000,dentry_max_age_negative=0 0 0
`
_, err = tmpFile.WriteString(mockData)
assert.NoError(t, err)
tmpFile.Close()

// Redirect the function to read from the temporary file
originalProcMountsPath := ProcMountsPath
defer func() { ProcMountsPath = originalProcMountsPath }()
ProcMountsPath = tmpFile.Name()

// Call the function and check the result
containerName, err := GetMountContainerNameFromActualMountPoint("/mnt/weka")
assert.NoError(t, err)
assert.Equal(t, "client", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKST-42b24381dc12client")
assert.NoError(t, err)
assert.Equal(t, "42b24381dc12client", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKKK")
assert.NoError(t, err)
assert.Equal(t, "", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKSS")
assert.NoError(t, err)
assert.Equal(t, "containername", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKSS-NONEXISTENT")
assert.Error(t, err)
assert.Equal(t, "", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-DTQLAJ6KO6IUCZE23RBIM26YYUQNWKAA-mystrangeclient")
assert.NoError(t, err)
assert.Equal(t, "", containerName)

containerName, err = GetMountContainerNameFromActualMountPoint("/run/weka-fs-mounts/default-NONEXISTENT")
assert.Error(t, err)
assert.Equal(t, "", containerName)

}
Loading

0 comments on commit 4851adb

Please sign in to comment.