From 801d1a6a4396c37bb16797a710962d6b2c964f01 Mon Sep 17 00:00:00 2001 From: mozillazg Date: Mon, 9 Oct 2023 14:24:45 +0800 Subject: [PATCH] oss: support CoCo direct volume feature --- pkg/oss/coco.go | 137 +++++++++++++++++++ pkg/oss/coco_test.go | 122 +++++++++++++++++ pkg/oss/nodeserver.go | 10 ++ pkg/utils/kata/directvolume/direct_volume.go | 83 +++++++++++ pkg/utils/kata/directvolume/utils.go | 9 ++ 5 files changed, 361 insertions(+) create mode 100644 pkg/oss/coco.go create mode 100644 pkg/oss/coco_test.go create mode 100644 pkg/utils/kata/directvolume/direct_volume.go create mode 100644 pkg/utils/kata/directvolume/utils.go diff --git a/pkg/oss/coco.go b/pkg/oss/coco.go new file mode 100644 index 000000000..6cb848ad9 --- /dev/null +++ b/pkg/oss/coco.go @@ -0,0 +1,137 @@ +package oss + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/kata/directvolume" + log "github.com/sirupsen/logrus" +) + +const ( + optDirectAssigned = "direct" + optAnnotations = "annotations" + optEncrypted = "encrypted" + optEncPasswd = "encPasswd" + optKmsKeyId = "kmsKeyId" + fsTypeSecureMount = "secure_mount" + volumeTypeOSS = "alibaba-cloud-oss" + sealedSecretPrefix = "sealed." +) + +func (ns *nodeServer) publishDirectVolume(ctx context.Context, req *csi.NodePublishVolumeRequest, opt *Options) (*csi.NodePublishVolumeResponse, error) { + logger := log.WithFields(map[string]interface{}{ + "NodeServer": "OSSNodeServer", + "VolumeId": req.VolumeId, + "TargetPath": req.TargetPath, + }) + + volumePath := req.TargetPath + if isDirectVolumePath(volumePath) { + logger.Infof("NodePublishVolume: The mount info for DirectVolume is already exist: %s", volumePath) + return &csi.NodePublishVolumeResponse{}, nil + } + + device := req.TargetPath + volumeType := volumeTypeOSS + fsType := fsTypeSecureMount + var encrypted, encPasswd, kmsKeyId string + annotationsObj := map[string]string{} + + if v := req.Secrets[optEncPasswd]; v != "" { + encPasswd = v + } + for key, value := range req.VolumeContext { + switch key { + case optEncrypted: + encrypted = strings.TrimSpace(value) + case optEncPasswd: + encPasswd = strings.TrimSpace(value) + case optKmsKeyId: + kmsKeyId = strings.TrimSpace(value) + case optAnnotations: + json.Unmarshal([]byte(value), &annotationsObj) + } + } + + if annotationsObj["kata_fs_type"] != "" { + fsType = annotationsObj["kata_fs_type"] + } + if annotationsObj["kata_volume_type"] != "" { + volumeType = annotationsObj["kata_volume_type"] + } + if annotationsObj["kata_device"] != "" { + device = annotationsObj["kata_device"] + } + annotations, _ := json.Marshal(annotationsObj) + annotationsStr := string(annotations) + if len(annotations) == 0 || annotationsStr == "null" || annotationsStr == "" { + annotationsStr = "{}" + } + metadata := map[string]string{ + "bucket": opt.Bucket, + "url": opt.URL, + "otherOpts": opt.OtherOpts, + "path": opt.Path, + "encrypted": encrypted, + "kmsKeyId": kmsKeyId, + "annotations": annotationsStr, + "volumeId": req.GetVolumeId(), + "readonly": fmt.Sprintf("%v", req.Readonly), + "targetPath": req.GetTargetPath(), + } + if opt.AkSecret != "" && strings.HasPrefix(opt.AkSecret, sealedSecretPrefix) { + metadata[AkID] = opt.AkID + metadata[AkSecret] = opt.AkSecret + } + if encPasswd != "" && strings.HasPrefix(encPasswd, sealedSecretPrefix) { + metadata[optEncPasswd] = encPasswd + } + + mountInfo := directvolume.MountInfo{ + VolumeType: volumeType, + Device: device, + FsType: fsType, + Metadata: metadata, + Options: strings.Fields(opt.OtherOpts), + } + + logger.Info("NodePublishVolume:: Starting add mount info for DirectVolume") + err := directvolume.AddMountInfo(volumePath, mountInfo) + if err != nil { + logger.Errorf("NodePublishVolume:: Add mount info for DirectVolume failed: %v", err) + return nil, err + } + logger.Info("NodePublishVolume:: Add mount info for DirectVolume is successfully") + + return &csi.NodePublishVolumeResponse{}, nil +} + +func (ns *nodeServer) unPublishDirectVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + logger := log.WithFields(map[string]interface{}{ + "NodeServer": "OSSNodeServer", + "VolumeId": req.VolumeId, + "TargetPath": req.TargetPath, + }) + + volumePath := req.TargetPath + err := directvolume.Remove(volumePath) + if err != nil { + logger.Errorf("NodeUnPublishVolume:: Remove mount info for DirectVolume failed: %v", err) + return nil, err + } + + logger.Info("NodeUnPublishVolume:: Remove mount info for DirectVolume is successfully") + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +func isDirectVolumePath(volumePath string) bool { + _, err := directvolume.VolumeMountInfo(volumePath) + if err != nil { + return false + } + return true +} diff --git a/pkg/oss/coco_test.go b/pkg/oss/coco_test.go new file mode 100644 index 000000000..2a7a37cf1 --- /dev/null +++ b/pkg/oss/coco_test.go @@ -0,0 +1,122 @@ +package oss + +import ( + "context" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/kata/directvolume" + "github.com/stretchr/testify/assert" +) + +func Test_nodeServer_publishDirectVolume(t *testing.T) { + ns := &nodeServer{} + req := &csi.NodePublishVolumeRequest{ + VolumeId: "test-id", + PublishContext: nil, + StagingTargetPath: "", + TargetPath: "/foo/bar/path/for/publish", + VolumeCapability: nil, + Readonly: false, + Secrets: nil, + VolumeContext: map[string]string{ + "encrypted": "local_encrypt", + "kmsKeyId": "foo-key-id", + }, + } + opts := &Options{ + URL: "https://oss-cn-hangzhou.aliyuncs.com", + Bucket: "test-bucket", + Path: "bucket/path", + OtherOpts: "-o test -w abc", + } + resp, err := ns.publishDirectVolume(context.TODO(), req, opts) + assert.NoError(t, err) + assert.NotNil(t, resp) + + defer directvolume.Remove(req.TargetPath) + ret := isDirectVolumePath(req.TargetPath) + assert.True(t, ret) + + info, err := directvolume.VolumeMountInfo(req.TargetPath) + assert.NoError(t, err) + assert.NotNil(t, info) + assert.Equal(t, "alibaba-cloud-oss", info.VolumeType) + assert.Equal(t, "secure_mount", info.FsType) + assert.Equal(t, []string{"-o", "test", "-w", "abc"}, info.Options) + assert.Equal(t, opts.URL, info.Metadata["url"]) + assert.Equal(t, opts.Bucket, info.Metadata["bucket"]) + assert.Equal(t, opts.Path, info.Metadata["path"]) + assert.Equal(t, req.TargetPath, info.Metadata["targetPath"]) + assert.Equal(t, req.VolumeContext["kmsKeyId"], info.Metadata["kmsKeyId"]) + assert.Equal(t, req.VolumeContext["encrypted"], info.Metadata["encrypted"]) + assert.Equal(t, opts.OtherOpts, info.Metadata["otherOpts"]) + + // publish twice + resp, err = ns.publishDirectVolume(context.TODO(), req, opts) + assert.NoError(t, err) + assert.NotNil(t, resp) +} + +func Test_nodeServer_publishDirectVolume_overwrite_annotations(t *testing.T) { + ns := &nodeServer{} + req := &csi.NodePublishVolumeRequest{ + VolumeId: "test-id", + PublishContext: nil, + StagingTargetPath: "", + TargetPath: "/foo/bar/path/for/publish", + VolumeCapability: nil, + Readonly: false, + Secrets: nil, + VolumeContext: map[string]string{ + "annotations": `{"kata_fs_type": "type_v2", "kata_volume_type": "volume_type_v", "kata_device": "kata_device_vv" }`, + }, + } + opts := &Options{} + resp, err := ns.publishDirectVolume(context.TODO(), req, opts) + assert.NoError(t, err) + assert.NotNil(t, resp) + + defer directvolume.Remove(req.TargetPath) + ret := isDirectVolumePath(req.TargetPath) + assert.True(t, ret) + + info, err := directvolume.VolumeMountInfo(req.TargetPath) + assert.NoError(t, err) + assert.NotNil(t, info) + assert.Equal(t, "volume_type_v", info.VolumeType) + assert.Equal(t, "type_v2", info.FsType) + assert.Equal(t, "kata_device_vv", info.Device) + + // publish twice + resp, err = ns.publishDirectVolume(context.TODO(), req, opts) + assert.NoError(t, err) + assert.NotNil(t, resp) +} + +func Test_nodeServer_unPublishDirectVolume(t *testing.T) { + ns := &nodeServer{} + req := &csi.NodeUnpublishVolumeRequest{ + VolumeId: "test-id", + TargetPath: "/foo/bar/path/for/unpublish", + } + + err := directvolume.AddMountInfo(req.TargetPath, directvolume.MountInfo{}) + assert.NoError(t, err) + info, err := directvolume.VolumeMountInfo(req.TargetPath) + assert.NoError(t, err) + assert.NotNil(t, info) + + resp, err := ns.unPublishDirectVolume(context.TODO(), req) + assert.NoError(t, err) + assert.NotNil(t, resp) + + info, err = directvolume.VolumeMountInfo(req.TargetPath) + assert.Error(t, err) + assert.Nil(t, info) + + // unpublish twice + resp, err = ns.unPublishDirectVolume(context.TODO(), req) + assert.NoError(t, err) + assert.NotNil(t, resp) +} diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index f5088d038..bfcf793ac 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -46,6 +46,8 @@ type nodeServer struct { // Options contains options for target oss type Options struct { + directAssigned bool + Bucket string `json:"bucket"` URL string `json:"url"` OtherOpts string `json:"otherOpts"` @@ -130,6 +132,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis opt.MetricsTop = strings.ToLower(strings.TrimSpace(value)) } else if key == "containernetworkfilesystem" { cnfsName = value + } else if key == optDirectAssigned { + opt.directAssigned, _ = strconv.ParseBool(strings.TrimSpace(value)) } } @@ -180,6 +184,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis argStr := fmt.Sprintf("Bucket: %s, url: %s, , OtherOpts: %s, Path: %s, UseSharedPath: %s, authType: %s", opt.Bucket, opt.URL, opt.OtherOpts, opt.Path, strconv.FormatBool(opt.UseSharedPath), opt.AuthType) log.Infof("NodePublishVolume:: Starting Oss Mount: %s", argStr) + if opt.directAssigned { + return ns.publishDirectVolume(ctx, req, opt) + } if IsOssfsMounted(mountPath) { log.Infof("NodePublishVolume: The mountpoint is mounted: %s", mountPath) return &csi.NodePublishVolumeResponse{}, nil @@ -390,6 +397,9 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if err != nil { return nil, err } + if isDirectVolumePath(mountPoint) { + return ns.unPublishDirectVolume(ctx, req) + } // check mount point with IsLikelyNotMountPoint first notmounted, err := ns.k8smounter.IsLikelyNotMountPoint(mountPoint) diff --git a/pkg/utils/kata/directvolume/direct_volume.go b/pkg/utils/kata/directvolume/direct_volume.go new file mode 100644 index 000000000..2c25a77ee --- /dev/null +++ b/pkg/utils/kata/directvolume/direct_volume.go @@ -0,0 +1,83 @@ +// Copyright (c) 2022 Databricks Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +// base on https://github.com/kata-containers/kata-containers/blob/c800d0739fe42414a15ac8967f3fa5ccb98c28d7/src/runtime/pkg/direct-volume/utils.go + +import ( + b64 "encoding/base64" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" +) + +const ( + mountInfoFileName = "mountInfo.json" +) + +var kataDirectVolumeRootPath = "/run/kata-containers/shared/direct-volumes" + +// MountInfo contains the information needed by Kata to consume a host block device and mount it as a filesystem inside the guest VM. +type MountInfo struct { + // The type of the volume (ie. block) + VolumeType string `json:"volume-type"` + // The device backing the volume. + Device string `json:"device"` + // The filesystem type to be mounted on the volume. + FsType string `json:"fstype"` + // Additional metadata to pass to the agent regarding this volume. + Metadata map[string]string `json:"metadata,omitempty"` + // Additional mount options. + Options []string `json:"options,omitempty"` +} + +// Add writes the mount info of a direct volume into a filesystem path known to Kata Container. +func Add(volumePath string, mountInfo string) error { + volumeDir := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath))) + stat, err := os.Stat(volumeDir) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + if err := os.MkdirAll(volumeDir, 0700); err != nil { + return err + } + } + if stat != nil && !stat.IsDir() { + return fmt.Errorf("%s should be a directory", volumeDir) + } + + var deserialized MountInfo + if err := json.Unmarshal([]byte(mountInfo), &deserialized); err != nil { + return err + } + + return os.WriteFile(filepath.Join(volumeDir, mountInfoFileName), []byte(mountInfo), 0600) +} + +// Remove deletes the direct volume path including all the files inside it. +func Remove(volumePath string) error { + return os.RemoveAll(filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)))) +} + +// VolumeMountInfo retrieves the mount info of a direct volume. +func VolumeMountInfo(volumePath string) (*MountInfo, error) { + mountInfoFilePath := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)), mountInfoFileName) + if _, err := os.Stat(mountInfoFilePath); err != nil { + return nil, err + } + buf, err := os.ReadFile(mountInfoFilePath) + if err != nil { + return nil, err + } + var mountInfo MountInfo + if err := json.Unmarshal(buf, &mountInfo); err != nil { + return nil, err + } + return &mountInfo, nil +} diff --git a/pkg/utils/kata/directvolume/utils.go b/pkg/utils/kata/directvolume/utils.go new file mode 100644 index 000000000..0cbf530d4 --- /dev/null +++ b/pkg/utils/kata/directvolume/utils.go @@ -0,0 +1,9 @@ +package directvolume + +import "encoding/json" + +func AddMountInfo(volumePath string, mountInfo MountInfo) error { + data, _ := json.Marshal(mountInfo) + + return Add(volumePath, string(data)) +}