From b704490b93ef3bb5b0bfcbabc6db3cc81f88aca8 Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 17:20:59 -0600
Subject: [PATCH 1/8] feat: upgrade kubernetes client from 11.0.0 -> 24.2.0

---
 requirements.txt |  5 +++--
 src/kubejob.py   | 22 +++++++++++++++++-----
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 7da5a3b..2bed77b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,11 @@
 # kubernetes v12.0.0 breaks the kubejob.py code
-kubernetes == 11.0.0
+# kubernetes == 11.0.0
+kubernetes == 24.2.0
 tornado == 6.1
 pyjson == 1.3.0
 pyyaml == 5.4.1
 jinja2 == 3.0.1
-requests == 2.26
+requests
 bcrypt == 3.2.0
 pyjwt == 1.7.1
 mysql-connector-python-rf == 2.2.2
diff --git a/src/kubejob.py b/src/kubejob.py
index 4da30d9..55c77ff 100644
--- a/src/kubejob.py
+++ b/src/kubejob.py
@@ -1,3 +1,5 @@
+import sys
+
 import global_vars
 from global_vars import config, log
 from kubernetes import client, config as kubeconfig
@@ -13,12 +15,22 @@
 ## Load Kubernetes cluster config. Unhandled exception if not in Kubernetes environment.
 try:
-    kubeconfig.load_kube_config(config_file=config['server']['kubeconfig'])
-except:
     kubeconfig.load_incluster_config()
-configuration = client.Configuration()
-api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
-api_v1 = client.CoreV1Api(client.ApiClient(configuration))
+    log.info('Successfully loaded in-cluster config!')
+except Exception as e1:
+    log.error('In-cluster config failed: ', e1)
+    config_file_path = config['server']['kubeconfig']
+    log.info('Falling back to provided kubeconfig path: ', config_file_path)
+    try:
+        kubeconfig.load_kube_config(config_file=config_file_path)
+    except Exception as e2:
+        log.fatal('Failed to get any cluster config: ', e2)
+        sys.exit(1)
+#configuration = client.Configuration()
+#api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
+#api_v1 = client.CoreV1Api(client.ApiClient(configuration))
+api_batch_v1 = client.BatchV1Api()
+api_v1 = client.CoreV1Api()


 def get_namespace():
     # When running in a pod, the namespace should be determined automatically,

From 604fd7635b1e3c76e50ffb7352251a0dd1a1193a Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 17:27:08 -0600
Subject: [PATCH 2/8] Update requirements.txt

---
 requirements.txt | 2 --
 1 file changed, 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 2bed77b..692fb52 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,3 @@
-# kubernetes v12.0.0 breaks the kubejob.py code
-# kubernetes == 11.0.0
 kubernetes == 24.2.0
 tornado == 6.1
 pyjson == 1.3.0

From 11189b7c6052f663bc290b04bdd787249e7d9603 Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 17:49:03 -0600
Subject: [PATCH 3/8] fix: supposedly resource_version isn't needed for 410: Gone anymore?

---
 src/kubewatcher.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/kubewatcher.py b/src/kubewatcher.py
index 722c09c..7f4831c 100644
--- a/src/kubewatcher.py
+++ b/src/kubewatcher.py
@@ -73,8 +73,8 @@ def run(self):
                 # Resource version is used to keep track of stream progress (in case of resume)
                 k8s_event_stream = w.stream(func=kubejob.api_batch_v1.list_namespaced_job,
                                             namespace=kubejob.get_namespace(),
-                                            timeout_seconds=timeout_seconds,
-                                            resource_version=resource_version)
+                                            #resource_version=resource_version,
+                                            timeout_seconds=timeout_seconds)

                 self.logger.info('KubeWatcher connected!')

From 7fb704605f6ba6281a75bc1781c176a829735b2d Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 18:09:52 -0600
Subject: [PATCH 4/8] fix: use List+Watch pattern for setting resource_version

---
 src/kubewatcher.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/kubewatcher.py b/src/kubewatcher.py
index 7f4831c..828e0da 100644
--- a/src/kubewatcher.py
+++ b/src/kubewatcher.py
@@ -1,3 +1,4 @@
+import json
 import time
 import threading

@@ -61,7 +62,6 @@ def run(self):
         self.logger.info('KubeWatcher looking for required labels: ' + str(required_labels))

         timeout_seconds = 0
-        resource_version = ''
         k8s_event_stream = None

         w = watch.Watch()
@@ -70,10 +70,15 @@ def run(self):
             time.sleep(1)
             self.logger.info('KubeWatcher is connecting...')
             try:
-                # Resource version is used to keep track of stream progress (in case of resume)
+                # List all pods in watched namespace to get resource_version
+                namespaced_jobs = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
+                resource_version = namespaced_jobs['metadata']['resource_version'] if 'metadata' in namespaced_jobs and 'resource_version' in namespaced_jobs['metadata'] else ''
+
+                # Then, watch for new events using the most recent resource_version
+                # Resource version is used to keep track of stream progress (in case of resume/retry)
                 k8s_event_stream = w.stream(func=kubejob.api_batch_v1.list_namespaced_job,
                                             namespace=kubejob.get_namespace(),
-                                            #resource_version=resource_version,
+                                            resource_version=resource_version,
                                             timeout_seconds=timeout_seconds)

                 self.logger.info('KubeWatcher connected!')

From b1b99a61f6137c2d1efcd002e11e1fb69a62ae55 Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 18:20:27 -0600
Subject: [PATCH 5/8] fix: keep outer resource_version for reset logic / handling 410: Gone

---
 src/kubewatcher.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/kubewatcher.py b/src/kubewatcher.py
index 828e0da..75878a5 100644
--- a/src/kubewatcher.py
+++ b/src/kubewatcher.py
@@ -62,6 +62,7 @@ def run(self):
         self.logger.info('KubeWatcher looking for required labels: ' + str(required_labels))

         timeout_seconds = 0
+        resource_version = ''
         k8s_event_stream = None

         w = watch.Watch()
@@ -72,7 +73,7 @@ def run(self):
             try:
                 # List all pods in watched namespace to get resource_version
                 namespaced_jobs = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
-                resource_version = namespaced_jobs['metadata']['resource_version'] if 'metadata' in namespaced_jobs and 'resource_version' in namespaced_jobs['metadata'] else ''
+                resource_version = namespaced_jobs['metadata']['resource_version'] if 'metadata' in namespaced_jobs and 'resource_version' in namespaced_jobs['metadata'] else resource_version

                 # Then, watch for new events using the most recent resource_version
                 # Resource version is used to keep track of stream progress (in case of resume/retry)
@@ -152,7 +153,7 @@ def run(self):
                 k8s_event_stream = None
                 if e.status == 410:
                     # Resource too old
-                    resource_version = None
+                    resource_version = ''
                     self.logger.warning("Resource too old (410) - reconnecting: " + str(e))
                     time.sleep(2)
                     continue

From 4f199b85e82df03d41a56f8ded866ed7be71a12c Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 18:26:14 -0600
Subject: [PATCH 6/8] fix: syntax error

---
 src/kubewatcher.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/kubewatcher.py b/src/kubewatcher.py
index 75878a5..9be3db7 100644
--- a/src/kubewatcher.py
+++ b/src/kubewatcher.py
@@ -3,6 +3,7 @@
 import threading

 from kubernetes import watch, config as kubeconfig
+from kubernetes.client import V1JobList
 from kubernetes.client.rest import ApiException
 from requests import HTTPError

@@ -72,8 +73,8 @@ def run(self):
             self.logger.info('KubeWatcher is connecting...')
             try:
                 # List all pods in watched namespace to get resource_version
-                namespaced_jobs = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
-                resource_version = namespaced_jobs['metadata']['resource_version'] if 'metadata' in namespaced_jobs and 'resource_version' in namespaced_jobs['metadata'] else resource_version
+                namespaced_jobs: V1JobList = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
+                resource_version = namespaced_jobs.metadata['resource_version'] if 'resource_version' in namespaced_jobs.metadata else resource_version

                 # Then, watch for new events using the most recent resource_version
                 # Resource version is used to keep track of stream progress (in case of resume/retry)

From 741d16a41a37c98bfcf21523f0345b1d58aef6a0 Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 18 Dec 2023 18:35:16 -0600
Subject: [PATCH 7/8] fix: one more syntax error

---
 src/kubewatcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/kubewatcher.py b/src/kubewatcher.py
index 9be3db7..f6d8f4f 100644
--- a/src/kubewatcher.py
+++ b/src/kubewatcher.py
@@ -74,7 +74,7 @@ def run(self):
             try:
                 # List all pods in watched namespace to get resource_version
                 namespaced_jobs: V1JobList = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
-                resource_version = namespaced_jobs.metadata['resource_version'] if 'resource_version' in namespaced_jobs.metadata else resource_version
+                resource_version = namespaced_jobs.metadata.resource_version if namespaced_jobs.metadata.resource_version else resource_version

                 # Then, watch for new events using the most recent resource_version
                 # Resource version is used to keep track of stream progress (in case of resume/retry)

From b3c384cea4c5691249075c9c14b84440b81465cc Mon Sep 17 00:00:00 2001
From: Sara Lambert
Date: Mon, 8 Jan 2024 17:12:52 -0600
Subject: [PATCH 8/8] Update values.prod.yaml

---
 chart/values.prod.yaml | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/chart/values.prod.yaml b/chart/values.prod.yaml
index ec30345..b0c6551 100644
--- a/chart/values.prod.yaml
+++ b/chart/values.prod.yaml
@@ -1,5 +1,5 @@
 ingress:
-  hostname: jobmgr.mmli1.ncsa.illinois.edu
+  hostname: jobmgr.platform.moleculemaker.org
   tls: true
   annotations:
     cert-manager.io/cluster-issuer: letsencrypt-production
@@ -117,7 +117,8 @@ config:
   server:
     protocol: "https"
     ## API hostname. Must match the ingress.hostname value.
-    hostName: "jobmgr.mmli1.ncsa.illinois.edu"
+    ## Suffix must match CLEAN + MOLLI prod for user auth to work
+    hostName: "jobmgr.platform.moleculemaker.org"
     namespace: "alphasynthesis"
   oauth:
     userInfoUrl: "http://oauth2-proxy.oauth2-proxy.svc.cluster.local/oauth2/userinfo"
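
For reference (not part of the patch series): a minimal standalone sketch of the config-loading order that PATCH 1/8 introduces in src/kubejob.py, namely try the in-cluster service account first and fall back to an explicit kubeconfig path only if that fails. It assumes only the standard kubernetes Python client; the function name load_cluster_config, the kubeconfig_path parameter, and the plain logging setup are illustrative, not taken from the repository.

import logging
import sys

from kubernetes import client, config
from kubernetes.config import ConfigException

log = logging.getLogger(__name__)

def load_cluster_config(kubeconfig_path=None):
    """Prefer in-cluster config; fall back to a kubeconfig file if one is provided."""
    try:
        # Uses the pod's mounted service account token when running inside Kubernetes.
        config.load_incluster_config()
        log.info('Loaded in-cluster config')
    except ConfigException as exc:
        log.warning('In-cluster config failed (%s); trying kubeconfig at %s', exc, kubeconfig_path)
        try:
            config.load_kube_config(config_file=kubeconfig_path)
        except Exception as exc2:
            log.error('No usable cluster config: %s', exc2)
            sys.exit(1)
    # With kubernetes >= 12, API objects pick up the default configuration set above,
    # so the explicit client.Configuration() plumbing from the 11.0.0-era code is unnecessary.
    return client.BatchV1Api(), client.CoreV1Api()

Trying the in-cluster path first matches how the job manager normally runs (inside a pod), while the kubeconfig fallback keeps local development working.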
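
Also for reference: a hedged sketch of the List+Watch loop that PATCH 4/8 through PATCH 7/8 converge on in src/kubewatcher.py: list jobs once to seed resource_version, stream with that version, and reset it when the server answers 410 Gone so the next iteration re-lists. It assumes the standard kubernetes Python client; watch_jobs, handle_event, and the bare namespace argument are illustrative stand-ins for the get_namespace() and label-filtering logic in kubewatcher.py.

import time

from kubernetes import client, watch
from kubernetes.client.rest import ApiException

def watch_jobs(namespace, handle_event):
    """List+Watch: seed resource_version from a list call, reset it on 410 Gone."""
    batch_v1 = client.BatchV1Api()
    resource_version = ''
    w = watch.Watch()
    while True:
        try:
            # List once so the watch starts from a recent resource_version
            # instead of failing with 410 Gone on a stale one.
            job_list = batch_v1.list_namespaced_job(namespace=namespace)
            resource_version = job_list.metadata.resource_version or resource_version

            for event in w.stream(func=batch_v1.list_namespaced_job,
                                  namespace=namespace,
                                  resource_version=resource_version,
                                  timeout_seconds=0):
                handle_event(event)
                # Track progress so a dropped connection resumes near where it left off.
                resource_version = event['object'].metadata.resource_version
        except ApiException as e:
            if e.status == 410:
                # 410 Gone: our resource_version is too old; clear it and re-list.
                resource_version = ''
                time.sleep(2)
                continue
            raise

The key design point from the patch series is that resource_version lives outside the retry loop, so the 410 handler can clear it and force a fresh list on the next pass.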