Bug summary
Keywords
prefect 3, integrations, k8s, kubernetes, crashed, flow run, infrastructure, job, pod, pod_watch_timeout_seconds, pod never started
Overview
We came across this issue during load testing of our self-hosted Prefect 3 deployment in a managed EKS cluster (deployed using the official PrefectHQ Helm charts). For these particular tests, we used a "dummy" flow consisting of a single do_work task, which just executes asyncio.sleep for some arbitrary time and "does some work". The main focus of the load test was to see how the Prefect worker, server and PostgreSQL database handle a sudden burst of up to 1000 newly scheduled flow runs.
The system handles this situation quite nicely, but at certain times we need to wait for Karpenter to provision additional nodes before the new k8s jobs can start their pods. Sometimes this takes a bit longer and, from time to time, we reach the configured pod_watch_timeout_seconds. At that point, the Prefect worker logs that the pod never started, logs some k8s context, and sets the flow run state to CRASHED. This is all expected and well handled. We also have some logic in an in-house service which polls the Prefect API for flow run states and links them with our own custom entities. Since CRASHED is one of the terminal states, we capture it and declare our own entity as FAILED. This is where the problematic part starts.
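For context, the polling logic in our service is roughly the following (a minimal sketch; our internal entity bookkeeping is replaced with a placeholder mark_entity_failed):

import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType

TERMINAL_STATES = {StateType.COMPLETED, StateType.FAILED, StateType.CRASHED, StateType.CANCELLED}

def mark_entity_failed(flow_run_id) -> None:
    # Placeholder for our internal (undisclosed) entity update.
    print(f"marking entity linked to flow run {flow_run_id} as FAILED")

async def watch_flow_run(flow_run_id, poll_interval: float = 15.0):
    # Poll the Prefect API until the flow run reaches a terminal state,
    # then hand the result over to our own bookkeeping.
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            state = flow_run.state
            if state is not None and state.type in TERMINAL_STATES:
                if state.type == StateType.CRASHED:
                    mark_entity_failed(flow_run_id)
                return state
            await asyncio.sleep(poll_interval)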
Expected behavior
Since CRASHED is one of the terminal states, we expect the flow run state to indeed be final. We declare our own linked entity (undisclosed for NDA reasons) as FAILED and basically forget about it until it is reviewed and possibly retried at some later point (a new flow run is created then). The k8s Job template has backoffLimit set to 0 and restartPolicy set to Never, and neither the task nor the flow has retries configured via its decorator in the source code. This effectively means that once a flow run has crashed due to infrastructure problems, it should stay that way.
What actually happens
For some reason, if I keep tracking the same crashed flow run via the Prefect UI, after a couple of minutes it is, for lack of a better word, "resurrected" out of nowhere: its state suddenly switches to RUNNING, the pod actually starts and executes the flow run, and it is then set to the COMPLETED state. At first, I thought this was some sort of undocumented (or vaguely documented) automated Prefect behavior which internally retries infrastructure provisioning for crashed flow runs that failed specifically with the Pod never started issue, but after going through the Prefect code, I haven't managed to find any such implementation. So I decided to open this issue and file it as a bug.
This behavior is, on the one hand, kind of nice (or would be, if it weren't a bug), as we wouldn't have to worry about retries for this specific problem ourselves. On the other hand, it's problematic because there is no good way to determine that, in this particular case, the CRASHED state is not really terminal. The state message does not help much (Flow run infrastructure exited with non-zero status code -1.), as it does not include the detail that the flow crashed due to the pod watch timeout. In the end, this leads to undefined behavior on our side: we declare our custom entity as FAILED, but the associated flow run eventually runs nonetheless and has its side effects.
Some assumptions
Currently, my leading assumption (which I haven't managed to confirm or rule out yet, but I'll do my best) is that the job still lingers on the k8s cluster for a while, long enough to actually get a node allocated for the pod it is supposed to bring up. At that point, the flow run probably reports back to the worker (or the Prefect API?), which consequently updates its state to RUNNING.
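One way to check this assumption (a sketch; it assumes the worker labels its jobs with prefect.io/flow-run-id, and the namespace name is a placeholder) would be to list the Jobs linked to the crashed flow run and see whether they are still around:

from kubernetes import client, config

def find_jobs_for_flow_run(flow_run_id: str, namespace: str = "prefect") -> list[str]:
    # List any Kubernetes Jobs still carrying the flow run's label after the CRASHED state.
    config.load_kube_config()  # or config.load_incluster_config() when running in-cluster
    batch = client.BatchV1Api()
    jobs = batch.list_namespaced_job(
        namespace, label_selector=f"prefect.io/flow-run-id={flow_run_id}"
    )
    return [job.metadata.name for job in jobs.items]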
How can I reproduce this?
For us, this happens whenever Karpenter takes its sweet time to provision a node and the pod watch timer runs out. A good way to simulate it would probably be to set pod_watch_timeout_seconds to some low value and then, once Prefect sets the state to CRASHED, immediately provision enough additional resources for the pod to be able to start.
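A sketch of that setup, assuming the deployment is created via flow.deploy (the deployment, work pool and image names are placeholders): lowering pod_watch_timeout_seconds per deployment through job_variables should make the worker give up well before node provisioning finishes.

from prefect import flow

@flow
async def dummy_flow() -> None:
    ...

if __name__ == "__main__":
    # Hypothetical deployment: a deliberately short pod watch timeout so the worker
    # reports CRASHED before the node becomes available.
    dummy_flow.deploy(
        name="pod-timeout-repro",                         # placeholder name
        work_pool_name="k8s-pool",                        # placeholder work pool
        image="docker.io/prefecthq/prefect:3-latest",     # placeholder image
        build=False,
        push=False,
        job_variables={"pod_watch_timeout_seconds": 30},  # well below node provisioning time
    )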
Conclusion
As of right now, I don't have a proper workaround for this other than to keep increasing pod_watch_timeout_seconds until we see no more crashed runs. Unfortunately, that is not viable due to other constraints (we have various SLA times we are obligated to respect). One possible solution would be to double-check that there are indeed no associated resources left on the underlying infrastructure after the flow run has been declared CRASHED (maybe even FAILED), and to make sure that any leftovers get cleaned up before the flow run starts executing any business logic; this is, of course, assuming the issue does indeed lie in the job lingering around. Ideally, the flow run would never actually run at all.
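A sketch of that cleanup idea, building on the lingering-job assumption above (the label key and namespace are again assumptions): when our service observes CRASHED, it would delete any leftover Job, taking the pending pod down with it, before declaring our entity FAILED.

from kubernetes import client, config

def delete_leftover_jobs(flow_run_id: str, namespace: str = "prefect") -> None:
    # Remove any Job still associated with the crashed flow run so the pod can
    # never be scheduled after the fact.
    config.load_incluster_config()  # assumes this runs inside the cluster
    batch = client.BatchV1Api()
    jobs = batch.list_namespaced_job(
        namespace, label_selector=f"prefect.io/flow-run-id={flow_run_id}"
    )
    for job in jobs.items:
        batch.delete_namespaced_job(
            name=job.metadata.name,
            namespace=namespace,
            propagation_policy="Foreground",  # delete the pending pod along with the Job
        )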
Post Scriptum
Important
Read this for additional info added after the fact
To see whether this is specific to Prefect 3 or to this particular environment, I attempted (and succeeded) to reproduce the issue in an unrelated Prefect 2 deployment used by a separate project of ours. The behavior is the same. I've added the version info for that Prefect 2 deployment below as well.
Version info
Prefect 3 deployment:
Version: 3.1.0
API version: 0.8.4
Python version: 3.12.7
Git commit: a83ba39b
Built: Thu, Oct 31, 2024 12:43 PM
OS/Arch: linux/aarch64
Profile: ephemeral
Server type: server
Pydantic version: 2.9.2
Integrations:
prefect-kubernetes: 0.5.3
Prefect 2 deployment:
Version: 2.20.14
API version: 0.8.4
Python version: 3.12.7
Git commit: fb919c67
Built: Mon, Nov 18, 2024 4:41 PM
OS/Arch: linux/aarch64
Profile: default
Server type: server
Additional context
Screenshots
Logs
timestamp,level,flow_run_id,task_run_id,message
2024-12-17 00:08:56.620966+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Worker 'KubernetesWorker a78b2758-1ba6-4010-a2c0-ee14ddb44699' submitting flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e'
2024-12-17 00:08:57.325384+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Creating Kubernetes job...
2024-12-17 00:08:57.547716+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Starting watch for pod start...
2024-12-17 00:08:57.558362+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod 'affable-turaco-q2jpq-hgzf9' has started.
2024-12-17 00:08:57.558791+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod has status 'Pending'.
2024-12-17 00:08:57.598477+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Completed submission of flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e'
2024-12-17 00:09:34.644728+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod 'affable-turaco-q2jpq-hgzf9' has started.
2024-12-17 00:09:43.835144+00:00,40,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod never started.
2024-12-17 00:09:48.960376+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job event 'SuccessfulCreate' at 2024-12-17 00:08:57+00:00: Created pod: affable-turaco-q2jpq-hgzf9
2024-12-17 00:09:48.961380+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' at 2024-12-17 00:09:03+00:00: Failed to schedule pod, incompatible with nodepool ""system"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate system=True:NoSchedule; incompatible with nodepool ""prefectflows"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate prefectflows=true:NoSchedule; incompatible with nodepool ""prefect2flows"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, no instance type satisfied resources {""cpu"":""1324m"",""memory"":""954Mi"",""pods"":""7""} and requirements karpenter.k8s.aws/instance-category In [c m r t], karpenter.k8s.aws/instance-cpu In [2 4 8], karpenter.k8s.aws/instance-generation Exists >2, karpenter.k8s.aws/instance-size NotIn [16xlarge 32xlarge 4xlarge 8xlarge micro and 2 others], karpenter.sh/capacity-type In [on-demand spot], karpenter.sh/nodepool In [prefect2flows], kubernetes.io/arch In [amd64], kubernetes.io/os In [linux], prefect2flows In [true] (no instance type met the scheduling requirements or had enough resources); incompatible with nodepool ""ghrunner"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate ghrunner=true:NoSchedule; incompatible with nodepool ""default"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, incompatible requirements, label ""prefect2flows"" does not have known values"
2024-12-17 00:09:48.962392+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' (5 times) at 2024-12-17 00:09:22+00:00: 0/25 nodes are available: 1 Insufficient cpu, 1 node(s) had untolerated taint {ghrunner: true}, 1 node(s) had untolerated taint {prefectflows: true}, 18 node(s) didn't match Pod's node affinity/selector, 2 node(s) had untolerated taint {eks.amazonaws.com/compute-type: fargate}, 2 node(s) had untolerated taint {system: True}. preemption: 0/25 nodes are available: 1 No preemption victims found for incoming pod, 24 Preemption is not helpful for scheduling."
2024-12-17 00:09:48.963316+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' at 2024-12-17 00:09:34+00:00: 0/30 nodes are available: 1 Insufficient cpu, 1 node(s) had untolerated taint {ghrunner: true}, 1 node(s) had untolerated taint {prefectflows: true}, 18 node(s) didn't match Pod's node affinity/selector, 2 node(s) had untolerated taint {eks.amazonaws.com/compute-type: fargate}, 2 node(s) had untolerated taint {system: True}, 5 node(s) had untolerated taint {node.kubernetes.io/not-ready: }. preemption: 0/30 nodes are available: 1 No preemption victims found for incoming pod, 29 Preemption is not helpful for scheduling."
2024-12-17 00:09:57.475350+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Reported flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e' as crashed: Flow run infrastructure exited with non-zero status code -1.
2024-12-17 00:11:53.550565+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Opening process...
2024-12-17 00:11:55.878500+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Downloading flow code from storage at '.'
2024-12-17 00:11:57.158898+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,bf6cc6dd-869b-4316-9598-7f7062117849,2024-12-17 00:11:57.158 | INFO | __prefect_loader__:do_work:12 - doing some work
2024-12-17 00:12:18.248445+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,bf6cc6dd-869b-4316-9598-7f7062117849,Finished in state Completed()
2024-12-17 00:12:18.249969+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,2024-12-17 00:12:18.248 | INFO | sentry_sdk.integrations.logging:sentry_patched_callhandlers:95 - Finished in state Completed()
2024-12-17 00:12:18.292922+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Finished in state Completed()
2024-12-17 00:12:18.762510+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Process for flow run 'affable-turaco' exited cleanly.
Note
Please note the near-2-minute gap in the logs between the flow run being marked as CRASHED and the continuation of execution.
Code
The base job template defined in the prefect-worker helm chart values, used for all deployments/flow runs:
worker:
  config:
    baseJobTemplate:
      name: {{ .Release.Name }}-base-job-template
      configuration: |
        { "variables": { "type": "object", "properties": { "name": { "title": "Name", "description": "Name given to infrastructure created by a worker.", "type": "string" }, "env": { "title": "Environment Variables", "description": "Environment variables to set when starting a flow run.", "type": "object", "additionalProperties": { "type": "string" } }, "labels": { "title": "Labels", "description": "Labels applied to infrastructure created by a worker.", "type": "object", "additionalProperties": { "type": "string" } }, "command": { "title": "Command", "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.", "type": "string" }, "image": { "title": "Image", "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.", "example": "docker.io/prefecthq/prefect:3-latest", "type": "string" }, "service_account_name": { "title": "Service Account Name", "description": "The Kubernetes service account to use for job creation.", "type": "string", "default": "CENSORED" }, "image_pull_policy": { "title": "Image Pull Policy", "description": "The Kubernetes image pull policy to use for job containers.", "default": "IfNotPresent", "enum": [ "IfNotPresent", "Always", "Never" ], "type": "string" }, "finished_job_ttl": { "title": "Finished Job TTL", "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.", "type": "integer" }, "job_watch_timeout_seconds": { "title": "Job Watch Timeout Seconds", "description": "Number of seconds to wait for each event emitted by a job before timing out. 
If not set, the worker will wait for each event indefinitely.", "type": "integer" }, "pod_watch_timeout_seconds": { "title": "Pod Watch Timeout Seconds", "description": "Number of seconds to watch for pod creation before timing out.", "default": 300, "type": "integer" }, "stream_output": { "title": "Stream Output", "description": "If set, output will be streamed from the job to local standard output.", "default": true, "type": "boolean" }, "cluster_config": { "title": "Cluster Config", "description": "The Kubernetes cluster config to use for job creation.", "allOf": [ { "$ref": "#/definitions/KubernetesClusterConfig" } ] }, "cpu": { "title": "CPU", "description": "The number of vCPUs to request for each job.", "default": 1, "type": "number", "min": 0.1, "max": 8 }, "memory": { "title": "Memory", "description": "The amount of memory to use for each job in MiB", "default": 512, "type": "number", "min": 64, "max": 16384 }, "arch": { "title": "Architecture", "description": "The architecture of the node to which the pods will be deployed.", "default": "amd64", "enum": [ "amd64", "arm64" ], "type": "string" } }, "definitions": { "KubernetesClusterConfig": { "title": "KubernetesClusterConfig", "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.", "type": "object", "properties": { "config": { "title": "Config", "description": "The entire contents of a kubectl config file.", "type": "object" }, "context_name": { "title": "Context Name", "description": "The name of the kubectl context to use.", "type": "string" } }, "required": [ "config", "context_name" ], "block_type_slug": "kubernetes-cluster-config", "secret_fields": [], "block_schema_references": {} } } }, "job_configuration": { "command": "{{`{{ command }}`}}", "env": "{{`{{ env }}`}}", "labels": "{{`{{ labels }}`}}", "name": "{{`{{ name }}`}}", "namespace": {{ requiredEnv "CENSORED" | quote }}, "cluster_config": "{{`{{ cluster_config }}`}}", "job_watch_timeout_seconds": "{{`{{ job_watch_timeout_seconds }}`}}", "pod_watch_timeout_seconds": "{{`{{ pod_watch_timeout_seconds }}`}}", "stream_output": "{{`{{ stream_output }}`}}", "job_manifest": { "apiVersion": "batch/v1", "kind": "Job", "metadata": { "labels": "{{`{{ labels }}`}}", "namespace": {{ requiredEnv "CENSORED" | quote }}, "generateName": "{{`{{ name }}`}}-" }, "spec": { "backoffLimit": 0, "ttlSecondsAfterFinished": "{{`{{ finished_job_ttl }}`}}", "template": { "metadata": { "labels": { "workload": "flow-run" }, "annotations": { "karpenter.sh/do-not-disrupt": "true" } }, "spec": { "parallelism": 1, "completions": 1, "restartPolicy": "Never", "serviceAccountName": "{{`{{ service_account_name }}`}}", "containers": [ { "name": "prefect-job", "env": "{{`{{ env }}`}}", "image": "{{`{{ image }}`}}", "imagePullPolicy": "{{`{{ image_pull_policy }}`}}", "args": "{{`{{ command }}`}}", "lifecycle": { "preStop": { "exec": { "command": [ "sleep", "30" ] } } }, "resources": { "requests": { "cpu": "{{`{{ cpu }}`}}", "memory": "{{`{{ memory }}`}}Mi" }, "limits": { "memory": "{{`{{ memory }}`}}Mi" } }, "terminationGracePeriodSeconds": 60 } ], "nodeSelector": { "kubernetes.io/arch": "{{`{{ arch }}`}}", "prefect2flows": "true" }, "tolerations": [ { "key": "prefect2flows", "value": "true", "effect": "NoSchedule" } ] } } } } } }
The flow used for testing purposes, registered as a deployment on the self-hosted prefect server:
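The original flow and our custom @async_task decorator are not reproduced verbatim here; below is a minimal sketch consistent with the description in the overview, using a plain @task in place of @async_task:

import asyncio
from prefect import flow, task

@task
async def do_work(seconds: float = 20.0) -> None:
    # "does some work" by sleeping for an arbitrary amount of time
    print("doing some work")
    await asyncio.sleep(seconds)

@flow
async def dummy_flow(seconds: float = 20.0) -> None:
    await do_work(seconds)

if __name__ == "__main__":
    asyncio.run(dummy_flow())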