feat(backend): Add Semaphore and Mutex fields to Workflow CR
- Added `Semaphore` and `Mutex` fields to the Workflow Spec to support concurrency control mechanisms directly within workflows (see the illustrative sketch below).
- Introduced a new environment variable, `SEMAPHORE_CONFIGMAP_NAME`, to the API Server deployment for managing semaphore configurations.
- Added an empty ConfigMap manifest for semaphores to facilitate initial setup and testing.
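
For illustration, when a pipeline sets a semaphore key and/or a mutex name, the compiled Argo Workflow is expected to carry a synchronization stanza roughly like the sketch below. The ConfigMap name comes from the SEMAPHORE_CONFIGMAP_NAME environment variable (defaulting to "semaphore-config"); the key and mutex names shown are hypothetical placeholders, not values from this commit.

# Illustrative sketch only: "my-pipeline-semaphore" and "my-pipeline-mutex"
# are hypothetical values; real values come from the pipeline's PipelineConfig.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: my-pipeline-
spec:
  synchronization:
    semaphore:
      configMapKeyRef:
        name: semaphore-config       # from SEMAPHORE_CONFIGMAP_NAME
        key: my-pipeline-semaphore   # Options.SemaphoreKey
    mutex:
      name: my-pipeline-mutex        # Options.MutexName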

Signed-off-by: ddalvi <ddalvi@redhat.com>
DharmitD committed Feb 17, 2025
1 parent 87498e8 commit 6ff70af
Showing 6 changed files with 123 additions and 30 deletions.
73 changes: 54 additions & 19 deletions backend/src/apiserver/template/v2_template.go
@@ -74,8 +74,39 @@ func NewGenericScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu
}, nil
}

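// getKubernetesSpec returns the "kubernetes" entry from the platform spec map, or nil if the map is nil or the entry is absent.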
func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec {
var kubernetesSpec *pipelinespec.SinglePlatformSpec

// Check for "kubernetes" key in the platformSpec map
if platformSpec != nil {
if platform, ok := platformSpec["kubernetes"]; ok && platform != nil {
kubernetesSpec = platform
}
}
return kubernetesSpec
}

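// getPipelineOptions builds argocompiler options (semaphore key and mutex name) from the platform's PipelineConfig; it returns nil when no PipelineConfig is set.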
func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options {
var pipelineOptions *argocompiler.Options

if platform != nil && platform.PipelineConfig != nil {
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey
}
if platform.PipelineConfig.MutexName != "" {
pipelineOptions.MutexName = platform.PipelineConfig.MutexName
}
}
return pipelineOptions
}

// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
if t == nil {
return nil, fmt.Errorf("V2Spec is nil")
}

job := &pipelinespec.PipelineJob{}

bytes, err := protojson.Marshal(t.spec)
@@ -98,39 +98,47 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
// Check platformSpec for nil before using it
if t.platformSpec == nil {
return nil, fmt.Errorf("platformSpec is nil")
}

kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
if kubernetesSpec == nil {
return nil, fmt.Errorf("kubernetesSpec is nil")
}

pipelineOptions := getPipelineOptions(kubernetesSpec)
if pipelineOptions == nil {
return nil, fmt.Errorf("pipelineOptions is nil")
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
if err != nil {
return nil, util.Wrap(err, "Failed to compile job")
}
// Currently only the Argo implementation exists, so `ArgoWorkflow` is used for now.
// If support for another runtime is added later, we will need a way to switch or
// specify the runtime, e.g. via an environment variable.

executionSpec, err := util.NewExecutionSpecFromInterface(util.CurrentExecutionType(), obj)
if err != nil {
return nil, util.NewInternalServerError(err, "error creating execution spec")
}

// Overwrite namespace from the job object
if modelJob.Namespace != "" {
executionSpec.SetExecutionNamespace(modelJob.Namespace)
}
if executionSpec.ServiceAccount() == "" {
setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount)
}

// Disable istio sidecar injection if not specified
executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)

parameters, err := StringMapToCRDParameters(modelJob.RuntimeConfig.Parameters)
if err != nil {
return nil, util.Wrap(err, "Converting runtime config's parameters to CRD parameters failed")
@@ -305,17 +344,13 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
if err = t.validatePipelineJobInputs(job); err != nil {
return nil, util.Wrap(err, "invalid pipeline job inputs")
}
// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}

kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
46 changes: 36 additions & 10 deletions backend/src/v2/compiler/argocompiler/argo.go
@@ -19,9 +19,11 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"os"
"strings"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
@@ -41,6 +43,16 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
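// SemaphoreKey, if set, is used as the key in the semaphore ConfigMap referenced by the compiled workflow's synchronization block.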
SemaphoreKey string
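// MutexName, if set, is the name of the Argo mutex used to serialize compiled workflows.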
MutexName string
}

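// getSemaphoreConfigMapName returns the ConfigMap name used for semaphore references, read from the SEMAPHORE_CONFIGMAP_NAME environment variable and defaulting to "semaphore-config".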
func getSemaphoreConfigMapName() string {
const defaultConfigMapName = "semaphore-config"
if name := os.Getenv("SEMAPHORE_CONFIGMAP_NAME"); name != "" {
return name
}
return defaultConfigMapName
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
@@ -87,22 +99,13 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
PodMetadata: &wfapi.Metadata{
@@ -120,6 +123,29 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
Entrypoint: tmplEntrypoint,
},
}

sync := &wfapi.Synchronization{}
if opts != nil && opts.SemaphoreKey != "" {
sync.Semaphore = &wfapi.SemaphoreRef{
ConfigMapKeyRef: &k8score.ConfigMapKeySelector{
LocalObjectReference: k8score.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
Key: opts.SemaphoreKey,
},
}
}

if opts != nil && opts.MutexName != "" {
sync.Mutex = &wfapi.Mutex{
Name: opts.MutexName,
}
}

if sync.Semaphore != nil || sync.Mutex != nil {
wf.Spec.Synchronization = sync
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
@@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
1 change: 1 addition & 0 deletions manifests/kustomize/base/pipeline/kustomization.yaml
@@ -15,6 +15,7 @@ resources:
- ml-pipeline-scheduledworkflow-role.yaml
- ml-pipeline-scheduledworkflow-rolebinding.yaml
- ml-pipeline-scheduledworkflow-sa.yaml
- ml-pipeline-semaphore-configmap.yaml
- ml-pipeline-ui-deployment.yaml
- ml-pipeline-ui-configmap.yaml
- ml-pipeline-ui-role.yaml
Expand Up @@ -30,6 +30,8 @@ spec:
fieldPath: metadata.namespace
- name: OBJECTSTORECONFIG_SECURE
value: "false"
- name: SEMAPHORE_CONFIGMAP_NAME
value: "semaphore-config"
- name: OBJECTSTORECONFIG_BUCKETNAME
valueFrom:
configMapKeyRef:
@@ -0,0 +1,29 @@
kind: ConfigMap
apiVersion: v1
metadata:
name: semaphore-config
data: {}
---
apiVersion: batch/v1
kind: Job
metadata:
name: semaphore-configmap-init
namespace: kubeflow
spec:
template:
spec:
containers:
- name: create-configmap
image: bitnami/kubectl:latest
command:
- /bin/sh
- -c
- |
if ! kubectl get configmap semaphore-config -n kubeflow > /dev/null 2>&1; then
echo "Creating semaphore-config ConfigMap..."
kubectl create configmap semaphore-config -n kubeflow --from-literal=init=""
else
echo "ConfigMap semaphore-config already exists. Skipping creation."
fi
restartPolicy: OnFailure
backoffLimit: 3
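
For the semaphore to actually limit concurrency, the referenced key in this ConfigMap must hold the allowed number of concurrent holders, per Argo Workflows' synchronization semantics. A hedged example of how the ConfigMap might be populated afterwards (the key name and limit below are illustrative, not part of this commit):

kind: ConfigMap
apiVersion: v1
metadata:
  name: semaphore-config
  namespace: kubeflow
data:
  # Hypothetical entry: at most 2 workflows whose SemaphoreKey is
  # "my-pipeline-semaphore" may run concurrently.
  my-pipeline-semaphore: "2"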
