Skip to content

Commit

Permalink
Add support for importing models stored in the Modelcar (OCI) format
Browse files Browse the repository at this point in the history
This allows dsl.import to leverage Modelcar container images in an OCI
repository. This works by having an init container prepull the image and
then adding a sidecar container when the launcher container is running.
The Modelcar container adds a symlink to its /models directory in an
emptyDir volume that is accessible by the launcher container. Once the
launcher is done running the user code, it stops the Modelcar
containers.

This approach has the benefit of leveraging image pull secrets
configured on the Kubernetes cluster rather than require separate
credentials for importing the artifact. Additionally, no data is copied
to the emptyDir volume, so the storage cost is just pulling the Modelcar
container image on the Kubernetes worker node.

Note that once Kubernetes supports OCI images as volume mounts for
several releases, consider replacing the init container with that
approach.

This also adds a new environment variable of PIPELINE_RUN_AS_USER to
set the runAsUser on all pods created by Argo Workflows.

Resolves:
#11584

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
  • Loading branch information
mprahl committed Feb 12, 2025
1 parent a1f3262 commit 8a0b0ec
Show file tree
Hide file tree
Showing 15 changed files with 769 additions and 10 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/kfp-samples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ jobs:
with:
k8s_version: ${{ matrix.k8s_version }}

- name: Build and upload the sample Modelcar image to Kind
run: |
docker build -f samples/v2/modelcar_import/Dockerfile -t registry.domain.local/modelcar:test .
kind --name kfp load docker-image registry.domain.local/modelcar:test
- name: Forward API port
run: ./.github/resources/scripts/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888

- name: Run Samples Tests
env:
PULL_NUMBER: ${{ github.event.pull_request.number }}
run: |
./backend/src/v2/test/sample-test.sh
8 changes: 8 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
Entrypoint: tmplEntrypoint,
},
}

runAsUser := GetPipelineRunAsUser()
if runAsUser != nil {
wf.Spec.SecurityContext = &k8score.PodSecurityContext{
RunAsUser: GetPipelineRunAsUser(),
}
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
25 changes: 25 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func Test_argo_compiler(t *testing.T) {
jobPath string // path of input PipelineJob to compile
platformSpecPath string // path of possible input PlatformSpec to compile
argoYAMLPath string // path of expected output argo workflow YAML
envVars map[string]string
}{
{
jobPath: "../testdata/hello_world.json",
Expand Down Expand Up @@ -67,9 +68,33 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "",
argoYAMLPath: "testdata/exit_handler.yaml",
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "",
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
prevEnvVars := map[string]string{}

for envVarName, envVarValue := range tt.envVars {
prevEnvVars[envVarName] = os.Getenv(envVarName)

os.Setenv(envVarName, envVarValue)
}

defer func() {
for envVarName, envVarValue := range prevEnvVars {
if envVarValue == "" {
os.Unsetenv(envVarName)
} else {
os.Setenv(envVarName, envVarValue)
}
}
}()

job, platformSpec := load(t, tt.jobPath, tt.platformSpecPath)
if *update {
wf, err := argocompiler.Compile(job, platformSpec, nil)
Expand Down
22 changes: 22 additions & 0 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package argocompiler
import (
"fmt"
"os"
"strconv"
"strings"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/golang/protobuf/jsonpb"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/component"
Expand All @@ -36,6 +38,7 @@ const (
DriverImageEnvVar = "V2_DRIVER_IMAGE"
DefaultDriverCommand = "driver"
DriverCommandEnvVar = "V2_DRIVER_COMMAND"
PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
gcsScratchLocation = "/gcs"
gcsScratchName = "gcs-scratch"
s3ScratchLocation = "/s3"
Expand Down Expand Up @@ -101,6 +104,25 @@ func GetDriverCommand() []string {
return strings.Split(driverCommand, " ")
}

func GetPipelineRunAsUser() *int64 {
runAsUserStr := os.Getenv(PipelineRunAsUserEnvVar)
if runAsUserStr == "" {
return nil
}

runAsUser, err := strconv.ParseInt(runAsUserStr, 10, 64)
if err != nil {
glog.Error(
"Failed to parse the %s environment variable with value %s as an int64: %v",
PipelineRunAsUserEnvVar, runAsUserStr, err,
)

return nil
}

return &runAsUser
}

func (c *workflowCompiler) containerDriverTask(name string, inputs containerDriverInputs) (*wfapi.DAGTask, *containerDriverOutputs) {
dagTask := &wfapi.DAGTask{
Name: name,
Expand Down
Loading

0 comments on commit 8a0b0ec

Please sign in to comment.