diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index 63637ccff46..84537768ebc 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -69,13 +69,21 @@ var ( // the value stored in the paths will be either 'true' or 'false' cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path") conditionPath = flag.String("condition_path", "", "Condition output path") + logLevel = flag.String("log_level", "1", "The verbosity level to log.") ) // func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) { func main() { flag.Parse() - err := drive() + + glog.Infof("Setting log level to: '%s'", *logLevel) + err := flag.Set("v", *logLevel) + if err != nil { + glog.Warningf("Failed to set log level: %s", err.Error()) + } + + err = drive() if err != nil { glog.Exitf("%v", err) } @@ -153,15 +161,16 @@ func drive() (err error) { return err } options := driver.Options{ - PipelineName: *pipelineName, - RunID: *runID, - RunName: *runName, - RunDisplayName: *runDisplayName, - Namespace: namespace, - Component: componentSpec, - Task: taskSpec, - DAGExecutionID: *dagExecutionID, - IterationIndex: *iterationIndex, + PipelineName: *pipelineName, + RunID: *runID, + RunName: *runName, + RunDisplayName: *runDisplayName, + Namespace: namespace, + Component: componentSpec, + Task: taskSpec, + DAGExecutionID: *dagExecutionID, + IterationIndex: *iterationIndex, + PipelineLogLevel: *logLevel, } var execution *driver.Execution var driverErr error diff --git a/backend/src/v2/cmd/launcher-v2/main.go b/backend/src/v2/cmd/launcher-v2/main.go index 8fb4e8d7625..a77111cd60e 100644 --- a/backend/src/v2/cmd/launcher-v2/main.go +++ b/backend/src/v2/cmd/launcher-v2/main.go @@ -41,6 +41,7 @@ var ( podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.") mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.") mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.") + logLevel = flag.String("log_level", "1", "The verbosity level to log.") ) func main() { @@ -54,6 +55,12 @@ func run() error { flag.Parse() ctx := context.Background() + glog.Infof("Setting log level to: '%s'", *logLevel) + err := flag.Set("v", *logLevel) + if err != nil { + glog.Warningf("Failed to set log level: %s", err.Error()) + } + if *copy != "" { // copy is used to copy this binary to a shared volume // this is a special command, ignore all other flags by returning diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index bce26108314..d3182a2b684 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -74,6 +74,12 @@ func Test_argo_compiler(t *testing.T) { argoYAMLPath: "testdata/hello_world_run_as_user.yaml", envVars: map[string]string{"PIPELINE_RUN_AS_USER": "1001"}, }, + { + jobPath: "../testdata/hello_world.json", + platformSpecPath: "", + argoYAMLPath: "testdata/hello_world_log_level.yaml", + envVars: map[string]string{"PIPELINE_LOG_LEVEL": "3"}, + }, } for _, tt := range tests { t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) { @@ -122,14 +128,14 @@ func Test_argo_compiler(t *testing.T) { // mask the driver launcher image hash to maintain test stability for _, template := range wf.Spec.Templates { if template.Container != nil && strings.Contains(template.Container.Image, "kfp-driver") { - template.Container.Image = "gcr.io/ml-pipeline/kfp-driver" + template.Container.Image = "ghcr.io/kubeflow/kfp-driver" } if template.Container != nil && strings.Contains(template.Container.Image, "kfp-launcher") { - template.Container.Image = "gcr.io/ml-pipeline/kfp-launcher" + template.Container.Image = "ghcr.io/kubeflow/kfp-launcher" } for i := range template.InitContainers { if strings.Contains(template.InitContainers[i].Image, "kfp-launcher") { - template.InitContainers[i].Image = "gcr.io/ml-pipeline/kfp-launcher" + template.InitContainers[i].Image = "ghcr.io/kubeflow/kfp-launcher" } } } diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index f0a5af58253..384c11a4bb2 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -39,6 +39,7 @@ const ( DefaultDriverCommand = "driver" DriverCommandEnvVar = "V2_DRIVER_COMMAND" PipelineRunAsUserEnvVar = "PIPELINE_RUN_AS_USER" + PipelineLogLevelEnvVar = "PIPELINE_LOG_LEVEL" gcsScratchLocation = "/gcs" gcsScratchName = "gcs-scratch" s3ScratchLocation = "/s3" @@ -162,6 +163,27 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { if ok { return name } + + args := []string{ + "--type", "CONTAINER", + "--pipeline_name", c.spec.GetPipelineInfo().GetName(), + "--run_id", runID(), + "--run_name", runResourceName(), + "--run_display_name", c.job.DisplayName, + "--dag_execution_id", inputValue(paramParentDagID), + "--component", inputValue(paramComponent), + "--task", inputValue(paramTask), + "--container", inputValue(paramContainer), + "--iteration_index", inputValue(paramIterationIndex), + "--cached_decision_path", outputPath(paramCachedDecision), + "--pod_spec_patch_path", outputPath(paramPodSpecPatch), + "--condition_path", outputPath(paramCondition), + "--kubernetes_config", inputValue(paramKubernetesConfig), + } + if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } + t := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -182,24 +204,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { }, }, Container: &k8score.Container{ - Image: GetDriverImage(), - Command: GetDriverCommand(), - Args: []string{ - "--type", "CONTAINER", - "--pipeline_name", c.spec.GetPipelineInfo().GetName(), - "--run_id", runID(), - "--run_name", runResourceName(), - "--run_display_name", c.job.DisplayName, - "--dag_execution_id", inputValue(paramParentDagID), - "--component", inputValue(paramComponent), - "--task", inputValue(paramTask), - "--container", inputValue(paramContainer), - "--iteration_index", inputValue(paramIterationIndex), - "--cached_decision_path", outputPath(paramCachedDecision), - "--pod_spec_patch_path", outputPath(paramPodSpecPatch), - "--condition_path", outputPath(paramCondition), - "--kubernetes_config", inputValue(paramKubernetesConfig), - }, + Image: c.driverImage, + Command: c.driverCommand, + Args: args, Resources: driverResources, }, } @@ -287,6 +294,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { }, } c.templates[nameContainerExecutor] = container + + args := []string{ + "--copy", component.KFPLauncherPath, + } + if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } executor := &wfapi.Template{ Name: nameContainerImpl, Inputs: wfapi.Inputs{ @@ -345,8 +359,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { InitContainers: []wfapi.UserContainer{{ Container: k8score.Container{ Name: "kfp-launcher", - Image: GetLauncherImage(), - Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath}, + Image: c.launcherImage, + Command: []string{"launcher-v2"}, + Args: args, VolumeMounts: []k8score.VolumeMount{ { Name: volumeNameKFPLauncher, diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index 9a0dc11b431..0b7d4257f94 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -15,6 +15,7 @@ package argocompiler import ( "fmt" + "os" "sort" "strings" @@ -535,6 +536,26 @@ func (c *workflowCompiler) addDAGDriverTemplate() string { if ok { return name } + + args := []string{ + "--type", inputValue(paramDriverType), + "--pipeline_name", c.spec.GetPipelineInfo().GetName(), + "--run_id", runID(), + "--run_name", runResourceName(), + "--run_display_name", c.job.DisplayName, + "--dag_execution_id", inputValue(paramParentDagID), + "--component", inputValue(paramComponent), + "--task", inputValue(paramTask), + "--runtime_config", inputValue(paramRuntimeConfig), + "--iteration_index", inputValue(paramIterationIndex), + "--execution_id_path", outputPath(paramExecutionID), + "--iteration_count_path", outputPath(paramIterationCount), + "--condition_path", outputPath(paramCondition), + } + if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } + t := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -555,23 +576,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string { }, }, Container: &k8score.Container{ - Image: c.driverImage, - Command: c.driverCommand, - Args: []string{ - "--type", inputValue(paramDriverType), - "--pipeline_name", c.spec.GetPipelineInfo().GetName(), - "--run_id", runID(), - "--run_name", runResourceName(), - "--run_display_name", c.job.DisplayName, - "--dag_execution_id", inputValue(paramParentDagID), - "--component", inputValue(paramComponent), - "--task", inputValue(paramTask), - "--runtime_config", inputValue(paramRuntimeConfig), - "--iteration_index", inputValue(paramIterationIndex), - "--execution_id_path", outputPath(paramExecutionID), - "--iteration_count_path", outputPath(paramIterationCount), - "--condition_path", outputPath(paramCondition), - }, + Image: c.driverImage, + Command: c.driverCommand, + Args: args, Resources: driverResources, }, } diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go index 83ac6453b64..c501561a2ea 100644 --- a/backend/src/v2/compiler/argocompiler/importer.go +++ b/backend/src/v2/compiler/argocompiler/importer.go @@ -16,6 +16,7 @@ package argocompiler import ( "fmt" + "os" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" @@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string { if _, alreadyExists := c.templates[name]; alreadyExists { return name } - launcherArgs := []string{ + args := []string{ "--executor_type", "importer", "--task_spec", inputValue(paramTask), "--component_spec", inputValue(paramComponent), @@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string { "--mlmd_server_port", fmt.Sprintf("$(%s)", component.EnvMetadataPort), } + if value, ok := os.LookupEnv(PipelineLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } importerTemplate := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string { Container: &k8score.Container{ Image: c.launcherImage, Command: []string{"launcher-v2"}, - Args: launcherArgs, + Args: args, EnvFrom: []k8score.EnvFromSource{metadataEnvFrom}, Env: commonEnvs, Resources: driverResources, diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml index 0ce60a6cc4f..c4abf4655a7 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml @@ -71,7 +71,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -159,11 +159,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -309,7 +310,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml index d0887d474ac..c6b66ba4326 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml @@ -59,7 +59,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -147,11 +147,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -252,7 +253,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml index 7af6ddd8134..13caacda653 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml @@ -76,7 +76,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -164,11 +164,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -315,7 +316,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml index f1277b4006d..dcbee178d31 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml @@ -57,7 +57,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -145,11 +145,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -242,7 +243,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world_log_level.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world_log_level.yaml new file mode 100644 index 00000000000..1aa484f7cda --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world_log_level.yaml @@ -0,0 +1,316 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: hello-world- +spec: + arguments: + parameters: + - name: components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.9"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + - --log_level + - "3" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - args: + - --copy + - /kfp-launcher/launch + - --log_level + - "3" + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' + - name: container + value: '{{workflow.parameters.implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: hello-world-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}}' + depends: hello-world-driver.Succeeded + name: hello-world + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --log_level + - "3" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hi there"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world_run_as_user.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world_run_as_user.yaml index 6d16f12d8d6..437c2ed85e8 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/hello_world_run_as_user.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world_run_as_user.yaml @@ -59,7 +59,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -147,11 +147,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -244,7 +245,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml index 13959180d9b..2fa7cba6ce6 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml @@ -59,7 +59,7 @@ spec: - configMapRef: name: metadata-grpc-configmap optional: true - image: gcr.io/ml-pipeline/kfp-launcher + image: ghcr.io/kubeflow/kfp-launcher name: "" resources: limits: @@ -127,7 +127,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml index 52c2446a1c6..655cacee5fe 100755 --- a/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml @@ -1,6 +1,7 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: + creationTimestamp: null generateName: my-pipeline- spec: arguments: @@ -74,7 +75,7 @@ spec: - '{{inputs.parameters.kubernetes-config}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: @@ -151,7 +152,6 @@ spec: name: kfp-launcher - mountPath: /gcs name: gcs-scratch - - mountPath: /s3 name: s3-scratch - mountPath: /minio @@ -163,11 +163,12 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch - image: gcr.io/ml-pipeline/kfp-launcher + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher name: kfp-launcher resources: limits: @@ -334,7 +335,7 @@ spec: - '{{outputs.parameters.condition.path}}' command: - driver - image: gcr.io/ml-pipeline/kfp-driver + image: ghcr.io/kubeflow/kfp-driver name: "" resources: limits: diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index e8f6416b14f..fc96a156df3 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -81,6 +81,8 @@ type Options struct { RunName string // optional, required only if the {{$.pipeline_job_name}} placeholder is used RunDisplayName string + + PipelineLogLevel string } // Identifying information used for error messages @@ -347,7 +349,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl return execution, nil } - podSpec, err := initPodSpecPatch(opts.Container, opts.Component, executorInput, execution.ID, opts.PipelineName, opts.RunID) + podSpec, err := initPodSpecPatch(opts.Container, opts.Component, executorInput, execution.ID, opts.PipelineName, opts.RunID, opts.PipelineLogLevel) if err != nil { return execution, err } @@ -409,6 +411,7 @@ func initPodSpecPatch( executionID int64, pipelineName string, runID string, + pipelineLogLevel string, ) (*k8score.PodSpec, error) { executorInputJSON, err := protojson.Marshal(executorInput) if err != nil { @@ -444,8 +447,12 @@ func initPodSpecPatch( fmt.Sprintf("$(%s)", component.EnvMetadataHost), "--mlmd_server_port", fmt.Sprintf("$(%s)", component.EnvMetadataPort), - "--", // separater before user command and args } + if pipelineLogLevel != "1" { + // Add log level to user code launcher if not default (set to 1) + launcherCmd = append(launcherCmd, "--log_level", pipelineLogLevel) + } + launcherCmd = append(launcherCmd, "--") // separater before user command and args res := k8score.ResourceRequirements{ Limits: map[k8score.ResourceName]k8sres.Quantity{}, Requests: map[k8score.ResourceName]k8sres.Quantity{}, diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index e0f2112fe4e..3021abfee47 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -33,12 +33,13 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { viper.Set("KFP_POD_NAME", "MyWorkflowPod") viper.Set("KFP_POD_UID", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6") type args struct { - container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec - componentSpec *pipelinespec.ComponentSpec - executorInput *pipelinespec.ExecutorInput - executionID int64 - pipelineName string - runID string + container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec + componentSpec *pipelinespec.ComponentSpec + executorInput *pipelinespec.ExecutorInput + executionID int64 + pipelineName string + runID string + pipelineLogLevel string } tests := []struct { name string @@ -81,6 +82,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"nvidia.com/gpu":"1"`, false, @@ -120,6 +122,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"amd.com/gpu":"1"`, false, @@ -159,6 +162,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"cloud-tpus.google.com/v3":"1"`, false, @@ -198,6 +202,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"cloud-tpus.google.com/v2":"1"`, false, @@ -237,6 +242,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"custom.example.com/accelerator-v1":"1"`, false, @@ -245,7 +251,7 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - podSpec, err := initPodSpecPatch(tt.args.container, tt.args.componentSpec, tt.args.executorInput, tt.args.executionID, tt.args.pipelineName, tt.args.runID) + podSpec, err := initPodSpecPatch(tt.args.container, tt.args.componentSpec, tt.args.executorInput, tt.args.executionID, tt.args.pipelineName, tt.args.runID, tt.args.pipelineLogLevel) if tt.wantErr { assert.Nil(t, podSpec) assert.NotNil(t, err) @@ -345,7 +351,7 @@ func Test_initPodSpecPatch_resource_placeholders(t *testing.T) { } podSpec, err := initPodSpecPatch( - containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", + containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", "1", ) assert.Nil(t, err) assert.Len(t, podSpec.Containers, 1) @@ -378,7 +384,7 @@ func Test_initPodSpecPatch_legacy_resources(t *testing.T) { executorInput := &pipelinespec.ExecutorInput{} podSpec, err := initPodSpecPatch( - containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", + containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", "1", ) assert.Nil(t, err) assert.Len(t, podSpec.Containers, 1) @@ -413,7 +419,7 @@ func Test_initPodSpecPatch_modelcar_input_artifact(t *testing.T) { } podSpec, err := initPodSpecPatch( - containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", + containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300", "1", ) assert.Nil(t, err) @@ -498,12 +504,13 @@ func Test_initPodSpecPatch_resourceRequests(t *testing.T) { viper.Set("KFP_POD_NAME", "MyWorkflowPod") viper.Set("KFP_POD_UID", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6") type args struct { - container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec - componentSpec *pipelinespec.ComponentSpec - executorInput *pipelinespec.ExecutorInput - executionID int64 - pipelineName string - runID string + container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec + componentSpec *pipelinespec.ComponentSpec + executorInput *pipelinespec.ExecutorInput + executionID int64 + pipelineName string + runID string + pipelineLogLevel string } tests := []struct { name string @@ -543,6 +550,7 @@ func Test_initPodSpecPatch_resourceRequests(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"resources":{"limits":{"cpu":"2","memory":"1500M"},"requests":{"cpu":"1","memory":"650M"}}`, "", @@ -579,6 +587,7 @@ func Test_initPodSpecPatch_resourceRequests(t *testing.T) { 1, "MyPipeline", "a1b2c3d4-a1b2-a1b2-a1b2-a1b2c3d4e5f6", + "1", }, `"resources":{"limits":{"cpu":"2","memory":"1500M"}}`, `"requests"`, @@ -586,7 +595,7 @@ func Test_initPodSpecPatch_resourceRequests(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - podSpec, err := initPodSpecPatch(tt.args.container, tt.args.componentSpec, tt.args.executorInput, tt.args.executionID, tt.args.pipelineName, tt.args.runID) + podSpec, err := initPodSpecPatch(tt.args.container, tt.args.componentSpec, tt.args.executorInput, tt.args.executionID, tt.args.pipelineName, tt.args.runID, tt.args.pipelineLogLevel) assert.Nil(t, err) assert.NotEmpty(t, podSpec) podSpecString, err := json.Marshal(podSpec) diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml index 1346314f7ad..b998863f722 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml @@ -19,6 +19,9 @@ spec: - env: - name: LOG_LEVEL value: "info" + # Driver / launcher log level during pipeline execution + - name: PIPELINE_LOG_LEVEL + value: "1" - name: AUTO_UPDATE_PIPELINE_DEFAULT_VERSION valueFrom: configMapKeyRef: