Skip to content

Commit

Permalink
Merge pull request #882 from mozillazg/feautre/oss-direct-volume
Browse files Browse the repository at this point in the history
oss: support CoCo direct volume feature
  • Loading branch information
k8s-ci-robot authored Oct 13, 2023
2 parents 096d984 + 801d1a6 commit 79369da
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 0 deletions.
137 changes: 137 additions & 0 deletions pkg/oss/coco.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package oss

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/kata/directvolume"
log "github.com/sirupsen/logrus"
)

const (
optDirectAssigned = "direct"
optAnnotations = "annotations"
optEncrypted = "encrypted"
optEncPasswd = "encPasswd"
optKmsKeyId = "kmsKeyId"
fsTypeSecureMount = "secure_mount"
volumeTypeOSS = "alibaba-cloud-oss"
sealedSecretPrefix = "sealed."
)

func (ns *nodeServer) publishDirectVolume(ctx context.Context, req *csi.NodePublishVolumeRequest, opt *Options) (*csi.NodePublishVolumeResponse, error) {
logger := log.WithFields(map[string]interface{}{
"NodeServer": "OSSNodeServer",
"VolumeId": req.VolumeId,
"TargetPath": req.TargetPath,
})

volumePath := req.TargetPath
if isDirectVolumePath(volumePath) {
logger.Infof("NodePublishVolume: The mount info for DirectVolume is already exist: %s", volumePath)
return &csi.NodePublishVolumeResponse{}, nil
}

device := req.TargetPath
volumeType := volumeTypeOSS
fsType := fsTypeSecureMount
var encrypted, encPasswd, kmsKeyId string
annotationsObj := map[string]string{}

if v := req.Secrets[optEncPasswd]; v != "" {
encPasswd = v
}
for key, value := range req.VolumeContext {
switch key {
case optEncrypted:
encrypted = strings.TrimSpace(value)
case optEncPasswd:
encPasswd = strings.TrimSpace(value)
case optKmsKeyId:
kmsKeyId = strings.TrimSpace(value)
case optAnnotations:
json.Unmarshal([]byte(value), &annotationsObj)
}
}

if annotationsObj["kata_fs_type"] != "" {
fsType = annotationsObj["kata_fs_type"]
}
if annotationsObj["kata_volume_type"] != "" {
volumeType = annotationsObj["kata_volume_type"]
}
if annotationsObj["kata_device"] != "" {
device = annotationsObj["kata_device"]
}
annotations, _ := json.Marshal(annotationsObj)
annotationsStr := string(annotations)
if len(annotations) == 0 || annotationsStr == "null" || annotationsStr == "" {
annotationsStr = "{}"
}
metadata := map[string]string{
"bucket": opt.Bucket,
"url": opt.URL,
"otherOpts": opt.OtherOpts,
"path": opt.Path,
"encrypted": encrypted,
"kmsKeyId": kmsKeyId,
"annotations": annotationsStr,
"volumeId": req.GetVolumeId(),
"readonly": fmt.Sprintf("%v", req.Readonly),
"targetPath": req.GetTargetPath(),
}
if opt.AkSecret != "" && strings.HasPrefix(opt.AkSecret, sealedSecretPrefix) {
metadata[AkID] = opt.AkID
metadata[AkSecret] = opt.AkSecret
}
if encPasswd != "" && strings.HasPrefix(encPasswd, sealedSecretPrefix) {
metadata[optEncPasswd] = encPasswd
}

mountInfo := directvolume.MountInfo{
VolumeType: volumeType,
Device: device,
FsType: fsType,
Metadata: metadata,
Options: strings.Fields(opt.OtherOpts),
}

logger.Info("NodePublishVolume:: Starting add mount info for DirectVolume")
err := directvolume.AddMountInfo(volumePath, mountInfo)
if err != nil {
logger.Errorf("NodePublishVolume:: Add mount info for DirectVolume failed: %v", err)
return nil, err
}
logger.Info("NodePublishVolume:: Add mount info for DirectVolume is successfully")

return &csi.NodePublishVolumeResponse{}, nil
}

func (ns *nodeServer) unPublishDirectVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
logger := log.WithFields(map[string]interface{}{
"NodeServer": "OSSNodeServer",
"VolumeId": req.VolumeId,
"TargetPath": req.TargetPath,
})

volumePath := req.TargetPath
err := directvolume.Remove(volumePath)
if err != nil {
logger.Errorf("NodeUnPublishVolume:: Remove mount info for DirectVolume failed: %v", err)
return nil, err
}

