From 84bac97f4a22b0021fd92f55fc58e11d4b9044f6 Mon Sep 17 00:00:00 2001 From: Sergey Berezansky Date: Thu, 16 Jan 2025 15:31:27 +0200 Subject: [PATCH] feat(CSI-312): add topology awareness by providing accessibleTopology in PV creation --- pkg/wekafs/controllerserver.go | 38 +++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/pkg/wekafs/controllerserver.go b/pkg/wekafs/controllerserver.go index 6cc512f5..b57ed8ee 100644 --- a/pkg/wekafs/controllerserver.go +++ b/pkg/wekafs/controllerserver.go @@ -275,14 +275,16 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol logger.Error().Msg("Failed to fetch volume capacity, assuming it was not set") } } + if volExists && volMatchesCapacity { result = "SUCCESS" return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: volume.GetId(), - CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), - VolumeContext: params, - ContentSource: volume.getCsiContentSource(ctx), + VolumeId: volume.GetId(), + CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), + VolumeContext: params, + ContentSource: volume.getCsiContentSource(ctx), + AccessibleTopology: generateAccessibleTopology(), }, }, nil } else if volExists && err == nil { @@ -294,10 +296,11 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol result = "SUCCESS" return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: volume.GetId(), - CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), - VolumeContext: params, - ContentSource: volume.getCsiContentSource(ctx), + VolumeId: volume.GetId(), + CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), + VolumeContext: params, + ContentSource: volume.getCsiContentSource(ctx), + AccessibleTopology: generateAccessibleTopology(), }, }, nil @@ -315,14 +318,25 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol result = "SUCCESS" return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: volume.GetId(), - CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), - VolumeContext: params, - ContentSource: volume.getCsiContentSource(ctx), + VolumeId: volume.GetId(), + CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), + VolumeContext: params, + ContentSource: volume.getCsiContentSource(ctx), + AccessibleTopology: generateAccessibleTopology(), }, }, nil } +func generateAccessibleTopology() []*csi.Topology { + accessibleTopology := make(map[string]string) + accessibleTopology[TopologyLabelWeka] = "true" + return []*csi.Topology{ + { + Segments: accessibleTopology, + }, + } +} + func DeleteVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.DeleteVolumeResponse, error) { err := status.Error(errorCode, strings.ToLower(errorMessage)) log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error deleting volume")