diff --git a/templates/example_obj.yaml b/deploy/cr.yaml similarity index 55% rename from templates/example_obj.yaml rename to deploy/cr.yaml index 93061a2..c607be7 100644 --- a/templates/example_obj.yaml +++ b/deploy/cr.yaml @@ -1,16 +1,16 @@ -apiVersion: grofers.com/v1 -kind: CeleryApplication +apiVersion: celeryproject.org/v1alpha1 +kind: Celery metadata: name: example-celery-obj spec: - app_name: celery-crd-example - celery_app: 'app:celery_app' - image: example-image - celery_config: - worker_name: example-celery-worker - num_of_workers: 2 - queues: high_priority - loglevel: info + common: + appName: celery-crd-example + celeryApp: 'app:celery_app' + image: example-image + workerSpec: + numOfWorkers: 2 + queues: celery # default queue name + logLevel: debug concurrency: 2 resources: requests: @@ -19,7 +19,7 @@ spec: limits: cpu: "200m" memory: "128Mi" - flower_config: + flowerSpec: replicas: 1 resources: requests: diff --git a/templates/crd.yaml b/deploy/crd.yaml similarity index 65% rename from templates/crd.yaml rename to deploy/crd.yaml index 04c14d4..e8651c9 100644 --- a/templates/crd.yaml +++ b/deploy/crd.yaml @@ -1,49 +1,58 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: - name: celeryapplications.grofers.com + name: celery.celeryproject.org spec: scope: Namespaced - group: grofers.com + group: celeryproject.org names: - kind: CeleryApplication - plural: celeryapplications - singular: celeryapplication + kind: Celery + listKind: CeleryList + plural: celery + singular: celery shortNames: - - celery - - capps + - cel - capp - versions: - - name: v1 + versions: + - name: v1alpha1 served: true storage: true schema: openAPIV3Schema: type: object + required: ["spec"] properties: spec: + description: "spec defines the desired state and params for celery cluster" type: object properties: - app_name: - type: string - celery_app: - type: string - image: - type: string - worker_name: - type: string - celery_config: + common: + description: "common configuration parameters for all worker and flower deployments" + required: ["appName", "celeryApp", "image"] type: object properties: - num_of_workers: + image: + description: "container image name to run in the worker and flower deployments" + type: string + appName: + description: "app name for worker and flower deployments, will be suffixed accordingly" + type: string + celeryApp: + description: "celery app instance to use (e.g. module.celery_app_attr_name)" + type: string + workerSpec: + description: "worker deployment specific parameters" + type: object + properties: + numOfWorkers: type: integer queues: type: string - loglevel: + logLevel: type: string concurrency: type: integer - max_tasks_per_child: + maxTasksPerChild: type: integer resources: type: object @@ -63,7 +72,8 @@ spec: memory: type: string x-kubernetes-preserve-unknown-fields: true - flower_config: + flowerSpec: + description: "flower deployment specific parameters" type: object properties: replicas: @@ -88,7 +98,7 @@ spec: x-kubernetes-preserve-unknown-fields: true status: type: object - x-kubernetes-preserve-unknown-fields: true + x-kubernetes-preserve-unknown-fields: true additionalPrinterColumns: - name: Children type: string diff --git a/deployment_utils.py b/deployment_utils.py index cca7f1b..87794c8 100644 --- a/deployment_utils.py +++ b/deployment_utils.py @@ -10,19 +10,18 @@ def deploy_celery_workers(apps_api, namespace, spec, logger): ) tmpl = open(path, 'rt').read() - celery_config = spec['celery_config'] + celery_config = spec['workerSpec'] req_resources = celery_config['resources']['requests'] lim_resources = celery_config['resources']['limits'] text = tmpl.format( namespace=namespace, - app_name=spec['app_name'], - celery_app=spec['celery_app'], - image=spec['image'], - worker_name=celery_config['worker_name'], - num_of_workers=celery_config['num_of_workers'], + app_name=spec['common']['appName'], + celery_app=spec['common']['celeryApp'], + image=spec['common']['image'], + num_of_workers=celery_config['numOfWorkers'], queues=celery_config['queues'], - loglevel=celery_config['loglevel'], + loglevel=celery_config['logLevel'], concurrency=celery_config['concurrency'], lim_cpu=lim_resources['cpu'], lim_mem=lim_resources['memory'], @@ -52,14 +51,14 @@ def deploy_flower(apps_api, namespace, spec, logger): ) tmpl = open(path, 'rt').read() - flower_config = spec['flower_config'] + flower_config = spec['flowerSpec'] req_resources = flower_config['resources']['requests'] lim_resources = flower_config['resources']['limits'] text = tmpl.format( namespace=namespace, - app_name=spec['app_name'], - celery_app=spec['celery_app'], - image=spec['image'], + app_name=spec['common']['appName'], + celery_app=spec['common']['celeryApp'], + image=spec['common']['image'], replicas=flower_config['replicas'], lim_cpu=lim_resources['cpu'], lim_mem=lim_resources['memory'], @@ -91,7 +90,7 @@ def expose_flower_service(api, namespace, spec, logger): text = tmpl.format( namespace=namespace, - app_name=spec['app_name'] + app_name=spec['common']['appName'] ) data = yaml.safe_load(text) mark_as_child(data) diff --git a/example/Dockerfile b/example/Dockerfile index 982000c..4495c0e 100644 --- a/example/Dockerfile +++ b/example/Dockerfile @@ -9,4 +9,4 @@ COPY . . WORKDIR example -CMD [ "python", "app.py" ] +CMD [ "python", "run_task.py" ] diff --git a/example/app.py b/example/app.py index 8706eac..1742fed 100644 --- a/example/app.py +++ b/example/app.py @@ -30,7 +30,3 @@ def __call__(self, *args, **kwargs): @celery_app.task() def add(a, b): return a + b - - -if __name__ == '__main__': - add.delay(4, 5) diff --git a/example/run_task.py b/example/run_task.py new file mode 100644 index 0000000..91ddde2 --- /dev/null +++ b/example/run_task.py @@ -0,0 +1,10 @@ +from app import add + +i = 0 +while True: + add.delay(4, 5) + add.delay(10, 20) + add.delay(100, 20) + i += 1 + if i == 1000: + break diff --git a/handlers.py b/handlers.py index f7aa017..138c8e5 100644 --- a/handlers.py +++ b/handlers.py @@ -15,7 +15,7 @@ ) -@kopf.on.create('grofers.com', 'v1', 'celeryapplications') +@kopf.on.create('celeryproject.org', 'v1alpha1', 'celery') def create_fn(spec, name, namespace, logger, **kwargs): """ TODO - @@ -67,7 +67,7 @@ def create_fn(spec, name, namespace, logger, **kwargs): } -@kopf.on.update('grofers.com', 'v1', 'celeryapplications') +@kopf.on.update('celeryproject.org', 'v1alpha1', 'celery') def update_fn(spec, status, namespace, logger, **kwargs): # TODO - app name still cannot be updated(Fix that) diff = kwargs.get('diff') @@ -83,7 +83,7 @@ def update_fn(spec, status, namespace, logger, **kwargs): ) else: result = {} - if modified_spec.celery_spec: + if modified_spec.worker_spec: result.update({ 'worker_deployment': update_celery_deployment( apps_api_instance, spec, status, namespace @@ -105,9 +105,9 @@ def get_modified_spec_object(diff): diff format - Tuple of (op, (fields tuple), old, new) @returns ModifiedSpec namedtuple signifying which spec was updated """ - common_spec_checklist = ['app_name', 'celery_app', 'image', 'worker_name'] - celery_config_checklist = ['celery_config'] - flower_config_checklist = ['flower_config'] + common_spec_checklist = ['appName', 'celeryApp', 'image'] + celery_config_checklist = ['workerSpec'] + flower_config_checklist = ['flowerSpec'] common_spec_modified = False celery_spec_modified = False @@ -123,11 +123,11 @@ def get_modified_spec_object(diff): flower_spec_modified = True # a namedtuple to give structure to which spec was updated - ModifiedSpec = namedtuple('ModifiedSpec', ['common_spec', 'celery_spec', 'flower_spec']) + ModifiedSpec = namedtuple('ModifiedSpec', ['common_spec', 'worker_spec', 'flower_spec']) return ModifiedSpec( common_spec=common_spec_modified, - celery_spec=celery_spec_modified, + worker_spec=celery_spec_modified, flower_spec=flower_spec_modified ) diff --git a/templates/deployments/celery_worker_deployment.yaml b/templates/deployments/celery_worker_deployment.yaml index d96b229..2e9fb04 100644 --- a/templates/deployments/celery_worker_deployment.yaml +++ b/templates/deployments/celery_worker_deployment.yaml @@ -4,7 +4,7 @@ metadata: labels: app: {app_name} celery: "true" - name: {worker_name} + name: {app_name}-celery-worker namespace: {namespace} spec: minReadySeconds: 10 @@ -23,7 +23,7 @@ spec: app: {app_name} spec: containers: - - name: {worker_name} + - name: {app_name}-celery-worker image: {image} imagePullPolicy: Never command: ["celery"] diff --git a/templates/static/flask-example.yaml b/templates/static/flask-example.yaml new file mode 100644 index 0000000..b9d85c1 --- /dev/null +++ b/templates/static/flask-example.yaml @@ -0,0 +1,36 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: my-flask-app + name: my-flask-app + namespace: default +spec: + minReadySeconds: 5 + progressDeadlineSeconds: 600 + replicas: 1 + selector: + matchLabels: + app: my-flask-app + strategy: + rollingUpdate: + maxSurge: 20% + maxUnavailable: 0% + type: RollingUpdate + template: + metadata: + creationTimestamp: null + labels: + app: my-flask-app + spec: + containers: + - name: my-flask-app + image: example-image + imagePullPolicy: Never + resources: + requests: + cpu: "100m" + memory: "64Mi" + limits: + cpu: "200m" + memory: "128Mi" \ No newline at end of file diff --git a/templates/static/redis-master.yaml b/templates/static/redis-master.yaml index 1b4614f..60b342a 100644 --- a/templates/static/redis-master.yaml +++ b/templates/static/redis-master.yaml @@ -40,6 +40,7 @@ metadata: role: master tier: backend spec: + type: NodePort ports: - port: 6379 # Map incoming connections on port 6379 to the target port 6379 of the Pod targetPort: 6379 diff --git a/update_utils.py b/update_utils.py index 1359d88..a1acd6d 100644 --- a/update_utils.py +++ b/update_utils.py @@ -18,24 +18,24 @@ def update_all_deployments(api, apps_api_instance, spec, status, namespace): def update_celery_deployment(apps_api_instance, spec, status, namespace): - celery_config = spec['celery_config'] + worker_spec = spec['workerSpec'] worker_spec_dict = { 'args': args_list_from_spec_params( - celery_app=spec['celery_app'], - queues=celery_config['queues'], - loglevel=celery_config['loglevel'], - concurrency=celery_config['concurrency'] + celery_app=spec['common']['celeryApp'], + queues=worker_spec['queues'], + loglevel=worker_spec['logLevel'], + concurrency=worker_spec['concurrency'] ), 'command': ["celery"], - 'image': spec['image'], - 'name': spec['worker_name'], - 'resources': celery_config['resources'] + 'image': spec['common']['image'], + 'name': f"{spec['common']['appName']}-celery-worker", + 'resources': worker_spec['resources'] } # JSON way of submitting spec to deploy/patch patch_body = { "spec": { - "replicas": celery_config['num_of_workers'], + "replicas": worker_spec['numOfWorkers'], "template": { "spec": { "containers": [ @@ -55,23 +55,23 @@ def update_celery_deployment(apps_api_instance, spec, status, namespace): def update_flower_deployment(apps_api_instance, spec, status, namespace): - flower_config = spec['flower_config'] + flower_spec = spec['flowerSpec'] flower_spec_dict = { - 'args': [spec['celery_app']], + 'args': [spec['common']['celeryApp']], 'command': ['flower'], - 'image': spec['image'], - 'name': f"{spec['worker_name']}-flower", + 'image': spec['common']['image'], + 'name': f"{spec['common']['appName']}-flower", 'ports': [ {"containerPort": 5555} ], - 'resources': flower_config['resources'] + 'resources': flower_spec['resources'] } # JSON way of submitting spec to deploy/patch patch_body = { "spec": { - "replicas": flower_config['replicas'], + "replicas": flower_spec['replicas'], "template": { "spec": { "containers": [ @@ -96,7 +96,7 @@ def update_flower_service(api, spec, status, namespace): patch_body = { "spec": { "selector": { - "run": f"{spec['app_name']}-flower" + "run": f"{spec['common']['appName']}-flower" } } }