Retry OOM killed jobs #4

Draft
wants to merge 12 commits into
base: develop
1 change: 1 addition & 0 deletions gantry/__main__.py
@@ -28,6 +28,7 @@ async def apply_migrations(db: aiosqlite.Connection):
# and not inadvertently added to the migrations folder
("001_initial.sql", 1),
("002_spec_index.sql", 2),
("003_oom_retry.sql", 3),
]

# apply migrations that have not been applied
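The contents of 003_oom_retry.sql are not shown in this diff. Judging from the columns the rest of the PR reads and writes (oomed in collection.py, retry_count in prediction.py's check_oom query), a plausible sketch of the migration is the ALTER TABLE statements below; the actual DDL in the PR may differ.

import asyncio

import aiosqlite

# hypothetical reconstruction of 003_oom_retry.sql, not the PR's actual file
HYPOTHETICAL_OOM_RETRY_SQL = """
ALTER TABLE jobs ADD COLUMN oomed INTEGER DEFAULT 0;
ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0;
"""


async def apply_hypothetical_migration(db_path: str) -> None:
    # mirrors the apply_migrations pattern in gantry/__main__.py:
    # run the migration script once against the database
    async with aiosqlite.connect(db_path) as db:
        await db.executescript(HYPOTHETICAL_OOM_RETRY_SQL)
        await db.commit()


asyncio.run(apply_hypothetical_migration("gantry.db"))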
19 changes: 13 additions & 6 deletions gantry/clients/gitlab.py
@@ -1,3 +1,5 @@
import urllib.parse

import aiohttp


@@ -6,26 +8,31 @@ def __init__(self, base_url: str, api_token: str):
self.base_url = base_url
self.headers = {"PRIVATE-TOKEN": api_token}

async def _request(self, url: str, response_type: str) -> dict | str:
async def _request(self, method: str, url: str, response_type: str) -> dict | str:
"""
Helper for requests to the Gitlab API.

args:
method: HTTP method (GET, POST)
url: the url to request
response_type: the type of response to expect (json or text)
response_type: the type of response to expect (json; anything else returns text)

returns: the response from Gitlab in the specified format
"""

async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers) as resp:
async with session.request(method, url, headers=self.headers) as resp:
if response_type == "json":
return await resp.json()
if response_type == "text":
return await resp.text()
return await resp.text()

async def job_log(self, gl_id: int) -> str:
"""Given a job id, returns the log from that job"""

url = f"{self.base_url}/jobs/{gl_id}/trace"
return await self._request(url, "text")
return await self._request("get", url, "text")

async def start_pipeline(self, ref: str) -> dict:
"""Given a ref, starts a pipeline"""
url = f"{self.base_url}/pipeline?ref={urllib.parse.quote(ref)}"
return await self._request("POST", url, "json")
17 changes: 17 additions & 0 deletions gantry/clients/prometheus/job.py
@@ -53,6 +53,9 @@ async def get_annotations(self, gl_id: int, time: float) -> dict:
"annotation_metrics_spack_job_spec_compiler_version"
],
"stack": annotations["annotation_metrics_spack_ci_stack_name"],
"retry_count": int(
annotations.get("annotation_metrics_spack_job_retry_count", 0)
),
}
except KeyError as e:
# if any of the annotations are missing, raise an error
@@ -152,3 +155,17 @@ async def get_usage(self, pod: str, start: float, end: float) -> dict:
"mem_min": mem_usage["min"],
"mem_stddev": mem_usage["stddev"],
}

async def is_oom(self, pod: str, start: float, end: float) -> bool:
"""checks if a job was OOM killed"""
# TODO this does not work
oom_status = await self.client.query_range(
query={
"metric": "kube_pod_container_status_last_terminated_reason",
"filters": {"container": "build", "pod": pod, "reason": "OOMKilled"},
},
start=start,
end=end + (10 * 60),  # 10 minute buffer to handle the case where there is no data yet
)

return bool(oom_status)
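For context, a consumer-side sketch (not part of this diff): fetch_job in collection.py calls is_oom with the pod annotation and the job's start/end times. The underlying query presumably resolves to a selector along the lines of kube_pod_container_status_last_terminated_reason{container="build", pod="<pod>", reason="OOMKilled"} over [start, end + 10 minutes], though the exact PromQL string is built by query_range and is an assumption here.

from gantry.clients.prometheus import PrometheusClient


async def was_oom_killed(
    prometheus: PrometheusClient, pod: str, start: float, end: float
) -> bool:
    # thin wrapper purely for illustration; mirrors the call made in fetch_job
    return await prometheus.job.is_oom(pod, start, end)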
99 changes: 90 additions & 9 deletions gantry/routes/collection.py
@@ -9,30 +9,92 @@
from gantry.clients.prometheus import PrometheusClient
from gantry.clients.prometheus.util import IncompleteData
from gantry.models import Job
from gantry.routes.prediction import RETRY_COUNT_LIMIT

MB_IN_BYTES = 1_000_000
BUILD_STAGE_REGEX = r"^stage-\d+$"

logger = logging.getLogger(__name__)


async def handle_pipeline(
payload: dict,
db_conn: aiosqlite.Connection,
gitlab: GitlabClient,
prometheus: PrometheusClient,
) -> None | bool:
"""
Sends any failed jobs from a pipeline to fetch_job.
If any of the failed jobs were OOM killed, the pipeline will be recreated.

args:
payload: a dictionary containing the information from the Gitlab pipeline hook
db_conn: an active aiosqlite connection
gitlab: gitlab client
prometheus: prometheus client

returns: True if the pipeline was recreated, otherwise False or None
"""

if payload["object_attributes"]["status"] != "failed":
return

ref = payload["object_attributes"]["ref"]
failed_jobs = [
# imitate the payload from the job hook, which fetch_job expects
{
"build_status": job["status"],
"build_id": job["id"],
"build_started_at": job["started_at"],
"build_finished_at": job["finished_at"],
"ref": ref,
"build_stage": job["stage"],
"runner": job["runner"],
}
for job in payload["builds"]
if job["status"] == "failed"
]

retry_pipeline = False

for job in failed_jobs:
# insert every potentially oomed job
# if a job has been retried RETRY_COUNT_LIMIT times, oomed will be False
# start_pipeline will be called if any of the failed_jobs fit the criteria
# the same check is performed on the prediction side, and won't re-bump memory
oomed = await fetch_job(job, db_conn, gitlab, prometheus, from_pipeline=True)

# fetch_job can return None or (job_id: int, oomed: bool)
if oomed and oomed[1]:
retry_pipeline = True

# once all jobs are collected/discarded, retry the pipeline if needed
if retry_pipeline:
await gitlab.start_pipeline(ref)
return retry_pipeline


async def fetch_job(
payload: dict,
db_conn: aiosqlite.Connection,
gitlab: GitlabClient,
prometheus: PrometheusClient,
) -> None:
from_pipeline: bool = False,
) -> tuple[int, bool] | None:
"""
Collects a job's information from Prometheus and inserts into db.
Warnings about missing data will be logged; check uncaught exceptions.
Fetches a job's information from Prometheus and inserts it into the database.
If data is missing at any point, the function still returns so the webhook gets the
expected response. An uncaught exception indicates behavior this program did not
anticipate and should be investigated.

args:
payload: a dictionary containing the information from the Gitlab job hook
payload: a dictionary containing the information from the gitlab job hook
db_conn: an active aiosqlite connection
gitlab: gitlab client
prometheus: prometheus client
from_pipeline: if the job was called from a pipeline handler

returns: None in order to accommodate a 200 response for the webhook.
returns: a tuple of (job id, whether the job was OOM killed) if data was inserted,
else None
"""

job = Job(
@@ -45,7 +107,12 @@ async def fetch_job(

# perform checks to see if we should collect data for this job
if (
# when called from the regular job hook, only collect successful jobs
job.status != "success"
and from_pipeline is False
# when called from handle_pipeline, only collect failed jobs
or job.status != "failed"
and from_pipeline is True
# if the stage is not stage-NUMBER, it's not a build job
or not re.match(BUILD_STAGE_REGEX, payload["build_stage"])
# some jobs don't have runners..?
@@ -57,6 +124,9 @@
):
return

# track if job was OOM killed and needs to be retried
oomed = False

try:
# all code that makes HTTP requests should be in this try block

@@ -66,8 +136,19 @@
if is_ghost:
logger.warning(f"job {job.gl_id} is a ghost, skipping")
return

annotations = await prometheus.job.get_annotations(job.gl_id, job.midpoint)
# check whether the failed job was OOM killed;
# return early if it wasn't, since non-OOM failures are not collected,
# and do not retry jobs that have already been retried RETRY_COUNT_LIMIT times
if job.status == "failed":
if (
await prometheus.job.is_oom(annotations["pod"], job.start, job.end)
and annotations["retry_count"] < RETRY_COUNT_LIMIT
):
oomed = True
else:
return

resources, node_hostname = await prometheus.job.get_resources(
annotations["pod"], job.midpoint
)
@@ -90,6 +171,7 @@
"gitlab_id": job.gl_id,
"job_status": job.status,
"ref": job.ref,
"oomed": oomed,
**annotations,
**resources,
**usage,
@@ -99,8 +181,7 @@
# job and node will get saved at the same time to make sure
# we don't accidentally commit a node without a job
await db_conn.commit()

return job_id
return (job_id, oomed)


async def fetch_node(
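For reference, handle_pipeline above only reads a handful of keys from the GitLab pipeline hook. A minimal sketch of a payload shape that would reach the retry path looks roughly like this; all values are placeholders and a real payload carries many more fields.

# minimal sketch of a pipeline-hook payload that handle_pipeline inspects
example_pipeline_payload = {
    "object_attributes": {
        "status": "failed",  # anything other than "failed" returns early
        "ref": "develop",    # reused when the pipeline is restarted
    },
    "builds": [
        {
            "status": "failed",  # only failed builds are forwarded to fetch_job
            "id": 123456,        # hypothetical job id
            "started_at": "2024-01-01 00:00:00 UTC",
            "finished_at": "2024-01-01 00:30:00 UTC",
            "stage": "stage-1",  # must match BUILD_STAGE_REGEX to be collected
            "runner": {"description": "hypothetical-runner"},
        },
    ],
}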
76 changes: 68 additions & 8 deletions gantry/routes/prediction.py
@@ -19,6 +19,8 @@
"openmp",
"hdf5",
}
MEM_BUMP_FACTOR = 1.2
RETRY_COUNT_LIMIT = 3


async def predict(db: aiosqlite.Connection, spec: dict) -> dict:
@@ -33,6 +35,11 @@ async def predict(db: aiosqlite.Connection, spec: dict) -> dict:
CPU in millicore, mem in MB
"""

# check if the memory limit needs to be increased
alloc_oom = await check_oom(db, spec)
if alloc_oom:
return {"variables": alloc_oom}

sample = await get_sample(db, spec)
predictions = {}
if not sample:
@@ -56,12 +63,7 @@ async def predict(db: aiosqlite.Connection, spec: dict) -> dict:
logger.warning(f"Warning: Memory request for {spec} is below 10MB")
predictions["mem_request"] = DEFAULT_MEM_REQUEST

# convert predictions to k8s friendly format
for k, v in predictions.items():
if k.startswith("cpu"):
predictions[k] = k8s.convert_cores(v)
elif k.startswith("mem"):
predictions[k] = k8s.convert_bytes(v)
predictions = k8s.convert_allocations(predictions)

return {
"variables": {
@@ -115,7 +117,8 @@ async def select_sample(query: str, filters: dict, extra_params: list = []) -> l
# within this combo, variants included
query = f"""
SELECT cpu_mean, cpu_max, mem_mean, mem_max FROM jobs
WHERE ref='develop' AND {' AND '.join(f'{param}=?' for param in filters.keys())}
WHERE ref='develop' AND job_status='success'
AND {' AND '.join(f'{param}=?' for param in filters.keys())}
ORDER BY end DESC LIMIT {IDEAL_SAMPLE}
"""

@@ -155,7 +158,8 @@ async def select_sample(query: str, filters: dict, extra_params: list = []) -> l

query = f"""
SELECT cpu_mean, cpu_max, mem_mean, mem_max FROM jobs
WHERE ref='develop' AND {' AND '.join(f'{param}=?' for param in filters.keys())}
WHERE ref='develop' AND job_status='success'
AND {' AND '.join(f'{param}=?' for param in filters.keys())}
AND {' AND '.join(exp_variant_conditions)}
ORDER BY end DESC LIMIT {IDEAL_SAMPLE}
"""
@@ -164,3 +168,59 @@ async def select_sample(query: str, filters: dict, extra_params: list = []) -> l
return sample

return []


async def check_oom(db: aiosqlite.Connection, spec: dict) -> dict:
"""
Check if the spec's last build was OOM killed and bump
the prediction if necessary.

args:
db: an active aiosqlite connection
spec: see predict
returns:
dict of variables for the k8s job based on the last OOM-killed build,
or {} if the spec has no OOM-killed build
"""

# look for an exact match of the spec that has been OOM killed
query = """
SELECT cpu_mean, cpu_max, mem_mean, mem_limit, retry_count FROM jobs
WHERE pkg_name=? AND pkg_version=? AND pkg_variants=?
AND compiler_name=? AND compiler_version=? AND arch=? AND oomed=1
ORDER BY end DESC LIMIT 1
"""

async with db.execute(
query,
(
spec["pkg_name"],
spec["pkg_version"],
spec["pkg_variants"],
spec["compiler_name"],
spec["compiler_version"],
spec["arch"],
),
) as cursor:
res = await cursor.fetchall()

if not res:
return {}

# use the last build's resource usage as a baseline
# using mem_limit instead of max to ensure it's increased by the bump factor
variables = {
"KUBERNETES_CPU_REQUEST": res[0][0],
"KUBERNETES_CPU_LIMIT": res[0][1],
"KUBERNETES_MEMORY_REQUEST": res[0][2],
"KUBERNETES_MEMORY_LIMIT": res[0][3],
"GANTRY_RETRY_COUNT": res[0][4],
}

if variables["GANTRY_RETRY_COUNT"] < RETRY_COUNT_LIMIT:
# only bump the memory if the job has been retried fewer than RETRY_COUNT_LIMIT times;
# past that the build will likely keep failing, and capping here prevents infinite retries
variables["KUBERNETES_MEMORY_LIMIT"] = (
variables["KUBERNETES_MEMORY_LIMIT"] * MEM_BUMP_FACTOR
)
variables["GANTRY_RETRY_COUNT"] += 1

return k8s.convert_allocations(variables)
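As a worked example of the bump logic, with MEM_BUMP_FACTOR = 1.2 and RETRY_COUNT_LIMIT = 3, and assuming each retried build records its bumped limit as mem_limit: a job OOM killed at a 1000 MB limit would be retried at 1200 MB, then 1440 MB, then 1728 MB, after which GANTRY_RETRY_COUNT reaches the limit and the memory is no longer raised. A small illustrative sketch of that arithmetic, independent of the database code:

# illustrative arithmetic only: how the memory limit grows across OOM retries
MEM_BUMP_FACTOR = 1.2
RETRY_COUNT_LIMIT = 3


def bumped_limits(initial_limit_mb: float) -> list[float]:
    limits = []
    limit = initial_limit_mb
    for _ in range(RETRY_COUNT_LIMIT):
        limit *= MEM_BUMP_FACTOR
        limits.append(round(limit, 1))
    return limits


print(bumped_limits(1000))  # [1200.0, 1440.0, 1728.0]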