Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): configurable log level for driver / launcher images #11278

Merged
merged 9 commits into from
Feb 21, 2025
29 changes: 19 additions & 10 deletions backend/src/v2/cmd/driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,21 @@ var (
// the value stored in the paths will be either 'true' or 'false'
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
conditionPath = flag.String("condition_path", "", "Condition output path")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {

func main() {
flag.Parse()
err := drive()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

err = drive()
if err != nil {
glog.Exitf("%v", err)
}
Expand Down Expand Up @@ -153,15 +161,16 @@ func drive() (err error) {
return err
}
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
RunName: *runName,
RunDisplayName: *runDisplayName,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
PipelineName: *pipelineName,
RunID: *runID,
RunName: *runName,
RunDisplayName: *runDisplayName,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
DAGExecutionID: *dagExecutionID,
IterationIndex: *iterationIndex,
PipelineLogLevel: *logLevel,
}
var execution *driver.Execution
var driverErr error
Expand Down
7 changes: 7 additions & 0 deletions backend/src/v2/cmd/launcher-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.")
mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.")
mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

func main() {
Expand All @@ -54,6 +55,12 @@ func run() error {
flag.Parse()
ctx := context.Background()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

if *copy != "" {
// copy is used to copy this binary to a shared volume
// this is a special command, ignore all other flags by returning
Expand Down
12 changes: 9 additions & 3 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func Test_argo_compiler(t *testing.T) {
argoYAMLPath: "testdata/hello_world_run_as_user.yaml",
envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"},
},
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "",
argoYAMLPath: "testdata/hello_world_log_level.yaml",
envVars: map[string]string{"PIPELINE_LOG_LEVEL": "3"},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
Expand Down Expand Up @@ -122,14 +128,14 @@ func Test_argo_compiler(t *testing.T) {
// mask the driver launcher image hash to maintain test stability
for _, template := range wf.Spec.Templates {
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-driver") {
template.Container.Image = "gcr.io/ml-pipeline/kfp-driver"
template.Container.Image = "ghcr.io/kubeflow/kfp-driver"
}
if template.Container != nil && strings.Contains(template.Container.Image, "kfp-launcher") {
template.Container.Image = "gcr.io/ml-pipeline/kfp-launcher"
template.Container.Image = "ghcr.io/kubeflow/kfp-launcher"
}
for i := range template.InitContainers {
if strings.Contains(template.InitContainers[i].Image, "kfp-launcher") {
template.InitContainers[i].Image = "gcr.io/ml-pipeline/kfp-launcher"
template.InitContainers[i].Image = "ghcr.io/kubeflow/kfp-launcher"
}
}
}
Expand Down
55 changes: 35 additions & 20 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
DefaultDriverCommand = "driver"
DriverCommandEnvVar = "V2_DRIVER_COMMAND"
PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER"
PipelineLogLevelEnvVar = "PIPELINE_LOG_LEVEL"
gcsScratchLocation = "/gcs"
gcsScratchName = "gcs-scratch"
s3ScratchLocation = "/s3"
Expand Down Expand Up @@ -162,6 +163,27 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -182,24 +204,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: GetDriverImage(),
Command: GetDriverCommand(),
Args: []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
},
Image: c.driverImage,
Command: c.driverCommand,
Args: args,
Resources: driverResources,
},
}
Expand Down Expand Up @@ -287,6 +294,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
c.templates[nameContainerExecutor] = container

args := []string{
"--copy", component.KFPLauncherPath,
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
executor := &wfapi.Template{
Name: nameContainerImpl,
Inputs: wfapi.Inputs{
Expand Down Expand Up @@ -345,8 +359,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
InitContainers: []wfapi.UserContainer{{
Container: k8score.Container{
Name: "kfp-launcher",
Image: GetLauncherImage(),
Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath},
Image: c.launcherImage,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this change besides optimization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No not really, just made it a bit concise to add flags and follows the pattern used in the other Container definitions for driver / launcher

Command: []string{"launcher-v2"},
Args: args,
VolumeMounts: []k8score.VolumeMount{
{
Name: volumeNameKFPLauncher,
Expand Down
41 changes: 24 additions & 17 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package argocompiler

import (
"fmt"
"os"
"sort"
"strings"

Expand Down Expand Up @@ -535,6 +536,26 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -555,23 +576,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: c.driverImage,
Command: c.driverCommand,
Args: []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
},
Image: c.driverImage,
Command: c.driverCommand,
Args: args,
Resources: driverResources,
},
}
Expand Down
8 changes: 6 additions & 2 deletions backend/src/v2/compiler/argocompiler/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package argocompiler

import (
"fmt"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
launcherArgs := []string{
args := []string{
"--executor_type", "importer",
"--task_spec", inputValue(paramTask),
"--component_spec", inputValue(paramComponent),
Expand All @@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string {
"--mlmd_server_port",
fmt.Sprintf("$(%s)", component.EnvMetadataPort),
}
if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
importerTemplate := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
Container: &k8score.Container{
Image: c.launcherImage,
Command: []string{"launcher-v2"},
Args: launcherArgs,
Args: args,
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
Env: commonEnvs,
Resources: driverResources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -159,11 +159,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -309,7 +310,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -147,11 +147,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -252,7 +253,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
11 changes: 6 additions & 5 deletions backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ spec:
- '{{inputs.parameters.kubernetes-config}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down Expand Up @@ -164,11 +164,12 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
image: gcr.io/ml-pipeline/kfp-launcher
command:
- launcher-v2
image: ghcr.io/kubeflow/kfp-launcher
name: kfp-launcher
resources:
limits:
Expand Down Expand Up @@ -315,7 +316,7 @@ spec:
- '{{outputs.parameters.condition.path}}'
command:
- driver
image: gcr.io/ml-pipeline/kfp-driver
image: ghcr.io/kubeflow/kfp-driver
name: ""
resources:
limits:
Expand Down
Loading
Loading