feat: upgrade kubernetes client from 11.0.0 -> 24.2.0, implement List+Watch in KubeWatcher #32

Merged 9 commits on Jan 8, 2024
5 changes: 3 additions & 2 deletions chart/values.prod.yaml
@@ -1,5 +1,5 @@
 ingress:
-  hostname: jobmgr.mmli1.ncsa.illinois.edu
+  hostname: jobmgr.platform.moleculemaker.org
   tls: true
   annotations:
     cert-manager.io/cluster-issuer: letsencrypt-production
@@ -117,7 +117,8 @@ config:
   server:
     protocol: "https"
     ## API hostname. Must match the ingress.hostname value.
-    hostName: "jobmgr.mmli1.ncsa.illinois.edu"
+    ## Suffix must match CLEAN + MOLLI prod for user auth to work
+    hostName: "jobmgr.platform.moleculemaker.org"
     namespace: "alphasynthesis"
   oauth:
     userInfoUrl: "http://oauth2-proxy.oauth2-proxy.svc.cluster.local/oauth2/userinfo"
5 changes: 2 additions & 3 deletions requirements.txt
@@ -1,10 +1,9 @@
-# kubernetes v12.0.0 breaks the kubejob.py code
-kubernetes == 11.0.0
+kubernetes == 24.2.0
 tornado == 6.1
 pyjson == 1.3.0
 pyyaml == 5.4.1
 jinja2 == 3.0.1
-requests == 2.26
+requests
 bcrypt == 3.2.0
 pyjwt == 1.7.1
 mysql-connector-python-rf == 2.2.2
22 changes: 17 additions & 5 deletions src/kubejob.py
@@ -1,3 +1,5 @@
+import sys
+
 import global_vars
 from global_vars import config, log
 from kubernetes import client, config as kubeconfig
@@ -13,12 +15,22 @@

 ## Load Kubernetes cluster config. Unhandled exception if not in Kubernetes environment.
 try:
-    kubeconfig.load_kube_config(config_file=config['server']['kubeconfig'])
-except:
     kubeconfig.load_incluster_config()
-configuration = client.Configuration()
-api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
-api_v1 = client.CoreV1Api(client.ApiClient(configuration))
+    log.info('Successfully loaded in-cluster config!')
+except Exception as e1:
+    log.error('In-cluster config failed: ', e1)
+    config_file_path = config['server']['kubeconfig']
+    log.info('Falling back to provided kubeconfig path: ', config_file_path)
+    try:
+        kubeconfig.load_kube_config(config_file=config_file_path)
+    except Exception as e2:
+        log.fatal('Failed to get any cluster config: ', e2)
+        sys.exit(1)
+#configuration = client.Configuration()
+#api_batch_v1 = client.BatchV1Api(client.ApiClient(configuration))
+#api_v1 = client.CoreV1Api(client.ApiClient(configuration))
+api_batch_v1 = client.BatchV1Api()
+api_v1 = client.CoreV1Api()
Comment on lines +29 to +33
Member Author

Overriding configuration has changed between versions, but this is not required for how we are using the K8S API client
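Since the client upgrade, `load_incluster_config()` / `load_kube_config()` populate the library's default configuration, so `BatchV1Api()` and `CoreV1Api()` can be constructed with no arguments, as the new code above does. A minimal sketch of both styles, not taken from this PR (the `verify_ssl` override is purely illustrative):

```python
from kubernetes import client, config

# Populate the client's default Configuration (raises outside a cluster).
config.load_incluster_config()

# Style used by this PR: rely on the default configuration.
batch_api = client.BatchV1Api()
core_api = client.CoreV1Api()

# An explicit Configuration is only needed to override settings,
# e.g. a hypothetical test cluster without valid TLS certificates.
cfg = client.Configuration.get_default_copy()
cfg.verify_ssl = False  # illustrative override, not something this PR does
batch_api_override = client.BatchV1Api(client.ApiClient(cfg))
```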


def get_namespace():
# When running in a pod, the namespace should be determined automatically,
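As context for the comment above: when running in a pod, the namespace is usually determined by reading the serviceaccount file that kubelet mounts into every container. A hedged sketch of that convention; the actual `get_namespace()` body is not shown in this diff and may differ:

```python
import os

# Path mounted automatically into every pod for its serviceaccount.
NAMESPACE_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'

def get_namespace(default: str = 'default') -> str:
    # In-cluster: the file contains the pod's own namespace.
    if os.path.isfile(NAMESPACE_FILE):
        with open(NAMESPACE_FILE) as f:
            return f.read().strip()
    # Outside the cluster (local development): fall back to a default.
    return default
```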
15 changes: 11 additions & 4 deletions src/kubewatcher.py
@@ -1,7 +1,9 @@
import json
import time
import threading

from kubernetes import watch, config as kubeconfig
from kubernetes.client import V1JobList
from kubernetes.client.rest import ApiException
from requests import HTTPError

@@ -70,11 +72,16 @@ def run(self):
                 time.sleep(1)
             self.logger.info('KubeWatcher is connecting...')
             try:
-                # Resource version is used to keep track of stream progress (in case of resume)
+                # List all jobs in the watched namespace to get a current resource_version
+                namespaced_jobs: V1JobList = kubejob.api_batch_v1.list_namespaced_job(namespace=kubejob.get_namespace())
+                resource_version = namespaced_jobs.metadata.resource_version if namespaced_jobs.metadata.resource_version else resource_version
+
+                # Then, watch for new events using the most recent resource_version
+                # Resource version is used to keep track of stream progress (in case of resume/retry)
                 k8s_event_stream = w.stream(func=kubejob.api_batch_v1.list_namespaced_job,
                                             namespace=kubejob.get_namespace(),
-                                            timeout_seconds=timeout_seconds,
-                                            resource_version=resource_version)
+                                            resource_version=resource_version,
+                                            timeout_seconds=timeout_seconds)
Comment on lines -73 to +84
Member Author

Attempt to implement List+Watch pattern, as described here:
kubernetes-client/python#843 (comment)
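In short, List+Watch means: list once to obtain a fresh resourceVersion, open a watch from that version, and on a 410 Gone clear the version and re-list. A condensed, standalone sketch of that loop (the namespace literal and the print call are illustrative; the real KubeWatcher resolves the namespace via kubejob.get_namespace() and has its own event handling):

```python
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException

config.load_incluster_config()  # assumes an in-cluster environment
batch_v1 = client.BatchV1Api()
NAMESPACE = 'alphasynthesis'    # illustrative; not hard-coded in the real code

resource_version = ''
w = watch.Watch()
while True:
    try:
        # LIST: learn the most recent resourceVersion for the namespace's jobs.
        jobs = batch_v1.list_namespaced_job(namespace=NAMESPACE)
        resource_version = jobs.metadata.resource_version or resource_version

        # WATCH: stream job events starting from that resourceVersion.
        for event in w.stream(func=batch_v1.list_namespaced_job,
                              namespace=NAMESPACE,
                              resource_version=resource_version,
                              timeout_seconds=300):
            job = event['object']
            print(event['type'], job.metadata.name)
            # Track progress so a reconnect can resume where we left off.
            resource_version = job.metadata.resource_version
    except ApiException as e:
        if e.status == 410:
            # resourceVersion expired: reset and re-list on the next iteration.
            resource_version = ''
            continue
        raise
```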


self.logger.info('KubeWatcher connected!')

@@ -147,7 +154,7 @@ def run(self):
                 k8s_event_stream = None
                 if e.status == 410:
                     # Resource too old
-                    resource_version = None
+                    resource_version = ''
                     self.logger.warning("Resource too old (410) - reconnecting: " + str(e))
                     time.sleep(2)
                     continue