Skip to content

Commit

Permalink
Add support for importing models stored in the Modelcar format
Browse files Browse the repository at this point in the history
This allows dsl.import to leverage Modelcar container images in an OCI
repository. This works by adding an init container to each component
pod, with its image set to the Modelcar container image, which copies
the contents of the /models directory to an emptyDir volume that is
accessible in the component container.

This approach has the benefit of leveraging image pull secrets
configured on the Kubernetes cluster rather than requiring separate
credentials for importing the artifact.

Note that once Kubernetes has supported OCI images as volume mounts for
several releases, consider replacing the init container with that
approach.

Resolves:
#11584

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
  • Loading branch information
mprahl committed Feb 6, 2025
1 parent 6cb7cf7 commit c6f3aa0
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/kfp-samples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ jobs:
with:
k8s_version: ${{ matrix.k8s_version }}

- name: Build and upload the sample Modelcar image to Kind
run: |
docker build -f samples/v2/modelcar_import/Dockerfile -t registry.domain.local/modelcar:latest .
kind --name kfp load docker-image registry.domain.local/modelcar:latest
- name: Forward API port
run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888

Expand Down
24 changes: 20 additions & 4 deletions backend/src/v2/component/importer_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/kubeflow/pipelines/backend/src/v2/objectstore"

pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
Expand Down Expand Up @@ -227,10 +229,6 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact

state := pb.Artifact_LIVE

provider, err := objectstore.ParseProviderFromPath(artifactUri)
if err != nil {
return nil, fmt.Errorf("No Provider scheme found in artifact Uri: %s", artifactUri)
}
artifact = &pb.Artifact{
TypeId: &artifactTypeId,
State: &state,
Expand All @@ -248,6 +246,24 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
}
}

if strings.HasPrefix(artifactUri, "oci://") {
artifactType, err := metadata.SchemaToArtifactType(schema)
if err != nil {
return nil, fmt.Errorf("converting schema to artifact type failed: %w", err)
}

if *artifactType.Name != "system.Model" {
return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name)
}

return artifact, nil
}

provider, err := objectstore.ParseProviderFromPath(artifactUri)
if err != nil {
return nil, fmt.Errorf("no provider scheme found in artifact URI: %s", artifactUri)
}

// Assume all imported artifacts will rely on execution environment for store provider session info
storeSessionInfo := objectstore.SessionInfo{
Provider: provider,
Expand Down
27 changes: 21 additions & 6 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
}

// Upload artifacts from local path to remote storages.
localDir, err := localPathForURI(outputArtifact.Uri)
localDir, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
} else {
Expand Down Expand Up @@ -497,7 +497,13 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
continue
}
inputArtifact := artifactList.Artifacts[0]
localPath, err := localPathForURI(inputArtifact.Uri)

// OCI artifacts are handled specially
if strings.HasPrefix(inputArtifact.Uri, "oci://") {
continue
}

localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
continue
Expand Down Expand Up @@ -548,6 +554,12 @@ func fetchNonDefaultBuckets(
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
artifact := artifactList.Artifacts[0]

// OCI artifacts are handled specially
if strings.HasPrefix(artifact.Uri, "oci://") {
continue
}

// The artifact does not belong under the object store path for this run. Cases:
// 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath
// 2. Artifact is imported from the same bucket, but from a different path (re-use the same session)
Expand Down Expand Up @@ -598,7 +610,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
placeholders[key] = inputArtifact.Uri

localPath, err := localPathForURI(inputArtifact.Uri)
localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
// Input Artifact does not have a recognized storage URI
continue
Expand All @@ -617,7 +629,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
outputArtifact := artifactList.Artifacts[0]
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri

localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
}
Expand Down Expand Up @@ -720,7 +732,7 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
return executorOutput, nil
}

func localPathForURI(uri string) (string, error) {
func LocalPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "gs://") {
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
}
Expand All @@ -730,6 +742,9 @@ func localPathForURI(uri string) (string, error) {
if strings.HasPrefix(uri, "s3://") {
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
}
if strings.HasPrefix(uri, "oci://") {
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "\\/"), nil
}
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
}

Expand All @@ -747,7 +762,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
}
outputArtifact := artifactList.Artifacts[0]

