From de584c92a55d7a22c4bfa966a02de3f43d166804 Mon Sep 17 00:00:00 2001 From: Gautam Prajapati Date: Wed, 8 Jul 2020 20:21:43 +0530 Subject: [PATCH] Fix update handler, stablize scaling algo --- constants.py | 17 ++++++++ handlers.py | 73 +++++++++++++++++++++------------- update_utils.py | 103 +++++++++++++++++++++++++++++++++++++----------- 3 files changed, 141 insertions(+), 52 deletions(-) create mode 100644 constants.py diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..6a97205 --- /dev/null +++ b/constants.py @@ -0,0 +1,17 @@ +# Native API Objects +DEPLOYMENT_KIND = 'Deployment' +SERVICE_KIND = 'Service' + +# Celery Worker Constants +WORKER_TYPE = 'worker' + + +# Flower Constants +FLOWER_TYPE = 'flower' + + +# Hander Status +STATUS_CREATED = 'CREATED' +STATUS_SUCCESS = 'SUCCESS' +STATUS_UPDATED = 'UPDATED' +STATUS_PATCHED = 'PATCHED' diff --git a/handlers.py b/handlers.py index 78db13b..68a6d81 100644 --- a/handlers.py +++ b/handlers.py @@ -2,6 +2,7 @@ import kopf import kubernetes import requests +import constants from math import ceil from collections import namedtuple @@ -12,7 +13,7 @@ ) from update_utils import ( update_all_deployments, - update_celery_deployment, + update_worker_deployment, update_flower_deployment ) @@ -20,12 +21,7 @@ @kopf.on.create('celeryproject.org', 'v1alpha1', 'celery') def create_fn(spec, name, namespace, logger, **kwargs): """ - TODO - - 1. Validate the spec for incoming obj - 2. Create a config-map for celery - 3. Instantiate a celery Deployment with the specified parameters - 4. Define and expolre a flower Service to keep a watch on those metrics - 5. Scale/Downscale on the basis of task queue length + Celery custom resource creation handler """ children_count = 0 @@ -56,33 +52,39 @@ def create_fn(spec, name, namespace, logger, **kwargs): children = [ { 'name': worker_deployment.metadata.name, - 'replicas': worker_deployment.spec.replicas + 'replicas': worker_deployment.spec.replicas, + 'kind': constants.DEPLOYMENT_KIND, + 'type': constants.WORKER_TYPE }, { 'name': flower_deployment.metadata.name, - 'replicas': flower_deployment.spec.replicas + 'replicas': flower_deployment.spec.replicas, + 'kind': constants.DEPLOYMENT_KIND, + 'type': constants.FLOWER_TYPE }, { 'name': flower_svc.metadata.name, - 'spec': flower_svc.spec.to_dict() + 'spec': flower_svc.spec.to_dict(), + 'kind': constants.SERVICE_KIND, + 'type': constants.FLOWER_TYPE } ] return { 'children': children, 'children_count': len(children), - 'status': "CREATED" + 'status': constants.STATUS_CREATED } @kopf.on.update('celeryproject.org', 'v1alpha1', 'celery') def update_fn(spec, status, namespace, logger, **kwargs): - # TODO - app name still cannot be updated(Fix that) diff = kwargs.get('diff') modified_spec = get_modified_spec_object(diff) api = kubernetes.client.CoreV1Api() apps_api_instance = kubernetes.client.AppsV1Api() + result = status.get('update_fn') or status.get('create_fn') if modified_spec.common_spec: # if common spec was updated, need to update all deployments @@ -90,19 +92,28 @@ def update_fn(spec, status, namespace, logger, **kwargs): api, apps_api_instance, spec, status, namespace ) else: - result = {} if modified_spec.worker_spec: - result.update({ - 'worker_deployment': update_celery_deployment( - apps_api_instance, spec, status, namespace - ) + # if worker spec was updated, just update worker deployments + worker_deployment = update_worker_deployment( + apps_api_instance, spec, status, namespace + ) + deployment_status = next(child for child in result.get('children') if child['type'] == constants.WORKER_TYPE) # NOQA + + deployment_status.update({ + 'name': worker_deployment.metadata.name, + 'replicas': worker_deployment.spec.replicas }) if modified_spec.flower_spec: - result.update({ - 'flower_deployment': update_flower_deployment( - apps_api_instance, spec, status, namespace - ) + # if flower spec was updated, just update flower deployments + flower_deployment = update_flower_deployment( + apps_api_instance, spec, status, namespace + ) + deployment_status = next(child for child in result.get('children') if child['type'] == constants.FLOWER_TYPE) # NOQA + + deployment_status.update({ + 'name': flower_deployment.metadata.name, + 'replicas': flower_deployment.spec.replicas }) return result @@ -149,9 +160,9 @@ def check_flower_label(value, spec, **_): @kopf.timer('celeryproject.org', 'v1alpha1', 'celery', - initial_delay=5, interval=10, idle=10) + initial_delay=50000, interval=100000, idle=10) def message_queue_length(spec, status, **kwargs): - flower_svc_host = "http://192.168.64.2:31737" + flower_svc_host = "http://192.168.64.2:32289" url = f"{flower_svc_host}/api/queues/length" response = requests.get(url=url) if response.status_code == 200: @@ -170,11 +181,11 @@ def get_current_replicas(child_name, status): def get_current_queue_len(child_name, status): - for queue in status.get('message_queue_length'): + for queue in status.get('message_queue_length', []): if queue.get('name') == child_name: return queue.get('messages') - return None + return 0 @kopf.on.field('celeryproject.org', 'v1alpha1', 'celery', @@ -192,9 +203,15 @@ def horizontal_autoscale(spec, status, namespace, **kwargs): queue_name = spec['workerSpec']['queues'] current_queue_length = get_current_queue_len(queue_name, status) avg_queue_length = scaling_target['metrics'][0].get('target').get('averageValue') - updated_num_of_replicas = ceil( - current_replicas * (current_queue_length / avg_queue_length) - ) or min_replicas + updated_num_of_replicas = min( + max( + ceil( + current_replicas * (current_queue_length / avg_queue_length) + ), + min_replicas + ), + max_replicas + ) patch_body = { "spec": { diff --git a/update_utils.py b/update_utils.py index a1acd6d..79f332b 100644 --- a/update_utils.py +++ b/update_utils.py @@ -1,23 +1,77 @@ +import constants from models.worker_spec import ( args_list_from_spec_params ) def update_all_deployments(api, apps_api_instance, spec, status, namespace): + worker_deployment = update_worker_deployment( + apps_api_instance, spec, status, namespace + ) + + flower_deployment = update_flower_deployment( + apps_api_instance, spec, status, namespace + ) + + flower_svc = update_flower_service( + api, spec, status, namespace + ) + + children = [ + { + 'name': worker_deployment.metadata.name, + 'replicas': worker_deployment.spec.replicas, + 'kind': constants.DEPLOYMENT_KIND, + 'type': constants.WORKER_TYPE + }, + { + 'name': flower_deployment.metadata.name, + 'replicas': flower_deployment.spec.replicas, + 'kind': constants.DEPLOYMENT_KIND, + 'type': constants.FLOWER_TYPE + }, + { + 'name': flower_svc.metadata.name, + 'spec': flower_svc.spec.to_dict(), + 'kind': constants.SERVICE_KIND, + 'type': constants.FLOWER_TYPE + } + ] + return { - 'worker_deployment': update_celery_deployment( - apps_api_instance, spec, status, namespace - ), - 'flower_deployment': update_flower_deployment( - apps_api_instance, spec, status, namespace - ), - 'flower_service': update_flower_service( - api, spec, status, namespace - ) + 'children': children, + 'children_count': len(children), + 'status': constants.STATUS_UPDATED } -def update_celery_deployment(apps_api_instance, spec, status, namespace): +def get_curr_deployment_from_handler_status(handler_name, status, child_type): + """ + Get current deployment name from handler's status + @param: handler_name - which handler to get from + @param: child_type - worker or flower + @returns: current deployment name + """ + for child in status.get(handler_name).get('children'): + if child.get('type') == child_type and child.get('kind') == constants.DEPLOYMENT_KIND: # NOQA + return child.get('name') + + return None + + +def get_curr_deployment_name(status, child_type): + """ + Get current deployment name from parent's status + @param: child_type - worker or flower + @returns: current deployment name + """ + if status.get('update_fn'): + return get_curr_deployment_from_handler_status('update_fn', status, child_type) + + return get_curr_deployment_from_handler_status('create_fn', status, child_type) + + +def update_worker_deployment(apps_api_instance, spec, status, namespace): worker_spec = spec['workerSpec'] worker_spec_dict = { 'args': args_list_from_spec_params( @@ -46,12 +100,13 @@ def update_celery_deployment(apps_api_instance, spec, status, namespace): } } - deployment_name = status['create_fn']['children']['worker_deployment'] - apps_api_instance.patch_namespaced_deployment( - deployment_name, namespace, patch_body + worker_deployment_name = get_curr_deployment_name( + status, constants.WORKER_TYPE ) - return deployment_name + return apps_api_instance.patch_namespaced_deployment( + worker_deployment_name, namespace, patch_body + ) def update_flower_deployment(apps_api_instance, spec, status, namespace): @@ -82,13 +137,13 @@ def update_flower_deployment(apps_api_instance, spec, status, namespace): } } - deployment_name = status['create_fn']['children']['flower_deployment'] - # TODO: Use a try catch here - apps_api_instance.patch_namespaced_deployment( - deployment_name, namespace, patch_body + flower_deployment_name = get_curr_deployment_name( + status, constants.FLOWER_TYPE ) - return deployment_name + return apps_api_instance.patch_namespaced_deployment( + flower_deployment_name, namespace, patch_body + ) def update_flower_service(api, spec, status, namespace): @@ -101,9 +156,9 @@ def update_flower_service(api, spec, status, namespace): } } - svc_name = status['create_fn']['children']['flower_service'] - api.patch_namespaced_service( - svc_name, namespace, patch_body + flower_svc_name = get_curr_deployment_name( + status, constants.FLOWER_TYPE + ) # flower svc is named same as flower deployment + return api.patch_namespaced_service( + flower_svc_name, namespace, patch_body ) - - return svc_name