Skip to content

Commit

Permalink
feat(CSI-312): add topology awareness by providing accessibleTopology…
Browse files Browse the repository at this point in the history
… in PV creation (#426)

### TL;DR
Added topology support to volume creation in WekaFS CSI driver

### What changed?
- Added `AccessibleTopology` field to volume creation responses
- Implemented `generateAccessibleTopology()` function that creates topology segments with Weka-specific labels
- Applied topology information consistently across all volume creation scenarios

### How to test?
1. Deploy the CSI driver in a Kubernetes cluster with WekaFS
2. Create a new PersistentVolumeClaim (PVC)
3. Verify that the created PV includes topology information
4. Confirm that pods using the PVC are scheduled correctly based on topology constraints

### Why make this change?
Topology support enables better control over pod scheduling by ensuring pods are scheduled on nodes that have access to WekaFS. This improves the reliability of storage operations and helps maintain proper storage accessibility across the cluster.
  • Loading branch information
sergeyberezansky authored Jan 19, 2025
2 parents 36bdff8 + 84bac97 commit 97251ff
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,16 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
logger.Error().Msg("Failed to fetch volume capacity, assuming it was not set")
}
}

if volExists && volMatchesCapacity {
result = "SUCCESS"
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
AccessibleTopology: generateAccessibleTopology(),
},
}, nil
} else if volExists && err == nil {
Expand All @@ -294,10 +296,11 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
result = "SUCCESS"
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
AccessibleTopology: generateAccessibleTopology(),
},
}, nil

Expand All @@ -315,14 +318,25 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
result = "SUCCESS"
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
VolumeId: volume.GetId(),
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: params,
ContentSource: volume.getCsiContentSource(ctx),
AccessibleTopology: generateAccessibleTopology(),
},
}, nil
}

func generateAccessibleTopology() []*csi.Topology {
accessibleTopology := make(map[string]string)
accessibleTopology[TopologyLabelWeka] = "true"
return []*csi.Topology{
{
Segments: accessibleTopology,
},
}
}

func DeleteVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.DeleteVolumeResponse, error) {
err := status.Error(errorCode, strings.ToLower(errorMessage))
log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error deleting volume")
Expand Down

0 comments on commit 97251ff

Please sign in to comment.