localPath, err := localPathForURI(outputArtifact.Uri)
localPath, err := LocalPathForURI(outputArtifact.Uri)
if err != nil {
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
}
Expand Down
89 changes: 83 additions & 6 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -530,23 +531,99 @@ func initPodSpecPatch(
}
}

initContainers, volumes, volumeMounts := getOCIModelPodSpec(executorInput.GetInputs().GetArtifacts(), userEnvVar)

containerImage, err := resolvePodSpecInputRuntimeParameter(container.Image, executorInput)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
podSpec := &k8score.PodSpec{
InitContainers: initContainers,
Containers: []k8score.Container{{
Name: "main", // argo task user container is always called "main"
Command: launcherCmd,
Args: userCmdArgs,
Image: containerImage,
Resources: res,
Env: userEnvVar,
Name: "main", // argo task user container is always called "main"
Command: launcherCmd,
Args: userCmdArgs,
Image: containerImage,
Resources: res,
Env: userEnvVar,
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
}
return podSpec, nil
}

// getOCIModelPodSpec returns the init containers, pod volumes, and volume mounts for the launcher pod for any input
// artifacts that are Modelcar images in an OCI registry (a URI with the "oci://" prefix). If no such input artifacts
// exist, the returned slices are nil.
//
// For every matching artifact, one emptyDir volume is added, one init container that copies /models from the Modelcar
// image into that volume is added, and one volume mount (intended for the main container) is added. The results
// accumulate across artifacts, so several OCI inputs on the same component each get their own volume and mount.
//
// Artifacts whose URI cannot be translated to a local path are silently skipped.
func getOCIModelPodSpec(
	artifacts map[string]*pipelinespec.ArtifactList,
	userEnvVar []k8score.EnvVar,
) (
	[]k8score.Container,
	[]k8score.Volume,
	[]k8score.VolumeMount,
) {
	var initContainers []k8score.Container
	var volumes []k8score.Volume
	var volumeMounts []k8score.VolumeMount

	for name, artifactList := range artifacts {
		// The generated getters are nil-safe, so a nil list entry is treated as empty.
		if len(artifactList.GetArtifacts()) == 0 {
			continue
		}

		// Following the convention of downloadArtifacts in the launcher to only look at the first in the list.
		inputArtifact := artifactList.GetArtifacts()[0]
		uri := inputArtifact.GetUri()

		// This should ideally verify that this is also a model input artifact, but this metadata doesn't seem to
		// be set on inputArtifact.
		if !strings.HasPrefix(uri, "oci://") {
			continue
		}

		localPath, err := component.LocalPathForURI(uri)
		if err != nil {
			continue
		}

		// NOTE(review): the artifact input name is used verbatim in the volume and container names; confirm that
		// input names are always valid Kubernetes object names.
		volumeName := "oci-" + name

		// Append rather than assign so that earlier OCI artifacts keep their volume when there are several.
		volumes = append(volumes, k8score.Volume{
			Name: volumeName,
			VolumeSource: k8score.VolumeSource{
				EmptyDir: &k8score.EmptyDirVolumeSource{},
			},
		})

		mount := k8score.VolumeMount{
			Name:      volumeName,
			MountPath: localPath,
			SubPath:   strings.TrimPrefix(localPath, "/oci/"),
		}
		volumeMounts = append(volumeMounts, mount)

		initContainers = append(initContainers, k8score.Container{
			Name: volumeName,
			Command: []string{
				// This assumes the Modelcar format of models being stored in /models. These models
				// are copied to an emptyDir volume.
				"sh", "-c", fmt.Sprintf("mkdir -p '%s' && cp -R /models/* '%s'", localPath, localPath),
			},
			Image: strings.TrimPrefix(uri, "oci://"),
			// These images can be huge, so take advantage of caching by Kubernetes by not always pulling.
			ImagePullPolicy: "IfNotPresent",
			Env:             userEnvVar,
			// Each init container only needs the volume it populates.
			VolumeMounts: []k8score.VolumeMount{mount},
		})
	}

	return initContainers, volumes, volumeMounts
}

// Extends the PodSpec to include Kubernetes-specific executor config.
func extendPodSpecPatch(
podSpec *k8score.PodSpec,
Expand Down
41 changes: 41 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,47 @@ func Test_initPodSpecPatch_legacy_resources(t *testing.T) {
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}

// Test_initPodSpecPatch_modelcar_input_artifact verifies that an input artifact with an oci:// URI makes
// initPodSpecPatch add an init container that copies /models into an emptyDir volume, and that the volume is
// mounted into the main container at the artifact's local path.
func Test_initPodSpecPatch_modelcar_input_artifact(t *testing.T) {
	containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
		Image:   "python:3.9",
		Args:    []string{"--function_to_execute", "add"},
		Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
	}
	componentSpec := &pipelinespec.ComponentSpec{}
	executorInput := &pipelinespec.ExecutorInput{
		Inputs: &pipelinespec.ExecutorInput_Inputs{
			Artifacts: map[string]*pipelinespec.ArtifactList{
				"my-model": {
					Artifacts: []*pipelinespec.RuntimeArtifact{
						{
							Uri: "oci://registry.domain.local/my-model:latest",
						},
					},
				},
			},
		},
	}

	podSpec, err := initPodSpecPatch(
		containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
	)
	// Stop early on failure: the assertions below dereference podSpec and index into its slices.
	if !assert.NoError(t, err) {
		return
	}
	if !assert.Len(t, podSpec.Containers, 1) || !assert.Len(t, podSpec.InitContainers, 1) {
		return
	}

	// The local path escapes "/" in the image reference, hence the literal backslash.
	const expectedSubPath = "registry.domain.local\\/my-model:latest"
	const expectedLocalPath = "/oci/" + expectedSubPath

	initContainer := podSpec.InitContainers[0]
	assert.Equal(t, "oci-my-model", initContainer.Name)
	if assert.Len(t, initContainer.Command, 3) {
		expectedCopyCmd := "mkdir -p '" + expectedLocalPath + "' && " +
			"cp -R /models/* '" + expectedLocalPath + "'"
		assert.Equal(t, expectedCopyCmd, initContainer.Command[2])
	}

	if assert.Len(t, podSpec.Volumes, 1) {
		assert.Equal(t, "oci-my-model", podSpec.Volumes[0].Name)
		assert.NotNil(t, podSpec.Volumes[0].EmptyDir)
	}

	if assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) {
		mount := podSpec.Containers[0].VolumeMounts[0]
		assert.Equal(t, "oci-my-model", mount.Name)
		assert.Equal(t, expectedLocalPath, mount.MountPath)
		assert.Equal(t, expectedSubPath, mount.SubPath)
	}
}