logger.Info("NodeUnPublishVolume:: Remove mount info for DirectVolume is successfully")
return &csi.NodeUnpublishVolumeResponse{}, nil
}

func isDirectVolumePath(volumePath string) bool {
_, err := directvolume.VolumeMountInfo(volumePath)
if err != nil {
return false
}
return true
}
122 changes: 122 additions & 0 deletions pkg/oss/coco_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package oss

import (
"context"
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/kata/directvolume"
"github.com/stretchr/testify/assert"
)

func Test_nodeServer_publishDirectVolume(t *testing.T) {
ns := &nodeServer{}
req := &csi.NodePublishVolumeRequest{
VolumeId: "test-id",
PublishContext: nil,
StagingTargetPath: "",
TargetPath: "/foo/bar/path/for/publish",
VolumeCapability: nil,
Readonly: false,
Secrets: nil,
VolumeContext: map[string]string{
"encrypted": "local_encrypt",
"kmsKeyId": "foo-key-id",
},
}
opts := &Options{
URL: "https://oss-cn-hangzhou.aliyuncs.com",
Bucket: "test-bucket",
Path: "bucket/path",
OtherOpts: "-o test -w abc",
}
resp, err := ns.publishDirectVolume(context.TODO(), req, opts)
assert.NoError(t, err)
assert.NotNil(t, resp)

defer directvolume.Remove(req.TargetPath)
ret := isDirectVolumePath(req.TargetPath)
assert.True(t, ret)

info, err := directvolume.VolumeMountInfo(req.TargetPath)
assert.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, "alibaba-cloud-oss", info.VolumeType)
assert.Equal(t, "secure_mount", info.FsType)
assert.Equal(t, []string{"-o", "test", "-w", "abc"}, info.Options)
assert.Equal(t, opts.URL, info.Metadata["url"])
assert.Equal(t, opts.Bucket, info.Metadata["bucket"])
assert.Equal(t, opts.Path, info.Metadata["path"])
assert.Equal(t, req.TargetPath, info.Metadata["targetPath"])
assert.Equal(t, req.VolumeContext["kmsKeyId"], info.Metadata["kmsKeyId"])
assert.Equal(t, req.VolumeContext["encrypted"], info.Metadata["encrypted"])
assert.Equal(t, opts.OtherOpts, info.Metadata["otherOpts"])

// publish twice
resp, err = ns.publishDirectVolume(context.TODO(), req, opts)
assert.NoError(t, err)
assert.NotNil(t, resp)
}

func Test_nodeServer_publishDirectVolume_overwrite_annotations(t *testing.T) {
ns := &nodeServer{}
req := &csi.NodePublishVolumeRequest{
VolumeId: "test-id",
PublishContext: nil,
StagingTargetPath: "",
TargetPath: "/foo/bar/path/for/publish",
VolumeCapability: nil,
Readonly: false,
Secrets: nil,
VolumeContext: map[string]string{
"annotations": `{"kata_fs_type": "type_v2", "kata_volume_type": "volume_type_v", "kata_device": "kata_device_vv" }`,
},
}
opts := &Options{}
resp, err := ns.publishDirectVolume(context.TODO(), req, opts)
assert.NoError(t, err)
assert.NotNil(t, resp)

defer directvolume.Remove(req.TargetPath)
ret := isDirectVolumePath(req.TargetPath)
assert.True(t, ret)

info, err := directvolume.VolumeMountInfo(req.TargetPath)
assert.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, "volume_type_v", info.VolumeType)
assert.Equal(t, "type_v2", info.FsType)
assert.Equal(t, "kata_device_vv", info.Device)

// publish twice
resp, err = ns.publishDirectVolume(context.TODO(), req, opts)
assert.NoError(t, err)
assert.NotNil(t, resp)
}

func Test_nodeServer_unPublishDirectVolume(t *testing.T) {
ns := &nodeServer{}
req := &csi.NodeUnpublishVolumeRequest{
VolumeId: "test-id",
TargetPath: "/foo/bar/path/for/unpublish",
}

err := directvolume.AddMountInfo(req.TargetPath, directvolume.MountInfo{})
assert.NoError(t, err)
info, err := directvolume.VolumeMountInfo(req.TargetPath)
assert.NoError(t, err)
assert.NotNil(t, info)

resp, err := ns.unPublishDirectVolume(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, resp)

info, err = directvolume.VolumeMountInfo(req.TargetPath)
assert.Error(t, err)
assert.Nil(t, info)

// unpublish twice
resp, err = ns.unPublishDirectVolume(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, resp)
}
10 changes: 10 additions & 0 deletions pkg/oss/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type nodeServer struct {

// Options contains options for target oss
type Options struct {
directAssigned bool

Bucket string `json:"bucket"`
URL string `json:"url"`
OtherOpts string `json:"otherOpts"`
Expand Down Expand Up @@ -130,6 +132,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
opt.MetricsTop = strings.ToLower(strings.TrimSpace(value))
} else if key == "containernetworkfilesystem" {
cnfsName = value
} else if key == optDirectAssigned {
opt.directAssigned, _ = strconv.ParseBool(strings.TrimSpace(value))
}
}

