feat: Implement rebalance queue and config option
Signed-off-by: Robert Kotcher <rkotcher+1@gmail.com>
Robert Kotcher committed Nov 15, 2022
1 parent 6f76934 commit bac4b4b
Showing 38 changed files with 1,956 additions and 739 deletions.
4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
@@ -6268,6 +6268,10 @@
"configMapKeyRef": {
"$ref": "#/definitions/io.k8s.api.core.v1.ConfigMapKeySelector",
"description": "ConfigMapKeyRef is configmap selector for Semaphore configuration"
},
"rebalanceKey": {
"description": "RebalanceKey gives an equal share of locks to all resources that specify the same rebalance key",
"type": "string"
}
},
"type": "object"
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
@@ -10172,6 +10172,10 @@
"configMapKeyRef": {
"description": "ConfigMapKeyRef is configmap selector for Semaphore configuration",
"$ref": "#/definitions/io.k8s.api.core.v1.ConfigMapKeySelector"
},
"rebalanceKey": {
"description": "RebalanceKey gives an equal share of locks to all resources that specify the same rebalance key",
"type": "string"
}
}
},
16 changes: 16 additions & 0 deletions config/config.go
@@ -99,6 +99,10 @@ type Config struct {

	RetentionPolicy *RetentionPolicy `json:"retentionPolicy,omitempty"`

	// SemaphoreStrategies maps a lock name to the type of strategy that should be used to assign locks to
	// pending lock holders.
	SemaphoreStrategies SemaphoreStrategyTypes `json:"semaphoreStrategies,omitempty"`

	// NavColor is an ui navigation bar background color
	NavColor string `json:"navColor,omitempty"`

@@ -277,6 +281,18 @@ const (
	TemplateReferencingSecure TemplateReferencing = "Secure"
)

// SemaphoreStrategy determines how locks are distributed to pending lock holders.
type SemaphoreStrategy string

const (
	SemaphoreStrategyDefault    SemaphoreStrategy = "default"
	SemaphoreStrategyRebalanced SemaphoreStrategy = "rebalanced"
)

// SemaphoreStrategyTypes maps a lock name to the type of strategy that should be used to assign locks to
// pending lock holders.
type SemaphoreStrategyTypes map[string]SemaphoreStrategy

func (req *WorkflowRestrictions) MustUseReference() bool {
	if req == nil {
		return false
1 change: 1 addition & 0 deletions docs/executor_swagger.md
@@ -4216,6 +4216,7 @@ Note that this field cannot be set when spec.os.name is windows.
| Name | Type | Go type | Required | Default | Description | Example |
|------|------|---------|:--------:| ------- |-------------|---------|
| configMapKeyRef | [ConfigMapKeySelector](#config-map-key-selector)| `ConfigMapKeySelector` | | | | |
| rebalanceKey | string| `string` | | | RebalanceKey gives an equal share of locks to all resources that specify the same rebalance key | |



1 change: 1 addition & 0 deletions docs/fields.md
@@ -2346,6 +2346,7 @@ SemaphoreRef is a reference of Semaphore
| Field Name | Field Type | Description |
|:----------:|:----------:|---------------|
|`configMapKeyRef`|[`ConfigMapKeySelector`](#configmapkeyselector)|ConfigMapKeyRef is configmap selector for Semaphore configuration|
|`rebalanceKey`|`string`|RebalanceKey gives an equal share of locks to all resources that specify the same rebalance key|

## ArtifactLocation

127 changes: 127 additions & 0 deletions docs/proposals/rebalanced-semaphore.md
@@ -0,0 +1,127 @@
# Rebalanced Semaphore Strategy

## Introduction

In some situations where Argo is orchestrating a service with many end users, we may want these users to have equal access to a particular resource. As the number of concurrent users increases or decreases, we want Argo to track each user's constantly changing lock quota.

One example could be distribution of a finite number of software licenses, where a Running node holds a single license for the duration of its execution.

This feature extends the behavior of Argo's [semaphore](https://github.com/argoproj/argo-workflows/blob/master/docs/synchronization.md) so that, in addition to specifying a rate limit for the number of locks available, the Argo user can specify a `rebalanceKey` directly in the Workflow file.

All Workflows sharing the same `rebalanceKey` will collectively have access to 1/N of this semaphore's limit, where N is the total number of distinct `rebalanceKey`s that are currently pending or holding a lock.
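
As a rough illustration of the 1/N rule, the sketch below splits a semaphore limit across N distinct keys. The function name `shares` is hypothetical, and the handling of a limit that does not divide evenly (the remainder goes to the earliest keys, one lock each) is an assumption; the proposal does not say how the remainder is assigned.

```go
package main

import "fmt"

// shares splits limit locks across n distinct rebalance keys.
// Assumption: when limit is not divisible by n, the leftover locks go
// one at a time to the earliest keys.
func shares(limit, n int) []int {
	if n <= 0 {
		return nil
	}
	out := make([]int, n)
	base, extra := limit/n, limit%n
	for i := range out {
		out[i] = base
		if i < extra {
			out[i]++
		}
	}
	return out
}

func main() {
	fmt.Println(shares(55, 2)) // two keys share a limit of 55
	fmt.Println(shares(55, 1)) // a single key gets the whole limit
}
```

With a limit of 55 and two keys, this sketch yields shares of 28 and 27.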

_Argo users who do not use this feature are implicitly using the already-existing [default](https://github.com/argoproj/argo-workflows/blob/master/docs/synchronization.md) strategy._

### Example

Suppose we have the following:

* Semaphore S with limit 55
* Workflow A with 100 child nodes all vying for S with `rebalanceKey` J
* Workflow B with 100 child nodes all vying for S with `rebalanceKey` J
* Workflow C with 500 child nodes all vying for S with `rebalanceKey` K

Suppose A, B, and C are submitted, in that order. Then:

* There are two distinct `rebalanceKey`s J and K (that are either pending or held)
* Because A was submitted before B, and A and B both have `rebalanceKey` J, all of A's child nodes will receive a lock from S before any of B's.
* Because A was submitted before B and C, A _could_ briefly receive all of the locks, because none of B's or C's resources had yet been added to the queue of pending lock holders.
* The list of resources holding a lock from S should quite quickly converge to a list where 28 of the holders are from A and 27 are from C (or vice versa), since the limit of 55 is split between the two keys J and K. This is the steady state until A (or C) completes.

Suppose A and B complete, but C is still running. Then:

* C will have access to all 55 locks.
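
The example above can be worked through with a small simulation. This is only a sketch under stated assumptions, not the controller's implementation: the `request` type and `allocate` function are hypothetical, the limit is split evenly across distinct keys with the remainder going to the earliest key, workflows sharing a key are served in submission order, and quota that a key cannot use is not redistributed.

```go
package main

import "fmt"

// request is a hypothetical record of one workflow waiting on a semaphore.
type request struct {
	workflow string // workflow name
	key      string // its rebalanceKey
	pending  int    // nodes that still want a lock
}

// allocate returns how many locks each workflow holds in the steady state.
// reqs must be in submission order.
func allocate(limit int, reqs []request) map[string]int {
	// Collect distinct keys in order of first appearance.
	var keys []string
	seen := map[string]bool{}
	for _, r := range reqs {
		if !seen[r.key] {
			seen[r.key] = true
			keys = append(keys, r.key)
		}
	}

	held := map[string]int{}
	if len(keys) == 0 {
		return held
	}
	base, extra := limit/len(keys), limit%len(keys)
	for i, k := range keys {
		quota := base
		if i < extra {
			quota++
		}
		// First come, first served among workflows that share a key.
		for _, r := range reqs {
			if r.key != k || quota == 0 {
				continue
			}
			n := r.pending
			if n > quota {
				n = quota
			}
			held[r.workflow] += n
			quota -= n
		}
	}
	return held
}

func main() {
	all := []request{{"A", "J", 100}, {"B", "J", 100}, {"C", "K", 500}}
	fmt.Println(allocate(55, all))     // A holds 28, C holds 27, B waits behind A
	fmt.Println(allocate(55, all[2:])) // only C remains, so C holds all 55
}
```

In this sketch B holds nothing while A is running, because A and B share key J and A was submitted first.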

## User guide

### Configuration

Suppose we want to configure some semaphore named "template" to use the rebalanced strategy. First configure the semaphore with a limit, as described in the [synchronization](https://github.com/argoproj/argo-workflows/blob/master/docs/synchronization.md) documentation:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
data:
  template: "5"
```

The property "template" can now be referenced by the workflow.

Next the `workflow-controller-configmap` must be edited to tell the controller which semaphores should be configured using the rebalanced strategy:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
data:
  semaphoreStrategies: |
    template: "rebalanced"
```

The workflow controller should be restarted if a change in strategy takes place, because the strategy is fixed when the controller is initialized.

The controller will output logs that explain which strategy was chosen for each semaphore.

Note that semaphores not included in `semaphoreStrategies` will use the [default](https://github.com/argoproj/argo-workflows/blob/master/docs/synchronization.md) strategy. Not specifying the `semaphoreStrategies` key will configure all semaphores to use the default strategy.
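
The fallback described above could be expressed roughly as follows. The types and constants mirror the ones this commit adds to `config/config.go`; the helper `strategyFor` is hypothetical, and treating an unrecognized value as the default strategy is an assumption rather than documented behavior.

```go
package main

import "fmt"

// These declarations mirror the ones added to config/config.go.
type SemaphoreStrategy string

const (
	SemaphoreStrategyDefault    SemaphoreStrategy = "default"
	SemaphoreStrategyRebalanced SemaphoreStrategy = "rebalanced"
)

type SemaphoreStrategyTypes map[string]SemaphoreStrategy

// strategyFor is a hypothetical helper: it returns the configured strategy
// for a lock name and falls back to the default when the name is missing,
// when the map is nil, or (by assumption) when the value is unrecognized.
func strategyFor(strategies SemaphoreStrategyTypes, lockName string) SemaphoreStrategy {
	if s, ok := strategies[lockName]; ok && s == SemaphoreStrategyRebalanced {
		return SemaphoreStrategyRebalanced
	}
	return SemaphoreStrategyDefault
}

func main() {
	cfg := SemaphoreStrategyTypes{"template": SemaphoreStrategyRebalanced}
	fmt.Println(strategyFor(cfg, "template")) // configured as rebalanced
	fmt.Println(strategyFor(cfg, "other"))    // not listed, so default
	fmt.Println(strategyFor(nil, "template")) // no semaphoreStrategies key at all
}
```

Reading from a nil map is safe in Go, so an absent `semaphoreStrategies` key needs no special case in this sketch.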

To use the rebalanced semaphore, a user should (but is not required to) specify a `rebalanceKey` inside of the `synchronization.semaphore` property:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: rebalance-key-test-
spec:
  arguments:
    parameters:
      - name: rebalancekey
        value: "example-key-123"
  entrypoint: rebalance-key-test-entrypoint
  templates:
    - name: rebalance-key-test-entrypoint
      steps:
        - - name: generate
            template: gen-number-list
        - - name: synchronization-acquire-lock
            template: acquire-lock
            withParam: "{{steps.generate.outputs.result}}"
    - name: gen-number-list
      script:
        image: python:alpine3.6
        command: [python]
        source: |
          import json
          import sys
          json.dump([i for i in range(0, 15)], sys.stdout)
    - name: acquire-lock
      synchronization:
        semaphore:
          configMapKeyRef:
            name: my-config
            key: template
          rebalanceKey: "{{workflow.parameters.rebalancekey}}"
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo acquired lock; sleep $((20 + RANDOM % 11));"]
```

After setting up your config file as described above, run this workflow two times, each time using a different `rebalanceKey`:

Submit a workflow with `user-000`: `argo submit rebalance-key-test.yaml -p 'rebalancekey=user-000'`

Submit a workflow with `user-001`: `argo submit rebalance-key-test.yaml -p 'rebalancekey=user-001'`

At first, the workflow from the first submission may hold all of the available locks. As its pods complete, lock allocation converges so that each workflow has constant access to an approximately equal share of the locks (a 3/2 split with the limit of 5 configured above) until one workflow finishes entirely, at which point the remaining workflow has access to all of the locks.

### Assumptions

The following is a list of operational assumptions that a rebalanced semaphore user should understand before deciding to use this feature:

* A single workflow _can_ use many distinct semaphores using the rebalanced strategy.
* A single workflow _cannot_ use one particular semaphore with two `rebalanceKey`s, even if they are used in two distinct locations within that workflow.
* A semaphore that is using the rebalanced strategy will ignore priority.
* Workflow and Config file formats are backward compatible, meaning that existing workflows that do not want to use the rebalanced strategy need to make no changes.
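
The second assumption can be checked before submission. The sketch below is one hypothetical way to do so; the `semaphoreUse` type and `checkRebalanceKeys` function are illustrative names, and nothing shown in this commit indicates that the controller performs such a check itself.

```go
package main

import "fmt"

// semaphoreUse is a hypothetical record of one place in a workflow that
// references a semaphore together with a rebalanceKey.
type semaphoreUse struct {
	semaphore    string
	rebalanceKey string
}

// checkRebalanceKeys returns an error if the same semaphore is used with
// two different rebalance keys within one workflow.
func checkRebalanceKeys(uses []semaphoreUse) error {
	first := map[string]string{}
	for _, u := range uses {
		if prev, ok := first[u.semaphore]; ok && prev != u.rebalanceKey {
			return fmt.Errorf("semaphore %q is used with rebalance keys %q and %q",
				u.semaphore, prev, u.rebalanceKey)
		}
		first[u.semaphore] = u.rebalanceKey
	}
	return nil
}

func main() {
	ok := []semaphoreUse{{"template", "user-000"}, {"other", "user-001"}}
	bad := []semaphoreUse{{"template", "user-000"}, {"template", "user-001"}}
	fmt.Println(checkRebalanceKeys(ok))  // distinct semaphores: no error
	fmt.Println(checkRebalanceKeys(bad)) // same semaphore, two keys: error
}
```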
114 changes: 114 additions & 0 deletions examples/rebalance-key.yaml
@@ -0,0 +1,114 @@
# Suppose we have some semaphore with a limit of 4 that we want to share equally amongst lock requesters
# with the same rebalanceKey (see below in this example)
#
# We'd first set up the semaphore test-scale with a limit, as described in
# [synchronization](https://github.com/argoproj/argo-workflows/blob/master/docs/synchronization.md)
#
# Next, configure the workflow-controller-configmap with rebalanced strategy:
#
# ```
# apiVersion: v1
# kind: ConfigMap
# metadata:
#   name: workflow-controller-configmap
# data:
#   semaphoreStrategies: |
#     test-scale: "rebalanced"
# ```
#
# By setting `test-scale: "rebalanced"`, we're essentially telling the semaphore to continuously rebalance
# the number of locks that can be distributed to each unique "rebalanceKey"
#
# If we do not wish to "rebalance", and use the default semaphore behavior instead, we can either specify
# "default" or simply not include the lock name in "semaphoreStrategies".
#
# Note that when "rebalanced" is used, all lock requesters with no rebalanceKey specified are grouped
# together. Also note that priorities are currently ignored by semaphores using the rebalanced
# strategy, and that the rebalance key works anywhere a semaphore can be used (e.g. at the workflow
# level or the virtual node (steps, dag) level).
#---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: rebalance-key-aaaa
spec:
  activeDeadlineSeconds: 100000
  entrypoint: synchronization-tmpl-level-example
  serviceAccountName: argo
  arguments:
    parameters:
      - name: num
        value: 1
  podGC:
    strategy: OnPodSuccess
  templates:
    - name: synchronization-tmpl-level-example
      steps:
        - - name: generate
            template: gen-number-list
        - - name: synchronization-acquire-lock
            template: acquire-lock
            withParam: "{{steps.generate.outputs.result}}"

    - name: gen-number-list
      script:
        image: python:alpine3.6
        command: [python]
        source: |
          import json
          import sys
          json.dump([i for i in range(0, {{workflow.parameters.num}})], sys.stdout)
    - name: acquire-lock
      synchronization:
        semaphore:
          configMapKeyRef:
            name: workflow-controller-configmap
            key: test-scale
          rebalanceKey: "rebalance-key-aaaa"
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo acquired lock; sleep $((20 + RANDOM % 11));"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: rebalance-key-bbbb
spec:
  activeDeadlineSeconds: 100000
  entrypoint: synchronization-tmpl-level-example
  serviceAccountName: argo
  arguments:
    parameters:
      - name: num
        value: 1
  podGC:
    strategy: OnPodSuccess
  templates:
    - name: synchronization-tmpl-level-example
      steps:
        - - name: generate
            template: gen-number-list
        - - name: synchronization-acquire-lock
            template: acquire-lock
            withParam: "{{steps.generate.outputs.result}}"

    - name: gen-number-list
      script:
        image: python:alpine3.6
        command: [python]
        source: |
          import json
          import sys
          json.dump([i for i in range(0, {{workflow.parameters.num}})], sys.stdout)
    - name: acquire-lock
      synchronization:
        semaphore:
          configMapKeyRef:
            name: workflow-controller-configmap
            key: test-scale
          rebalanceKey: "rebalance-key-bbbb"
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo acquired lock; sleep $((20 + RANDOM % 11));"]
Original file line number Diff line number Diff line change
@@ -1852,6 +1852,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
templateDefaults:
@@ -9241,6 +9243,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
timeout:
@@ -17330,6 +17334,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
timeout:
6 changes: 6 additions & 0 deletions manifests/base/crds/full/argoproj.io_cronworkflows.yaml
@@ -1873,6 +1873,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
templateDefaults:
@@ -9262,6 +9264,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
timeout:
@@ -17351,6 +17355,8 @@ spec:
required:
- key
type: object
rebalanceKey:
type: string
type: object
type: object
timeout: