Skip to content

Commit

Permalink
feat(backend): configurable log level for driver / launcher images (#…
Browse files Browse the repository at this point in the history
…11278)

* Do not invoke get image methods twice.

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Add configurable driver / launcher log level

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Add configurable driver / launcher log level

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Update argocompiler golden files

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Handle errors from flag setting and tests

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Update gold files & image masking to use ghcr

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Update tests with new images

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Use unified var for driver / launcher log level + patch user code
launcher

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

* Add PIPELINE_LOG_LEVEL to deployment for discoverability

Signed-off-by: carter.fendley <carter.fendley@gmail.com>

---------

Signed-off-by: carter.fendley <carter.fendley@gmail.com>
  • Loading branch information
CarterFendley authored Feb 21, 2025
1 parent 1c4f676 commit d2c0376
Show file tree
Hide file tree
Showing 17 changed files with 493 additions and 104 deletions.
29 changes: 19 additions & 10 deletions backend/src/v2/cmd/driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,21 @@ var (
// the value stored in the paths will be either 'true' or 'false'
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
conditionPath = flag.String("condition_path", "", "Condition output path")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {

func main() {
flag.Parse()
err := drive()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

err = drive()
if err != nil {
glog.Exitf("%v", err)
}
Expand Down Expand Up @@ -153,15 +161,16 @@ func drive() (err error) {
return err
}
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
RunName: *runName,
RunDisplayName: *runDisplayName,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
PipelineName: *pipelineName,
RunID: *runID,
RunName: *runName,
RunDisplayName: *runDisplayName,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
PipelineLogLevel: *logLevel,
}
var execution *driver.Execution
var driverErr error
Expand Down
7 changes: 7 additions & 0 deletions backend/src/v2/cmd/launcher-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.")
mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.")
mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

func main() {
Expand All @@ -54,6 +55,12 @@ func run() error {
flag.Parse()
ctx := context.Background()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

if *copy != "" {
// copy is used to copy this binary to a shared volume
// this is a special command, ignore all other flags by returning
Expand Down
12 changes: 9 additions & 3 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func Test_argo_compiler(t *testing.T) {
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "",
argoYAMLPath: "testdata/hello_world_log_level.yaml",
envVars: map[string]string{"PIPELINE_LOG_LEVEL": "3"},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down Expand Up @@ -122,14 +128,14 @@ func Test_argo_compiler(t *testing.T) {
// mask the driver launcher image hash to maintain test stability
for _, template := range wf.Spec.Templates {
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-driver") {
template.Container.Image = "gcr.io/ml-pipeline/kfp-driver"
template.Container.Image = "ghcr.io/kubeflow/kfp-driver"
}
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-launcher") {
template.Container.Image = "gcr.io/ml-pipeline/kfp-launcher"
template.Container.Image = "ghcr.io/kubeflow/kfp-launcher"
}
for i := range template.InitContainers {
if strings.Contains(template.InitContainers[i].Image, "kfp-launcher") {
template.InitContainers[i].Image = "gcr.io/ml-pipeline/kfp-launcher"
template.InitContainers[i].Image = "ghcr.io/kubeflow/kfp-launcher"
}
}
}
Expand Down
55 changes: 35 additions & 20 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
DefaultDriverCommand = "driver"
DriverCommandEnvVar = "V2_DRIVER_COMMAND"
PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
PipelineLogLevelEnvVar = "PIPELINE_LOG_LEVEL"
gcsScratchLocation = "/gcs"
gcsScratchName = "gcs-scratch"
s3ScratchLocation = "/s3"
Expand Down Expand Up @@ -162,6 +163,27 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -182,24 +204,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: GetDriverImage(),
Command: GetDriverCommand(),
Args: []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
},
Image: c.driverImage,
Command: c.driverCommand,
Args: args,
Resources: driverResources,
},
}
Expand Down Expand Up @@ -287,6 +294,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
c.templates[nameContainerExecutor] = container

args := []string{
"--copy", component.KFPLauncherPath,
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
executor := &wfapi.Template{
Name: nameContainerImpl,
Inputs: wfapi.Inputs{
Expand Down Expand Up @@ -345,8 +359,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
InitContainers: []wfapi.UserContainer{{
Container: k8score.Container{
Name: "kfp-launcher",
Image: GetLauncherImage(),
Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath},
Image: c.launcherImage,
Command: []string{"launcher-v2"},
Args: args,
VolumeMounts: []k8score.VolumeMount{
{
Name: volumeNameKFPLauncher,
Expand Down
41 changes: 24 additions & 17 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package argocompiler

import (
"fmt"
"os"
"sort"
"strings"

Expand Down Expand Up @@ -535,6 +536,26 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -555,23 +576,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: c.driverImage,
Command: c.driverCommand,
Args: []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
},
Image: c.driverImage,
Command: c.driverCommand,
Args: args,
Resources: driverResources,
},
}
Expand Down
8 changes: 6 additions & 2 deletions backend/src/v2/compiler/argocompiler/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package argocompiler

import (
"fmt"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
launcherArgs := []string{
args := []string{
"--executor_type", "importer",
"--task_spec", inputValue(paramTask),
"--component_spec", inputValue(paramComponent),
Expand All @@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string {
"--mlmd_server_port",
fmt.Sprintf("$(%s)", component.EnvMetadataPort),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
importerTemplate := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
Container: &k8score.Container{
Image: c.launcherImage,
Command: []string{"launcher-v2"},
Args: launcherArgs,
Args: args,
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
Env: commonEnvs,
Resources: driverResources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -159,11 +159,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -309,7 +310,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -147,11 +147,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -252,7 +253,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
11 changes: 6 additions & 5 deletions backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -164,11 +164,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -315,7 +316,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
Loading

0 comments on commit d2c0376

Please sign in to comment.