Skip to content

Commit

Permalink
add support for k8s platform field inputs
Browse files Browse the repository at this point in the history
This change adds the necessary proto changes to accomodate parameterized
input for various k8s platform fields: secrets, configmaps, node
selectors, tolerations, and image pull secrets.

In order to reduce duplication, a common proto directory is introduced
for InputParameterSpec and future such cases.

The format follows the standard that pvc_mount employs, but generalizes
it further. Old fields are marked deprecated in favor of the new fields.

Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
  • Loading branch information
HumairAK committed Feb 19, 2025
1 parent c0778ba commit 989ea8d
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 137 deletions.
119 changes: 15 additions & 104 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ml_pipelines;
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
import "google/rpc/status.proto";
import "common.proto";

option go_package = "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec";

Expand All @@ -28,7 +29,7 @@ message PipelineJob {
// The runtime config of a PipelineJob.
message RuntimeConfig {
// Deprecated. Use [RuntimeConfig.parameter_values][] instead.
map<string, Value> parameters = 1 [deprecated = true];
map<string, kfp_common.Value> parameters = 1 [deprecated = true];

// A path in a Cloud Storage bucket which will be treated as the root
// output directory of the pipeline. It is used by the system to
Expand Down Expand Up @@ -68,7 +69,7 @@ message PipelineSpec {
// Optional field. Default value of the runtime parameter. If not set and
// the runtime parameter value is not provided during runtime, an error will
// be raised.
Value default_value = 2;
kfp_common.Value default_value = 2;
}

// The map of name to definition of all components used in this pipeline.
Expand Down Expand Up @@ -234,9 +235,9 @@ message ComponentOutputsSpec {
message ArtifactSpec {
ArtifactTypeSchema artifact_type = 1;
// Deprecated. Use [ArtifactSpec.metadata][] instead.
map<string, ValueOrRuntimeParameter> properties = 2 [deprecated = true];
map<string, kfp_common.ValueOrRuntimeParameter> properties = 2 [deprecated = true];
// Deprecated. Use [ArtifactSpec.metadata][] instead.
map<string, ValueOrRuntimeParameter> custom_properties = 3
map<string, kfp_common.ValueOrRuntimeParameter> custom_properties = 3
[deprecated = true];
// Properties of the Artifact.
google.protobuf.Struct metadata = 4;
Expand Down Expand Up @@ -290,71 +291,9 @@ message TaskInputsSpec {
reserved 5;
}

// Represents an input parameter. The value can be taken from an upstream
// task's output parameter (if specifying `producer_task` and
// `output_parameter_key`, or it can be a runtime value, which can either be
// determined at compile-time, or from a pipeline parameter.
message InputParameterSpec {
// Represents an upstream task's output parameter.
message TaskOutputParameterSpec {
// The name of the upstream task which produces the output parameter that
// matches with the `output_parameter_key`.
string producer_task = 1;

// The key of [TaskOutputsSpec.parameters][] map of the producer task.
string output_parameter_key = 2;
}

// Represents an upstream task's final status. The field can only be set if
// the schema version is `2.0.0`. The resolved input parameter will be a
// json payload in string type.
message TaskFinalStatus {
// The name of the upsteram task where the final status is coming from.
string producer_task = 1;
}

oneof kind {
// Output parameter from an upstream task.
TaskOutputParameterSpec task_output_parameter = 1;
// A constant value or runtime parameter.
ValueOrRuntimeParameter runtime_value = 2;
// Pass the input parameter from parent component input parameter.
string component_input_parameter = 3;
// The final status of an uptream task.
TaskFinalStatus task_final_status = 5;
}

// Selector expression of Common Expression Language (CEL)
// that applies to the parameter found from above kind.
//
// The expression is applied to the Value type
// [Value][]. For example,
// 'size(string_value)' will return the size of the Value.string_value.
//
// After applying the selection, the parameter will be returned as a
// [Value][]. The type of the Value is either deferred from the input
// definition in the corresponding
// [ComponentSpec.input_definitions.parameters][], or if not found,
// automatically deferred as either string value or double value.
//
// In addition to the builtin functions in CEL, The value.string_value can
// be treated as a json string and parsed to the [google.protobuf.Value][]
// proto message. Then, the CEL expression provided in this field will be
// used to get the requested field. For examples,
// - if Value.string_value is a json array of "[1.1, 2.2, 3.3]",
// 'parseJson(string_value)[i]' will pass the ith parameter from the list
// to the current task, or
// - if the Value.string_value is a json map of "{"a": 1.1, "b": 2.2,
// "c": 3.3}, 'parseJson(string_value)[key]' will pass the map value from
// the struct map to the current task.
//
// If unset, the value will be passed directly to the current task.
string parameter_expression_selector = 4;
}

// A map of input parameters which are small values, stored by the system and
// can be queriable.
map<string, InputParameterSpec> parameters = 1;
map<string, kfp_common.InputParameterSpec> parameters = 1;
// A map of input artifacts.
map<string, InputArtifactSpec> artifacts = 2;
}
Expand All @@ -368,11 +307,11 @@ message TaskOutputsSpec {

// The properties of the artifact, which are determined either at
// compile-time, or at pipeline submission time through runtime parameters
map<string, ValueOrRuntimeParameter> properties = 2;
map<string, kfp_common.ValueOrRuntimeParameter> properties = 2;

// The custom properties of the artifact, which are determined either at
// compile-time, or at pipeline submission time through runtime parameters
map<string, ValueOrRuntimeParameter> custom_properties = 3;
map<string, kfp_common.ValueOrRuntimeParameter> custom_properties = 3;
}

// Specification for output parameters produced by the task.
Expand Down Expand Up @@ -667,22 +606,6 @@ message PipelineTaskInfo {
string name = 1;
}

// Definition for a value or reference to a runtime parameter. A
// ValueOrRuntimeParameter instance can be either a field value that is
// determined during compilation time, or a runtime parameter which will be
// determined during runtime.
message ValueOrRuntimeParameter {
oneof value {
// Constant value which is determined in compile time.
// Deprecated. Use [ValueOrRuntimeParameter.constant][] instead.
Value constant_value = 1 [deprecated = true];
// The runtime parameter refers to the parent component input parameter.
string runtime_parameter = 2;
// Constant value which is determined in compile time.
google.protobuf.Value constant = 3;
}
}

// The definition of the deployment config of the pipeline. It contains the
// the platform specific executor configs for KFP OSS.
message PipelineDeploymentConfig {
Expand Down Expand Up @@ -816,18 +739,18 @@ message PipelineDeploymentConfig {
// The specification to import or reimport a new artifact to the pipeline.
message ImporterSpec {
// The URI of the artifact.
ValueOrRuntimeParameter artifact_uri = 1;
kfp_common.ValueOrRuntimeParameter artifact_uri = 1;

// The type of the artifact.
ArtifactTypeSchema type_schema = 2;

// The properties of the artifact.
// Deprecated. Use [ImporterSpec.metadata][] instead.
map<string, ValueOrRuntimeParameter> properties = 3 [deprecated = true];
map<string, kfp_common.ValueOrRuntimeParameter> properties = 3 [deprecated = true];

// The custom properties of the artifact.
// Deprecated. Use [ImporterSpec.metadata][] instead.
map<string, ValueOrRuntimeParameter> custom_properties = 4
map<string, kfp_common.ValueOrRuntimeParameter> custom_properties = 4
[deprecated = true];

// Properties of the Artifact.
Expand Down Expand Up @@ -896,18 +819,6 @@ message PipelineDeploymentConfig {
map<string, ExecutorSpec> executors = 1;
}

// Value is the value of the field.
message Value {
oneof value {
// An integer value
int64 int_value = 1;
// A double value
double double_value = 2;
// A string value
string string_value = 3;
}
}

// The definition of a runtime artifact.
message RuntimeArtifact {
// The name of an artifact.
Expand All @@ -921,11 +832,11 @@ message RuntimeArtifact {

// The properties of the artifact.
// Deprecated. Use [RuntimeArtifact.metadata][] instead.
map<string, Value> properties = 4 [deprecated = true];
map<string, kfp_common.Value> properties = 4 [deprecated = true];

// The custom properties of the artifact.
// Deprecated. Use [RuntimeArtifact.metadata][] instead.
map<string, Value> custom_properties = 5 [deprecated = true];
map<string, kfp_common.Value> custom_properties = 5 [deprecated = true];

// Properties of the Artifact.
google.protobuf.Struct metadata = 6;
Expand Down Expand Up @@ -967,7 +878,7 @@ message ExecutorInput {
message Inputs {
// Input parameters of the execution.
// Deprecated. Use [ExecutorInput.Inputs.parameter_values][] instead.
map<string, Value> parameters = 1 [deprecated = true];
map<string, kfp_common.Value> parameters = 1 [deprecated = true];

// Input artifacts of the execution.
map<string, ArtifactList> artifacts = 2;
Expand Down Expand Up @@ -1011,7 +922,7 @@ message ExecutorInput {
message ExecutorOutput {
// The values for output parameters.
// Deprecated. Use [ExecutorOutput.parameter_values][] instead.
map<string, Value> parameters = 1 [deprecated = true];
map<string, kfp_common.Value> parameters = 1 [deprecated = true];

// The updated metadata for output artifact.
map<string, ArtifactList> artifacts = 2;
Expand Down
12 changes: 10 additions & 2 deletions api/v2alpha1/python/generate_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

PROTO_DIR = os.path.realpath(os.path.join(os.path.dirname(__file__), os.pardir))

PROJECT_DIR = os.path.dirname(os.path.dirname(PROTO_DIR))
COMMON_PROTO_DIR = os.path.join(PROJECT_DIR, 'common')
COMMON_PROTO_SOURCE = os.path.join(COMMON_PROTO_DIR, 'common.proto')

PKG_DIR = os.path.realpath(
os.path.join(os.path.dirname(__file__), "kfp", "pipeline_spec"))

Expand Down Expand Up @@ -62,9 +66,13 @@ def generate_proto(source):

protoc_command = [
PROTOC,
"-I%s" % PROTO_DIR,
"--python_out=%s" % PKG_DIR, source
f'-I={COMMON_PROTO_DIR}',
f"-I={PROTO_DIR}",
f"--python_out={PKG_DIR}",
COMMON_PROTO_SOURCE,
source
]
print(protoc_command)
if subprocess.call(protoc_command) != 0:
sys.exit(-1)

Expand Down
9 changes: 9 additions & 0 deletions api/v2alpha1/python/kfp/pipeline_spec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

# Ensure Python can find the protobuf-generated modules
# This is needed in order to resolve common proto python module
# that is generated via generate_proto.py
package_dir = os.path.dirname(os.path.abspath(__file__))
if package_dir not in sys.path:
sys.path.append(package_dir)
3 changes: 3 additions & 0 deletions common/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Common Proto Spec

This directory contains proto definitions that are share between different KFP packages.
119 changes: 119 additions & 0 deletions common/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2025 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package kfp_common;

import "google/protobuf/struct.proto";

// Represents an input parameter. The value can be taken from an upstream
// task's output parameter (if specifying `producer_task` and
// `output_parameter_key`, or it can be a runtime value, which can either be
// determined at compile-time, or from a pipeline parameter.
message InputParameterSpec {
// Represents an upstream task's output parameter.
message TaskOutputParameterSpec {
// The name of the upstream task which produces the output parameter that
// matches with the `output_parameter_key`.
string producer_task = 1;

// The key of [TaskOutputsSpec.parameters][] map of the producer task.
string output_parameter_key = 2;
}

// Represents an upstream task's final status. The field can only be set if
// the schema version is `2.0.0`. The resolved input parameter will be a
// json payload in string type.
message TaskFinalStatus {
// The name of the upsteram task where the final status is coming from.
string producer_task = 1;
}

oneof kind {
// Output parameter from an upstream task.
TaskOutputParameterSpec task_output_parameter = 1;
// A constant value or runtime parameter.
ValueOrRuntimeParameter runtime_value = 2;
// Pass the input parameter from parent component input parameter.
string component_input_parameter = 3;
// The final status of an upstream task.
TaskFinalStatus task_final_status = 5;
}

// Selector expression of Common Expression Language (CEL)
// that applies to the parameter found from above kind.
//
// The expression is applied to the Value type
// [Value][]. For example,
// 'size(string_value)' will return the size of the Value.string_value.
//
// After applying the selection, the parameter will be returned as a
// [Value][]. The type of the Value is either deferred from the input
// definition in the corresponding
// [ComponentSpec.input_definitions.parameters][], or if not found,
// automatically deferred as either string value or double value.
//
// In addition to the builtin functions in CEL, The value.string_value can
// be treated as a json string and parsed to the [google.protobuf.Value][]
// proto message. Then, the CEL expression provided in this field will be
// used to get the requested field. For examples,
// - if Value.string_value is a json array of "[1.1, 2.2, 3.3]",
// 'parseJson(string_value)[i]' will pass the ith parameter from the list
// to the current task, or
// - if the Value.string_value is a json map of "{"a": 1.1, "b": 2.2,
// "c": 3.3}, 'parseJson(string_value)[key]' will pass the map value from
// the struct map to the current task.
//
// If unset, the value will be passed directly to the current task.
string parameter_expression_selector = 4;
}

// Definition for a value or reference to a runtime parameter. A
// ValueOrRuntimeParameter instance can be either a field value that is
// determined during compilation time, or a runtime parameter which will be
// determined during runtime.
message ValueOrRuntimeParameter {
oneof value {
// Constant value which is determined in compile time.
// Deprecated. Use [ValueOrRuntimeParameter.constant][] instead.
Value constant_value = 1 [deprecated = true];
// The runtime parameter refers to the parent component input parameter.
string runtime_parameter = 2;
// Constant value which is determined in compile time.
google.protobuf.Value constant = 3;
}
}

// Value is the value of the field.
message Value {
oneof value {
// An integer value
int64 int_value = 1;
// A double value
double double_value = 2;
// A string value
string string_value = 3;
}
}

// Represents an upstream task's output parameter.
message TaskOutputParameterSpec {
// The name of the upstream task which produces the output parameter that
// matches with the `output_parameter_key`.
string producer_task = 1;

// The key of [TaskOutputsSpec.parameters][] map of the producer task.
string output_parameter_key = 2;
}
Loading

0 comments on commit 989ea8d

Please sign in to comment.