Skip to content

Commit

Permalink
refactor(CSI-272): move NFS client registration to APIClient startup …
Browse files Browse the repository at this point in the history
…rather than on each mount
  • Loading branch information
sergeyberezansky committed Jan 28, 2025
1 parent c3ae164 commit 3cfa26a
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 72 deletions.
48 changes: 48 additions & 0 deletions pkg/wekafs/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type ApiClient struct {
ApiUserRole ApiUserRole
ApiOrgId int
containerName string
NfsInterfaceGroupName string
NfsClientGroupName string
}

type ApiEndPoint struct {
Expand Down Expand Up @@ -814,6 +816,52 @@ func (a *ApiClient) HasCSIPermissions() bool {
return false
}

func (a *ApiClient) RegisterNfsClientGroup(ctx context.Context) error {
logger := log.Ctx(ctx)
targetIp, err := a.GetNfsMountIp(ctx)
if err != nil {
logger.Error().Err(err).Msg("Failed to get NFS mount IP")
return err
}

nodeIP, err := GetNodeIpAddressByRouting(targetIp)
if err != nil {
logger.Error().Err(err).Msg("Failed to get routed node IP address, relying on node IP")
nodeIP = GetNodeIpAddress()
}

logger.Debug().Str("target_ip", targetIp).Str("client_ip", nodeIP).Msg("Registering IP address in NFS client group")
updatedConfig := false

logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group")
cg, created, err := a.EnsureCsiPluginNfsClientGroup(ctx)
if err != nil {
logger.Error().Err(err).Msg("Failed to ensure NFS client group")
return err
}

if created {
a.NfsClientGroupName = cg.Name
}

updatedConfig = updatedConfig || created

// Ensure client group rule
logger.Trace().Str("ip_address", nodeIP).Str("client_group", cg.Name).Msg("Ensuring NFS Client Group Rule for IP")
created, err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, nodeIP)
if err != nil {
logger.Error().Err(err).Str("ip_address", nodeIP).Msg("Failed to ensure NFS client group rule for IP")
return err
}
updatedConfig = updatedConfig || created

if updatedConfig {
logger.Trace().Msg("Waiting for NFS configuration to be applied")
time.Sleep(5 * time.Second)
}
return nil
}

