Skip to content

Commit

Permalink
Preliminary v2 - Operator supporting updates to custom object now
Browse files Browse the repository at this point in the history
  • Loading branch information
gautamp8 committed Jun 22, 2020
1 parent 2912bce commit 4276873
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def deploy_celery_workers(apps_api, namespace, spec, logger):
namespace=namespace,
app_name=spec['app_name'],
celery_app=spec['celery_app'],
worker_name=spec['worker_name'],
image=spec['image'],
worker_name=celery_config['worker_name'],
num_of_workers=celery_config['num_of_workers'],
queues=celery_config['queues'],
loglevel=celery_config['loglevel'],
Expand Down
71 changes: 71 additions & 0 deletions handlers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import os
import kopf
import kubernetes
from collections import namedtuple

from deployment_utils import (
deploy_celery_workers,
deploy_flower,
expose_flower_service
)
from update_utils import (
update_all_deployments,
update_celery_deployment,
update_flower_deployment
)


@kopf.on.create('grofers.com', 'v1', 'celeryapplications')
Expand Down Expand Up @@ -61,6 +67,71 @@ def create_fn(spec, name, namespace, logger, **kwargs):
}


@kopf.on.update('grofers.com', 'v1', 'celeryapplications')
def update_fn(spec, status, namespace, logger, **kwargs):
# TODO - app name still cannot be updated(Fix that)
diff = kwargs.get('diff')
modified_spec = get_modified_spec_object(diff)

api = kubernetes.client.CoreV1Api()
apps_api_instance = kubernetes.client.AppsV1Api()

if modified_spec.common_spec:
# if common spec was updated, need to update all deployments
return update_all_deployments(
api, apps_api_instance, spec, status, namespace
)
else:
result = {}
if modified_spec.celery_spec:
result.update({
'worker_deployment': update_celery_deployment(
apps_api_instance, spec, status, namespace
)
})

if modified_spec.flower_spec:
result.update({
'flower_deployment': update_flower_deployment(
apps_api_instance, spec, status, namespace
)
})
return result


def get_modified_spec_object(diff):
"""
@param: diff - arg provided by kopf when an object is updated
diff format - Tuple of (op, (fields tuple), old, new)
@returns ModifiedSpec namedtuple signifying which spec was updated
"""
common_spec_checklist = ['app_name', 'celery_app', 'image', 'worker_name']
celery_config_checklist = ['celery_config']
flower_config_checklist = ['flower_config']

common_spec_modified = False
celery_spec_modified = False
flower_spec_modified = False

# TODO - Optimize this loop maybe
for op, fields, old, new in diff:
if any(field in fields for field in common_spec_checklist):
common_spec_modified = True
if any(field in fields for field in celery_config_checklist):
celery_spec_modified = True
if any(field in fields for field in flower_config_checklist):
flower_spec_modified = True

# a namedtuple to give structure to which spec was updated
ModifiedSpec = namedtuple('ModifiedSpec', ['common_spec', 'celery_spec', 'flower_spec'])

return ModifiedSpec(
common_spec=common_spec_modified,
celery_spec=celery_spec_modified,
flower_spec=flower_spec_modified
)