Expand Down Expand Up @@ -180,6 +184,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
argStr := fmt.Sprintf("Bucket: %s, url: %s, , OtherOpts: %s, Path: %s, UseSharedPath: %s, authType: %s", opt.Bucket, opt.URL, opt.OtherOpts, opt.Path, strconv.FormatBool(opt.UseSharedPath), opt.AuthType)
log.Infof("NodePublishVolume:: Starting Oss Mount: %s", argStr)

if opt.directAssigned {
return ns.publishDirectVolume(ctx, req, opt)
}
if IsOssfsMounted(mountPath) {
log.Infof("NodePublishVolume: The mountpoint is mounted: %s", mountPath)
return &csi.NodePublishVolumeResponse{}, nil
Expand Down Expand Up @@ -390,6 +397,9 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if err != nil {
return nil, err
}
if isDirectVolumePath(mountPoint) {
return ns.unPublishDirectVolume(ctx, req)
}

// check mount point with IsLikelyNotMountPoint first
notmounted, err := ns.k8smounter.IsLikelyNotMountPoint(mountPoint)
Expand Down
83 changes: 83 additions & 0 deletions pkg/utils/kata/directvolume/direct_volume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2022 Databricks Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package directvolume

// base on https://github.com/kata-containers/kata-containers/blob/c800d0739fe42414a15ac8967f3fa5ccb98c28d7/src/runtime/pkg/direct-volume/utils.go

import (
b64 "encoding/base64"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
)

const (
mountInfoFileName = "mountInfo.json"
)

var kataDirectVolumeRootPath = "/run/kata-containers/shared/direct-volumes"

// MountInfo contains the information needed by Kata to consume a host block device and mount it as a filesystem inside the guest VM.
type MountInfo struct {
// The type of the volume (ie. block)
VolumeType string `json:"volume-type"`
// The device backing the volume.
Device string `json:"device"`
// The filesystem type to be mounted on the volume.
FsType string `json:"fstype"`
// Additional metadata to pass to the agent regarding this volume.
Metadata map[string]string `json:"metadata,omitempty"`
// Additional mount options.
Options []string `json:"options,omitempty"`
}

// Add writes the mount info of a direct volume into a filesystem path known to Kata Container.
func Add(volumePath string, mountInfo string) error {
volumeDir := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)))
stat, err := os.Stat(volumeDir)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
if err := os.MkdirAll(volumeDir, 0700); err != nil {
return err
}
}
if stat != nil && !stat.IsDir() {
return fmt.Errorf("%s should be a directory", volumeDir)
}

var deserialized MountInfo
if err := json.Unmarshal([]byte(mountInfo), &deserialized); err != nil {
return err
}

return os.WriteFile(filepath.Join(volumeDir, mountInfoFileName), []byte(mountInfo), 0600)
}

// Remove deletes the direct volume path including all the files inside it.
func Remove(volumePath string) error {
return os.RemoveAll(filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath))))
}

// VolumeMountInfo retrieves the mount info of a direct volume.
func VolumeMountInfo(volumePath string) (*MountInfo, error) {
mountInfoFilePath := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)), mountInfoFileName)
if _, err := os.Stat(mountInfoFilePath); err != nil {
return nil, err
}
buf, err := os.ReadFile(mountInfoFilePath)
if err != nil {
return nil, err
}
var mountInfo MountInfo
if err := json.Unmarshal(buf, &mountInfo); err != nil {
return nil, err
}
return &mountInfo, nil
}
9 changes: 9 additions & 0 deletions pkg/utils/kata/directvolume/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package directvolume

import "encoding/json"

func AddMountInfo(volumePath string, mountInfo MountInfo) error {
data, _ := json.Marshal(mountInfo)

return Add(volumePath, string(data))
}

0 comments on commit 79369da

Please sign in to comment.