func Test_makeVolumeMountPatch(t *testing.T) {
type args struct {
pvcMount []*kubernetesplatform.PvcMount
Expand Down
4 changes: 4 additions & 0 deletions backend/src/v2/test/sample-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ python3 -m pip install -r ./requirements-sample-test.txt

popd

# When running for a pull request, point KFP_PACKAGE_PATH at the SDK from that PR's merge ref.
# The value must be double-quoted so that the shell expands ${PULL_NUMBER}; with single quotes the
# literal text "${PULL_NUMBER}" would end up in the URL.
# NOTE(review): the variable is not exported here; confirm it is already exported by the caller if
# the Python test process below is expected to read it.
if [[ -n "${PULL_NUMBER:-}" ]]; then
  KFP_PACKAGE_PATH="git+https://github.com/kubeflow/pipelines@refs/pull/${PULL_NUMBER}/merge#egg=kfp&subdirectory=sdk/python"
fi

# The -u flag makes python output unbuffered, so that we can see real time log.
# Reference: https://stackoverflow.com/a/107717
python3 -u ./samples/v2/sample_test.py
18 changes: 18 additions & 0 deletions samples/v2/modelcar_import/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Build stage: downloads the model files. Nothing from this stage other than /models ends up in
# the final image.
FROM python:3.13-slim AS base

USER 0

RUN pip install huggingface-hub

# Download a small model file from Hugging Face (pinned to a fixed revision)
RUN python -c "from huggingface_hub import snapshot_download; snapshot_download(repo_id='openai/whisper-tiny', local_dir='/models',allow_patterns=['*.safetensors', '*.json', '*.txt'], revision='169d4a4341b33bc18d8881c4b69c2e104e1cc0af')"

# Final image containing only the essential model files
FROM alpine:3.19

RUN mkdir /models

# Copy the model files from the base container
COPY --from=base /models /models

# Run as a non-root user
USER 1001
Loading

0 comments on commit c6f3aa0

Please sign in to comment.