def validate_stuff(spec):
"""
1. If the deployment/svc already exists, k8s throws error
Expand Down
117 changes: 117 additions & 0 deletions models/worker_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from dataclasses import dataclass
from typing import Any, List, TypeVar, Type, cast, Callable


T = TypeVar("T")


def from_str(x: Any) -> str:
assert isinstance(x, str)
return x


def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()


def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]


@dataclass
class Constraints:
cpu: str
memory: str

@staticmethod
def from_dict(obj: Any) -> 'Constraints':
assert isinstance(obj, dict)
cpu = from_str(obj.get("cpu"))
memory = from_str(obj.get("memory"))
return Constraints(cpu, memory)

def to_dict(self) -> dict:
result: dict = {}
result["cpu"] = from_str(self.cpu)
result["memory"] = from_str(self.memory)
return result


@dataclass
class Resources:
requests: Constraints
limits: Constraints

@staticmethod
def from_dict(obj: Any) -> 'Resources':
assert isinstance(obj, dict)
requests = Constraints.from_dict(obj.get("requests"))
limits = Constraints.from_dict(obj.get("limits"))
return Resources(requests, limits)

def to_dict(self) -> dict:
result: dict = {}
result["requests"] = to_class(Constraints, self.requests)
result["limits"] = to_class(Constraints, self.limits)
return result


@dataclass
class WorkerSpec:
args: List[str]
command: List[str]
image: str
name: str
resources: Resources

@staticmethod
def from_dict(obj: Any) -> 'WorkerSpec':
assert isinstance(obj, dict)
args = from_list(from_str, obj.get("args"))
command = from_list(from_str, obj.get("command"))
image = from_str(obj.get("image"))
name = from_str(obj.get("name"))
resources = Resources.from_dict(obj.get("resources"))
return WorkerSpec(args, command, image, name, resources)

def to_dict(self) -> dict:
result: dict = {}
result["args"] = from_list(from_str, self.args)
result["command"] = from_list(from_str, self.command)
result["image"] = from_str(self.image)
result["name"] = from_str(self.name)
result["resources"] = to_class(Resources, self.resources)
return result


def args_list_from_spec_params(
celery_app: str,
queues: str,
loglevel: str,
concurrency: int
) -> List[str]:
return [
f"--app={celery_app}",
"worker",
f"--queues={queues}",
f"--loglevel={loglevel}",
f"--concurrency={concurrency}"
]


def worker_spec_from_dict(s: Any) -> WorkerSpec:
return WorkerSpec.from_dict(s)


def worker_spec_to_dict(x: WorkerSpec) -> Any:
return to_class(WorkerSpec, x)

# To use this code, make sure you
#
# import json
#
# and then, to convert JSON from a string, do
#
# result = worker_spec_from_dict(json.loads(json_string))
2 changes: 1 addition & 1 deletion templates/example_obj.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ spec:
app_name: celery-crd-example
celery_app: 'app:celery_app'
image: example-image
worker_name: example-celery-worker
celery_config:
worker_name: example-celery-worker
num_of_workers: 2
queues: high_priority
loglevel: info
Expand Down
109 changes: 109 additions & 0 deletions update_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from models.worker_spec import (
args_list_from_spec_params
)


def update_all_deployments(api, apps_api_instance, spec, status, namespace):
return {
'worker_deployment': update_celery_deployment(
apps_api_instance, spec, status, namespace
),
'flower_deployment': update_flower_deployment(
apps_api_instance, spec, status, namespace
),
'flower_service': update_flower_service(
api, spec, status, namespace
)
}


def update_celery_deployment(apps_api_instance, spec, status, namespace):
celery_config = spec['celery_config']
worker_spec_dict = {
'args': args_list_from_spec_params(
celery_app=spec['celery_app'],
queues=celery_config['queues'],
loglevel=celery_config['loglevel'],
concurrency=celery_config['concurrency']
),
'command': ["celery"],
'image': spec['image'],
'name': spec['worker_name'],
'resources': celery_config['resources']
}

# JSON way of submitting spec to deploy/patch
patch_body = {
"spec": {
"replicas": celery_config['num_of_workers'],
"template": {
"spec": {
"containers": [
worker_spec_dict
]
}
}
}
}

deployment_name = status['create_fn']['children']['worker_deployment']
apps_api_instance.patch_namespaced_deployment(
deployment_name, namespace, patch_body
)

return deployment_name


def update_flower_deployment(apps_api_instance, spec, status, namespace):
flower_config = spec['flower_config']

flower_spec_dict = {
'args': [spec['celery_app']],
'command': ['flower'],
'image': spec['image'],
'name': f"{spec['worker_name']}-flower",
'ports': [
{"containerPort": 5555}
],
'resources': flower_config['resources']
}

# JSON way of submitting spec to deploy/patch
patch_body = {
"spec": {
"replicas": flower_config['replicas'],
"template": {
"spec": {
"containers": [
flower_spec_dict
]
}
}
}
}

deployment_name = status['create_fn']['children']['flower_deployment']
# TODO: Use a try catch here
apps_api_instance.patch_namespaced_deployment(
deployment_name, namespace, patch_body
)

return deployment_name


def update_flower_service(api, spec, status, namespace):
# Only app_name change will affect flower service
patch_body = {
"spec": {
"selector": {
"run": f"{spec['app_name']}-flower"
}
}
}

svc_name = status['create_fn']['children']['flower_service']
api.patch_namespaced_service(
svc_name, namespace, patch_body
)

return svc_name

0 comments on commit 4276873

Please sign in to comment.