func maskPayload(payload string) string {
masker := mask.NewMasker()
masker.RegisterMaskStringFunc(mask.MaskTypeFilled, masker.MaskFilledString)
Expand Down
4 changes: 2 additions & 2 deletions pkg/wekafs/apiclient/interfacegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (a *ApiClient) GetNfsInterfaceGroup(ctx context.Context, name string) *Inte

// GetNfsMountIp returns the IP address of the NFS interface group to be used for NFS mount
// TODO: need to do it much more sophisticated way to distribute load
func (a *ApiClient) GetNfsMountIp(ctx context.Context, interfaceGroupName string) (string, error) {
func (a *ApiClient) GetNfsMountIp(ctx context.Context) (string, error) {
// if override is set, use it
if len(a.Credentials.NfsTargetIPs) > 0 && a.Credentials.NfsTargetIPs[0] != "" {
ips := a.Credentials.NfsTargetIPs
Expand All @@ -197,7 +197,7 @@ func (a *ApiClient) GetNfsMountIp(ctx context.Context, interfaceGroupName string
return ip, nil
}

ig := a.GetNfsInterfaceGroup(ctx, interfaceGroupName)
ig := a.GetNfsInterfaceGroup(ctx, a.NfsInterfaceGroupName)
if ig == nil {
return "", errors.New("no NFS interface group found")
}
Expand Down
39 changes: 13 additions & 26 deletions pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,26 +473,26 @@ func (a *ApiClient) CreateNfsClientGroup(ctx context.Context, r *NfsClientGroupC
return err
}

func (a *ApiClient) EnsureCsiPluginNfsClientGroup(ctx context.Context, clientGroupName string) (grp *NfsClientGroup, created bool, err error) {
func (a *ApiClient) EnsureCsiPluginNfsClientGroup(ctx context.Context) (grp *NfsClientGroup, created bool, err error) {
op := "EnsureCsiPluginNfsClientGroup"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
var ret *NfsClientGroup
if clientGroupName == "" {
clientGroupName = NfsClientGroupName
if a.NfsClientGroupName == "" {
a.NfsClientGroupName = NfsClientGroupName
}
logger.Trace().Str("client_group_name", clientGroupName).Msg("Getting client group by name")
ret, err = a.GetNfsClientGroupByName(ctx, clientGroupName)
logger.Trace().Str("client_group_name", a.NfsClientGroupName).Msg("Getting client group by name")
ret, err = a.GetNfsClientGroupByName(ctx, a.NfsClientGroupName)
if err != nil {
if err != ObjectNotFoundError {
logger.Error().Err(err).Msg("Failed to get client group by name")
return ret, false, err
} else {
logger.Trace().Str("client_group_name", clientGroupName).Msg("Existing client group not found, creating client group")
logger.Trace().Str("client_group_name", a.NfsClientGroupName).Msg("Existing client group not found, creating client group")

err = a.CreateNfsClientGroup(ctx, NewNfsClientGroupCreateRequest(clientGroupName), ret)
err = a.CreateNfsClientGroup(ctx, NewNfsClientGroupCreateRequest(a.NfsClientGroupName), ret)
return ret, err == nil, nil
}
}
Expand Down Expand Up @@ -655,12 +655,12 @@ func (r *NfsClientGroupRule) IsEligibleForIP(ip string) bool {
return network.ContainsIPAddress(ip)
}

func (a *ApiClient) GetNfsClientGroupRules(ctx context.Context, clientGroupName string, rules *[]NfsClientGroupRule) error {
func (a *ApiClient) GetNfsClientGroupRules(ctx context.Context, rules *[]NfsClientGroupRule) error {
op := "GetNfsClientGroupRules"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
cg, _, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
cg, _, err := a.EnsureCsiPluginNfsClientGroup(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -821,7 +821,7 @@ func (a *ApiClient) EnsureNfsClientGroupRuleForIp(ctx context.Context, cg *NfsCl
return false, err
}

func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName string, version NfsVersionString, clientGroupName string) error {
func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, fsName string, version NfsVersionString, clientGroupName string) error {
op := "EnsureNfsPermissions"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
Expand All @@ -834,29 +834,16 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName
}
var created bool

logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupCaption).Msg("Ensuring NFS permissions")
// Ensure client group
logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group")
cg, created, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
cg, err := a.GetNfsClientGroupByName(ctx, clientGroupCaption)
if err != nil {
logger.Error().Err(err).Msg("Failed to ensure NFS client group")
logger.Error().Err(err).Str("client_group_name", clientGroupCaption).Msg("Failed to get NFS client group by name")
return err
}
updateConfigRequired = updateConfigRequired || created

// Ensure client group rule
logger.Trace().Str("ip_address", ip).Msg("Ensuring NFS Client Group Rule for IP")
created, err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, ip)
if err != nil {
logger.Error().Err(err).Str("ip_address", ip).Msg("Failed to ensure NFS client group rule for IP")
return err
}
updateConfigRequired = updateConfigRequired || created
// Ensure NFS permission
logger.Trace().Str("filesystem", fsName).Str("client_group", cg.Name).Msg("Ensuring NFS Export for client group")
created, err = EnsureNfsPermission(ctx, fsName, cg.Name, version, a)
updateConfigRequired = updateConfigRequired || created
if updateConfigRequired {
if created {
logger.Trace().Msg("Waiting for NFS configuration to be applied")
time.Sleep(5 * time.Second)
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/wekafs/apiclient/nfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,16 @@ func TestNfsClientGroup(t *testing.T) {

func TestEnsureCsiPluginNfsClientGroup(t *testing.T) {
apiClient := GetApiClientForTest(t)
result, _, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background(), NfsClientGroupName)
ctx := context.Background()
result, _, err := apiClient.EnsureCsiPluginNfsClientGroup(ctx)
assert.NoError(t, err)
assert.NotNil(t, result)
}

func TestNfsClientGroupRules(t *testing.T) {
apiClient := GetApiClientForTest(t)
cg, _, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background(), NfsClientGroupName)
ctx := context.Background()
cg, _, err := apiClient.EnsureCsiPluginNfsClientGroup(ctx)
assert.NoError(t, err)
assert.NotNil(t, cg)

Expand All @@ -247,7 +249,7 @@ outerLoop:
assert.NoError(t, err)
}
rules := &[]NfsClientGroupRule{}
err = apiClient.GetNfsClientGroupRules(context.Background(), NfsClientGroupName, rules)
err = apiClient.GetNfsClientGroupRules(ctx, rules)
assert.NoError(t, err)
assert.NotEmpty(t, rules)
for _, rule := range *rules {
Expand All @@ -262,7 +264,8 @@ func TestNfsEnsureNfsPermissions(t *testing.T) {
apiClient := GetApiClientForTest(t)

// Test EnsureNfsPermission
err := apiClient.EnsureNfsPermissions(context.Background(), "172.16.5.147", "default", NfsVersionV4, NfsClientGroupName)
ctx := context.Background()
err := apiClient.EnsureNfsPermissions(ctx, "default", NfsVersionV4, NfsClientGroupName)
assert.NoError(t, err)
}

Expand Down
31 changes: 12 additions & 19 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ import (
)

type nfsMount struct {
mounter *nfsMounter
fsName string
mountPoint string
kMounter mount.Interface
debugPath string
mountOptions MountOptions
lastUsed time.Time
mountIpAddress string
interfaceGroupName string
clientGroupName string
protocolVersion apiclient.NfsVersionString
mounter *nfsMounter
fsName string
mountPoint string
kMounter mount.Interface
debugPath string
mountOptions MountOptions
lastUsed time.Time
mountIpAddress string
clientGroupName string
protocolVersion apiclient.NfsVersionString
}

func (m *nfsMount) getMountPoint() string {
Expand Down Expand Up @@ -153,7 +152,7 @@ func (m *nfsMount) doUnmount(ctx context.Context) error {

func (m *nfsMount) ensureMountIpAddress(ctx context.Context, apiClient *apiclient.ApiClient) error {
if m.mountIpAddress == "" {
ip, err := apiClient.GetNfsMountIp(ctx, m.interfaceGroupName)
ip, err := apiClient.GetNfsMountIp(ctx)
if err != nil {
return err
}
Expand All @@ -178,14 +177,8 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
}

if !m.isInDevMode() {

nodeIP, err := apiclient.GetNodeIpAddressByRouting(m.mountIpAddress)
err := apiClient.EnsureNfsPermissions(ctx, m.fsName, apiclient.NfsVersionV4, m.clientGroupName)
if err != nil {
logger.Error().Err(err).Msg("Failed to get routed node IP address, relying on node IP")
nodeIP = apiclient.GetNodeIpAddress()
}

if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName, apiclient.NfsVersionV4, m.clientGroupName) != nil {
logger.Error().Err(err).Msg("Failed to ensure NFS permissions")
return errors.New("failed to ensure NFS permissions")
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type nfsMounter struct {
debugPath string
selinuxSupport *bool
gc *innerPathVolGc
interfaceGroupName string
clientGroupName string
nfsProtocolVersion string
exclusiveMountOptions []mutuallyExclusiveMountOptionSet
Expand All @@ -38,7 +37,6 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
mounter.gc = initInnerPathVolumeGc(mounter)
mounter.gc.config = driver.config
mounter.schedulePeriodicMountGc()
mounter.interfaceGroupName = driver.config.interfaceGroupName
mounter.clientGroupName = driver.config.clientGroupName
mounter.nfsProtocolVersion = driver.config.nfsProtocolVersion

Expand All @@ -51,15 +49,14 @@ func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount {
}
uniqueId := getStringSha1AsB32(fsName + ":" + options.String())
wMount := &nfsMount{
mounter: m,
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountOptions: options,
interfaceGroupName: m.interfaceGroupName,
clientGroupName: m.clientGroupName,
protocolVersion: apiclient.NfsVersionString(fmt.Sprintf("V%s", m.nfsProtocolVersion)),
mounter: m,
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountOptions: options,
clientGroupName: m.clientGroupName,
protocolVersion: apiclient.NfsVersionString(fmt.Sprintf("V%s", m.nfsProtocolVersion)),
}
return wMount
}
Expand Down
30 changes: 20 additions & 10 deletions pkg/wekafs/wekafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ var (
// ApiStore hashmap of all APIs defined by credentials + endpoints
type ApiStore struct {
sync.Mutex
apis map[uint32]*apiclient.ApiClient
allowInsecureHttps bool
Hostname string
apis map[uint32]*apiclient.ApiClient
config *DriverConfig
Hostname string
}

// Die used to intentionally panic and exit, while updating termination log
Expand Down Expand Up @@ -151,7 +151,7 @@ func (api *ApiStore) fromCredentials(ctx context.Context, credentials apiclient.
logger := log.Ctx(ctx)
logger.Trace().Str("api_client", credentials.String()).Msg("Creating new Weka API client")
// doing this to fetch a client hash
newClient, err := apiclient.NewApiClient(ctx, credentials, api.allowInsecureHttps, hostname)
newClient, err := apiclient.NewApiClient(ctx, credentials, api.config.allowInsecureHttps, hostname)
if err != nil {
return nil, errors.New("could not create API client object from supplied params")
}
Expand All @@ -176,7 +176,17 @@ func (api *ApiStore) fromCredentials(ctx context.Context, credentials apiclient.
"To support organization other than Root please upgrade to version %s or higher",
credentials.Organization, newClient.ClusterName, apiclient.MinimumSupportedWekaVersions.MountFilesystemsUsingAuthToken))
}
if (api.config.allowNfsFailback || api.config.useNfs) && !api.config.isInDevMode() {
newClient.NfsInterfaceGroupName = api.config.interfaceGroupName
newClient.NfsClientGroupName = api.config.clientGroupName
err := newClient.RegisterNfsClientGroup(ctx)
if err != nil {
logger.Error().Err(err).Msg("Failed to register NFS client group")
return nil, err
}
}
api.apis[hash] = newClient

return newClient, nil
}

Expand All @@ -195,12 +205,12 @@ func (api *ApiStore) GetClientFromSecrets(ctx context.Context, secrets map[strin
return client, nil
}

func NewApiStore(allowInsecureHttps bool, hostname string) *ApiStore {
func NewApiStore(config *DriverConfig, hostname string) *ApiStore {
s := &ApiStore{
Mutex: sync.Mutex{},
apis: make(map[uint32]*apiclient.ApiClient),
allowInsecureHttps: allowInsecureHttps,
Hostname: hostname,
Mutex: sync.Mutex{},
apis: make(map[uint32]*apiclient.ApiClient),
config: config,
Hostname: hostname,
}
return s
}
Expand Down Expand Up @@ -235,7 +245,7 @@ func NewWekaFsDriver(
version: vendorVersion,
endpoint: endpoint,
maxVolumesPerNode: maxVolumesPerNode,
api: NewApiStore(config.allowInsecureHttps, nodeID),
api: NewApiStore(config, nodeID),
debugPath: debugPath,
csiMode: csiMode, // either "controller", "node", "all"
selinuxSupport: selinuxSupport,
Expand Down

0 comments on commit 3cfa26a

Please sign in to comment.