Skip to content

Commit

Permalink
Implement scaling of workers based on queue length(v0)
Browse files Browse the repository at this point in the history
  • Loading branch information
gautamp8 committed Jul 6, 2020
1 parent 89f2471 commit 93b47b6
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 27 deletions.
11 changes: 10 additions & 1 deletion deploy/cr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,13 @@ spec:
memory: "64Mi"
limits:
cpu: "200m"
memory: "128Mi"
memory: "128Mi"
scaleTargetRef:
- kind: worker
minReplicas: 2
maxReplicas: 5
metrics:
- name: message_queue
target:
type: length
averageValue: 100
33 changes: 33 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,39 @@ spec:
memory:
type: string
x-kubernetes-preserve-unknown-fields: true
scaleTargetRef:
description: "auto scaling targets"
type: array
items:
type: object
properties:
kind:
description: "target of which kind (e.g worker, flower)"
type: string
minReplicas:
description: "minimum number of replicas to keep"
type: integer
maxReplicas:
description: "maximum number of replicas to keep"
type: integer
metrics:
description: "specify metrics to scale/downscale the number of workers"
type: array
items:
type: object
properties:
name:
description: "name of metric. (e.g. message_queue)"
type: string
target:
type: object
properties:
type:
description: "target metric type. (e.g. length)"
type: string
averageValue:
description: "average metric value to maintain"
type: integer
status:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
17 changes: 7 additions & 10 deletions deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ def deploy_celery_workers(apps_api, namespace, spec, logger):
namespace=namespace,
body=data
)
deployment_name = deployed_obj.metadata.name

logger.info(
f"Deployment for celery workers successfully created with name: %s",
deployment_name
deployed_obj.metadata.name
)

return deployment_name
return deployed_obj


def deploy_flower(apps_api, namespace, spec, logger):
Expand Down Expand Up @@ -72,13 +72,12 @@ def deploy_flower(apps_api, namespace, spec, logger):
namespace=namespace,
body=data
)
deployment_name = deployed_obj.metadata.name
logger.info(
f"Deployment for celery flower successfully created with name: %s",
deployment_name
deployed_obj.metadata.name
)

return deployment_name
return deployed_obj


def expose_flower_service(api, namespace, spec, logger):
Expand All @@ -99,13 +98,11 @@ def expose_flower_service(api, namespace, spec, logger):
namespace=namespace,
body=data
)
flower_svc_name = svc_obj.metadata.name
logger.info(
f"Flower service successfully created with name: %s",
flower_svc_name
svc_obj.metadata.name
)

return flower_svc_name
return svc_obj


def mark_as_child(data):
Expand Down
111 changes: 96 additions & 15 deletions handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import kopf
import kubernetes
import requests
from math import ceil
from collections import namedtuple

from deployment_utils import (
Expand All @@ -25,9 +27,7 @@ def create_fn(spec, name, namespace, logger, **kwargs):
4. Define and expolre a flower Service to keep a watch on those metrics
5. Scale/Downscale on the basis of task queue length
"""
result = {}
children_count = 0
status = 'Creating'

# 1. Validation of spec
val, err_msg = validate_spec(spec)
Expand All @@ -39,31 +39,39 @@ def create_fn(spec, name, namespace, logger, **kwargs):
apps_api_instance = kubernetes.client.AppsV1Api()

# 2. Deployment for celery workers
worker_deployment_name = deploy_celery_workers(
worker_deployment = deploy_celery_workers(
apps_api_instance, namespace, spec, logger
)
result.update({'worker_deployment': worker_deployment_name})
children_count += 1

# 3. Deployment for flower
flower_deployment_name = deploy_flower(
flower_deployment = deploy_flower(
apps_api_instance, namespace, spec, logger
)
result.update({'flower_deployment': flower_deployment_name})
children_count += 1

# 4. Expose flower service
flower_svc_name = expose_flower_service(
flower_svc = expose_flower_service(
api, namespace, spec, logger
)
result.update({'flower_service': flower_svc_name})
children_count += 1
status = 'Success'

children = [
{
'name': worker_deployment.metadata.name,
'replicas': worker_deployment.spec.replicas
},
{
'name': flower_deployment.metadata.name,
'replicas': flower_deployment.spec.replicas
},
{
'name': flower_svc.metadata.name,
'spec': flower_svc.spec.to_dict()
}
]

return {
'status': status,
'children_count': children_count,
'children': result
'children': children,
'children_count': len(children),
'status': "CREATED"
}


Expand Down Expand Up @@ -132,6 +140,79 @@ def get_modified_spec_object(diff):
)


def check_flower_label(value, spec, **_):
"""
Checks if incoming label value is the one assigned to
flower service and deployment
"""
return value == f"{spec['common']['appName']}-flower"


@kopf.timer('celeryproject.org', 'v1alpha1', 'celery',
initial_delay=5, interval=10, idle=10)
def message_queue_length(spec, status, **kwargs):
flower_svc_host = "http://192.168.64.2:31737"
url = f"{flower_svc_host}/api/queues/length"
response = requests.get(url=url)
if response.status_code == 200:
return response.json().get('active_queues')

return {
"queue_length": None
}


def get_current_replicas(child_name, status):
children = status.get('create_fn').get('children')
for child in children:
if child.get('name') == child_name:
return child.get('replicas')


def get_current_queue_len(child_name, status):
for queue in status.get('message_queue_length'):
if queue.get('name') == child_name:
return queue.get('messages')

return None


@kopf.on.field('celeryproject.org', 'v1alpha1', 'celery',
field='status.message_queue_length')
def horizontal_autoscale(spec, status, namespace, **kwargs):
worker_deployment_name = f"{spec['common']['appName']}-celery-worker"
current_replicas = get_current_replicas(worker_deployment_name, status)
updated_num_of_replicas = current_replicas
scaling_targets = spec['scaleTargetRef']
for scaling_target in scaling_targets:
# For now we only support 1 i.e message queue length
if scaling_target.get('kind') == 'worker':
min_replicas = scaling_target.get('minReplicas', spec['workerSpec']['numOfWorkers'])
max_replicas = scaling_target.get('maxReplicas')
queue_name = spec['workerSpec']['queues']
current_queue_length = get_current_queue_len(queue_name, status)
avg_queue_length = scaling_target['metrics'][0].get('target').get('averageValue')
updated_num_of_replicas = ceil(
current_replicas * (current_queue_length / avg_queue_length)
) or min_replicas

patch_body = {
"spec": {
"replicas": updated_num_of_replicas,
}
}

apps_api_instance = kubernetes.client.AppsV1Api()
updated_deployment = apps_api_instance.patch_namespaced_deployment(
worker_deployment_name, namespace, patch_body
)

return {
'deploymentName': updated_deployment.metadata.name,
'updated_num_of_replicas': updated_num_of_replicas
}


def validate_stuff(spec):
"""
1. If the deployment/svc already exists, k8s throws error
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ itsdangerous==1.1.0
jedi==0.17.0
Jinja2==2.11.2
kombu==4.6.10
kopf==0.26
kopf==0.27
kubernetes==11.0.0
MarkupSafe==1.1.1
multidict==4.7.6
Expand Down

0 comments on commit 93b47b6

Please sign in to comment.