
Unexpected completion of flow runs previously marked as crashed #16429

Open
criskurtin opened this issue Dec 17, 2024 · 1 comment
Labels
bug Something isn't working

Comments


criskurtin commented Dec 17, 2024

Bug summary

Keywords

  • prefect 3, integrations, k8s, kubernetes, crashed, flow run, infrastructure, job, pod, pod_watch_timeout_seconds, pod never started

Overview

We came across this issue during load testing of our self-hosted Prefect 3 deployment in a managed EKS cluster (deployed using the official PrefectHQ Helm charts). For these tests, we used a "dummy" flow consisting of a single do_work task (it just executes asyncio.sleep for some arbitrary time and "does some work"). The main focus of the load test was to see how the Prefect worker, server, and PostgreSQL database handle a sudden burst of up to 1000 newly scheduled flow runs.
The system handles this situation quite nicely, but at certain points we need to wait for Karpenter to provision additional nodes so the new k8s jobs can start their pods. Sometimes this takes a while, and from time to time we hit the configured pod_watch_timeout_seconds. At this point, the Prefect worker logs that the pod never started, logs some k8s context, and sets the flow run state to CRASHED. This is all expected and well handled. We also have logic in our in-house service which polls the Prefect API for flow run states and links them with our own custom entities. Since CRASHED is one of the terminal states, we capture it and declare our own entity as FAILED. This is where the problematic part starts.
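For context, here is a rough, simplified sketch of the kind of polling our in-house service does (the helper name and poll interval are illustrative, not our actual implementation):

import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType

# States we treat as terminal on our side; CRASHED is one of them.
TERMINAL_STATES = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
    StateType.CRASHED,
}


async def wait_for_terminal_state(flow_run_id: UUID, poll_seconds: int = 30) -> StateType:
    """Poll the Prefect API until the given flow run reaches a terminal state."""
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state and flow_run.state.type in TERMINAL_STATES:
                # At this point we mark our own linked entity as FAILED for CRASHED runs.
                return flow_run.state.type
            await asyncio.sleep(poll_seconds)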

Expected behavior

Since CRASHED is one of the terminal states, we expect the flow run state to indeed be final. We declare our own linked entity (undisclosed for NDA reasons) as FAILED and basically forget about it until it is reviewed and possibly retried at some later point (a new flow run is created then). The k8s Job template has backoffLimit set to 0 and restartPolicy set to Never. The task and the flow have no retries configured in the source code. This effectively means that once a flow run has crashed due to infrastructure problems, it should stay that way.

What actually happens

For some reason, if I keep tracking the same crashed flow run via the Prefect UI, after a couple of minutes it is, for lack of a better word, "resurrected" out of nowhere: its state is suddenly switched to RUNNING, the pod actually starts and executes the flow run, and it is then set to the COMPLETED state. At first, I thought this was some sort of undocumented (or vaguely documented) automated Prefect behavior which internally retries infrastructure provisioning for flow runs that crashed specifically with the "Pod never started" issue, but after going through the Prefect code, I haven't managed to find any such implementation. So I decided to open this issue and file it as a bug.
This behavior is, on one hand, kind of nice (or would be, if it weren't a bug), as we wouldn't have to worry about retries for this specific problem ourselves. On the other hand, it's problematic because there is no good way to determine that, in this particular case, the CRASHED state is not really terminal. The state message does not help much (Flow run infrastructure exited with non-zero status code -1.) as it does not include the detail that the flow crashed due to a pod watch timeout. In the end, this leads to undefined behavior: we declare our custom entity as FAILED, but the associated flow run eventually runs anyway and has its side effects.

Some assumptions

Currently, my leading assumption (which I haven't managed to confirm or deny yet, but I'll do my best) is that the job still lingers around on the k8s cluster for a while, long enough to eventually get a node allocated for the pod it is supposed to bring up. At that point, the flow run probably reports back to the worker (or the Prefect API?), which consequently updates its state to RUNNING.
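A check along these lines could confirm whether a job is still present right after a run is reported as crashed. This is only a sketch using the official kubernetes Python client; the prefect.io/flow-run-id label selector and the namespace are assumptions based on how the worker labels its jobs and on our setup, so adjust them accordingly:

from kubernetes import client, config


def find_lingering_jobs(flow_run_id: str, namespace: str = "prefect") -> list[str]:
    """Return the names of k8s jobs that still exist for the given flow run."""
    config.load_kube_config()  # use config.load_incluster_config() when running in-cluster
    batch = client.BatchV1Api()
    jobs = batch.list_namespaced_job(
        namespace=namespace,
        label_selector=f"prefect.io/flow-run-id={flow_run_id}",  # assumed worker label
    )
    return [job.metadata.name for job in jobs.items]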

How can I reproduce this?

For us, this happens whenever Karpenter takes its sweet time to provision a node and the pod watch timer runs out. A good way to simulate it would probably be to set the pod watch timeout to some low value, and then provision enough additional resources for the pod to be able to start as soon as Prefect sets the state to CRASHED.
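As a rough sketch (not our actual deployment code; the source URL, entrypoint, work pool name, and image below are placeholders), a reproduction deployment with an aggressively low pod_watch_timeout_seconds, one of the variables in our base job template, could be registered like this:

from prefect import flow

flow.from_source(
    source="https://github.com/example/repo",  # placeholder source
    entrypoint="flows/dummy_flow.py:flow",     # placeholder entrypoint
).deploy(
    name="crash-repro",
    work_pool_name="kubernetes-pool",          # placeholder work pool
    image="docker.io/prefecthq/prefect:3-latest",
    build=False,
    push=False,
    # Low enough that the worker gives up before the autoscaler can provision a node.
    job_variables={"pod_watch_timeout_seconds": 10},
)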

Conclusion

As of right now, I don't have a proper workaround for this, other than to keep increasing pod_watch_timeout_seconds until we see no more crashed runs. Unfortunately, that is not really possible due to other restricting factors (we have various SLA times we are obligated to respect). One solution would be to double-check that there are indeed no associated resources left on the underlying infrastructure after the flow run has been declared CRASHED (maybe even FAILED), and to make sure that any leftovers get cleaned up before the flow run starts executing any business logic. This is, of course, assuming the issue does indeed lie in the job lingering around. Ideally, the flow run would never actually run at all.
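One possible stopgap (again, only a sketch; it assumes the lingering-job theory above as well as the same label selector and namespace assumptions) would be to delete any remaining job as soon as the CRASHED state is observed:

from kubernetes import client, config


def delete_jobs_for_crashed_run(flow_run_id: str, namespace: str = "prefect") -> None:
    """Best-effort cleanup so a crashed run can no longer be 'resurrected'."""
    config.load_kube_config()
    batch = client.BatchV1Api()
    jobs = batch.list_namespaced_job(
        namespace=namespace,
        label_selector=f"prefect.io/flow-run-id={flow_run_id}",  # assumed worker label
    )
    for job in jobs.items:
        # Foreground propagation also removes any pods created by the job.
        batch.delete_namespaced_job(
            name=job.metadata.name,
            namespace=namespace,
            propagation_policy="Foreground",
        )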

Post Scriptum

Important

Read this for additional info added after the fact

To see whether this has something to do specifically with Prefect 3 or with this particular environment, I attempted (and succeeded) to reproduce the issue in an unrelated Prefect 2 deployment used by a separate project of ours. The behavior is the same. I've added the version info for that Prefect 2 deployment too.

Version info

Prefect 3 deployment:

Version:             3.1.0
API version:         0.8.4
Python version:      3.12.7
Git commit:          a83ba39b
Built:               Thu, Oct 31, 2024 12:43 PM
OS/Arch:             linux/aarch64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-kubernetes: 0.5.3

Prefect 2 deployment:

Version:             2.20.14
API version:         0.8.4
Python version:      3.12.7
Git commit:          fb919c67
Built:               Mon, Nov 18, 2024 4:41 PM
OS/Arch:             linux/aarch64
Profile:             default
Server type:         server

Additional context

Screenshots

[Screenshots: Prefect UI views of the affected flow run, including the events captured at the moment of the state change from CRASHED to RUNNING.]

Logs

timestamp,level,flow_run_id,task_run_id,message
2024-12-17 00:08:56.620966+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Worker 'KubernetesWorker a78b2758-1ba6-4010-a2c0-ee14ddb44699' submitting flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e'
2024-12-17 00:08:57.325384+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Creating Kubernetes job...
2024-12-17 00:08:57.547716+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Starting watch for pod start...
2024-12-17 00:08:57.558362+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod 'affable-turaco-q2jpq-hgzf9' has started.
2024-12-17 00:08:57.558791+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod has status 'Pending'.
2024-12-17 00:08:57.598477+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Completed submission of flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e'
2024-12-17 00:09:34.644728+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod 'affable-turaco-q2jpq-hgzf9' has started.
2024-12-17 00:09:43.835144+00:00,40,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job 'affable-turaco-q2jpq': Pod never started.
2024-12-17 00:09:48.960376+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Job event 'SuccessfulCreate' at 2024-12-17 00:08:57+00:00: Created pod: affable-turaco-q2jpq-hgzf9
2024-12-17 00:09:48.961380+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' at 2024-12-17 00:09:03+00:00: Failed to schedule pod, incompatible with nodepool ""system"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate system=True:NoSchedule; incompatible with nodepool ""prefectflows"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate prefectflows=true:NoSchedule; incompatible with nodepool ""prefect2flows"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, no instance type satisfied resources {""cpu"":""1324m"",""memory"":""954Mi"",""pods"":""7""} and requirements karpenter.k8s.aws/instance-category In [c m r t], karpenter.k8s.aws/instance-cpu In [2 4 8], karpenter.k8s.aws/instance-generation Exists >2, karpenter.k8s.aws/instance-size NotIn [16xlarge 32xlarge 4xlarge 8xlarge micro and 2 others], karpenter.sh/capacity-type In [on-demand spot], karpenter.sh/nodepool In [prefect2flows], kubernetes.io/arch In [amd64], kubernetes.io/os In [linux], prefect2flows In [true] (no instance type met the scheduling requirements or had enough resources); incompatible with nodepool ""ghrunner"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, did not tolerate ghrunner=true:NoSchedule; incompatible with nodepool ""default"", daemonset overhead={""cpu"":""324m"",""memory"":""442Mi"",""pods"":""6""}, incompatible requirements, label ""prefect2flows"" does not have known values"
2024-12-17 00:09:48.962392+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' (5 times) at 2024-12-17 00:09:22+00:00: 0/25 nodes are available: 1 Insufficient cpu, 1 node(s) had untolerated taint {ghrunner: true}, 1 node(s) had untolerated taint {prefectflows: true}, 18 node(s) didn't match Pod's node affinity/selector, 2 node(s) had untolerated taint {eks.amazonaws.com/compute-type: fargate}, 2 node(s) had untolerated taint {system: True}. preemption: 0/25 nodes are available: 1 No preemption victims found for incoming pod, 24 Preemption is not helpful for scheduling."
2024-12-17 00:09:48.963316+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,"Pod event 'FailedScheduling' at 2024-12-17 00:09:34+00:00: 0/30 nodes are available: 1 Insufficient cpu, 1 node(s) had untolerated taint {ghrunner: true}, 1 node(s) had untolerated taint {prefectflows: true}, 18 node(s) didn't match Pod's node affinity/selector, 2 node(s) had untolerated taint {eks.amazonaws.com/compute-type: fargate}, 2 node(s) had untolerated taint {system: True}, 5 node(s) had untolerated taint {node.kubernetes.io/not-ready: }. preemption: 0/30 nodes are available: 1 No preemption victims found for incoming pod, 29 Preemption is not helpful for scheduling."
2024-12-17 00:09:57.475350+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Reported flow run 'cb91b9ce-c8df-4457-9d1c-cdec3d516a5e' as crashed: Flow run infrastructure exited with non-zero status code -1.
2024-12-17 00:11:53.550565+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Opening process...
2024-12-17 00:11:55.878500+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Downloading flow code from storage at '.'
2024-12-17 00:11:57.158898+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,bf6cc6dd-869b-4316-9598-7f7062117849,2024-12-17 00:11:57.158 | INFO     | __prefect_loader__:do_work:12 - doing some work
2024-12-17 00:12:18.248445+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,bf6cc6dd-869b-4316-9598-7f7062117849,Finished in state Completed()
2024-12-17 00:12:18.249969+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,2024-12-17 00:12:18.248 | INFO     | sentry_sdk.integrations.logging:sentry_patched_callhandlers:95 - Finished in state Completed()
2024-12-17 00:12:18.292922+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Finished in state Completed()
2024-12-17 00:12:18.762510+00:00,20,cb91b9ce-c8df-4457-9d1c-cdec3d516a5e,,Process for flow run 'affable-turaco' exited cleanly.

Note

Please note the near-2-minute gap in the logs between the flow run being marked as CRASHED and the continuation of execution.

Code

The base job template defined in the prefect-worker helm chart values, used for all deployments/flow runs:

worker:
  config:
    baseJobTemplate:
      name: {{ .Release.Name }}-base-job-template
      configuration: |
        {
          "variables": {
            "type": "object",
            "properties": {
              "name": {
                "title": "Name",
                "description": "Name given to infrastructure created by a worker.",
                "type": "string"
              },
              "env": {
                "title": "Environment Variables",
                "description": "Environment variables to set when starting a flow run.",
                "type": "object",
                "additionalProperties": {
                  "type": "string"
                }
              },
              "labels": {
                "title": "Labels",
                "description": "Labels applied to infrastructure created by a worker.",
                "type": "object",
                "additionalProperties": {
                  "type": "string"
                }
              },
              "command": {
                "title": "Command",
                "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.",
                "type": "string"
              },
              "image": {
                "title": "Image",
                "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.",
                "example": "docker.io/prefecthq/prefect:3-latest",
                "type": "string"
              },
              "service_account_name": {
                "title": "Service Account Name",
                "description": "The Kubernetes service account to use for job creation.",
                "type": "string",
                "default": "CENSORED"
              },
              "image_pull_policy": {
                "title": "Image Pull Policy",
                "description": "The Kubernetes image pull policy to use for job containers.",
                "default": "IfNotPresent",
                "enum": [
                  "IfNotPresent",
                  "Always",
                  "Never"
                ],
                "type": "string"
              },
              "finished_job_ttl": {
                "title": "Finished Job TTL",
                "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.",
                "type": "integer"
              },
              "job_watch_timeout_seconds": {
                "title": "Job Watch Timeout Seconds",
                "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.",
                "type": "integer"
              },
              "pod_watch_timeout_seconds": {
                "title": "Pod Watch Timeout Seconds",
                "description": "Number of seconds to watch for pod creation before timing out.",
                "default": 300,
                "type": "integer"
              },
              "stream_output": {
                "title": "Stream Output",
                "description": "If set, output will be streamed from the job to local standard output.",
                "default": true,
                "type": "boolean"
              },
              "cluster_config": {
                "title": "Cluster Config",
                "description": "The Kubernetes cluster config to use for job creation.",
                "allOf": [
                  {
                    "$ref": "#/definitions/KubernetesClusterConfig"
                  }
                ]
              },
              "cpu": {
                "title": "CPU",
                "description": "The number of vCPUs to request for each job.",
                "default": 1,
                "type": "number",
                "min": 0.1,
                "max": 8
              },
              "memory": {
                "title": "Memory",
                "description": "The amount of memory to use for each job in MiB",
                "default": 512,
                "type": "number",
                "min": 64,
                "max": 16384
              },
              "arch": {
                "title": "Architecture",
                "description": "The architecture of the node to which the pods will be deployed.",
                "default": "amd64",
                "enum": [
                  "amd64",
                  "arm64"
                ],
                "type": "string"
              }
            },
            "definitions": {
              "KubernetesClusterConfig": {
                "title": "KubernetesClusterConfig",
                "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
                "type": "object",
                "properties": {
                  "config": {
                    "title": "Config",
                    "description": "The entire contents of a kubectl config file.",
                    "type": "object"
                  },
                  "context_name": {
                    "title": "Context Name",
                    "description": "The name of the kubectl context to use.",
                    "type": "string"
                  }
                },
                "required": [
                  "config",
                  "context_name"
                ],
                "block_type_slug": "kubernetes-cluster-config",
                "secret_fields": [],
                "block_schema_references": {}
              }
            }
          },
          "job_configuration": {
            "command": "{{`{{ command }}`}}",
            "env": "{{`{{ env }}`}}",
            "labels": "{{`{{ labels }}`}}",
            "name": "{{`{{ name }}`}}",
            "namespace": {{ requiredEnv "CENSORED" | quote }},
            "cluster_config": "{{`{{ cluster_config }}`}}",
            "job_watch_timeout_seconds": "{{`{{ job_watch_timeout_seconds }}`}}",
            "pod_watch_timeout_seconds": "{{`{{ pod_watch_timeout_seconds }}`}}",
            "stream_output": "{{`{{ stream_output }}`}}",
            "job_manifest": {
              "apiVersion": "batch/v1",
              "kind": "Job",
              "metadata": {
                "labels": "{{`{{ labels }}`}}",
                "namespace": {{ requiredEnv "CENSORED" | quote }},
                "generateName": "{{`{{ name }}`}}-"
              },
              "spec": {
                "backoffLimit": 0,
                "ttlSecondsAfterFinished": "{{`{{ finished_job_ttl }}`}}",
                "template": {
                  "metadata": {
                    "labels": {
                      "workload": "flow-run"
                    },
                    "annotations": {
                      "karpenter.sh/do-not-disrupt": "true"
                    }
                  },
                  "spec": {
                    "parallelism": 1,
                    "completions": 1,
                    "restartPolicy": "Never",
                    "serviceAccountName": "{{`{{ service_account_name }}`}}",
                    "containers": [
                      {
                        "name": "prefect-job",
                        "env": "{{`{{ env }}`}}",
                        "image": "{{`{{ image }}`}}",
                        "imagePullPolicy": "{{`{{ image_pull_policy }}`}}",
                        "args": "{{`{{ command }}`}}",
                        "lifecycle": {
                          "preStop": {
                            "exec": {
                              "command": [
                                "sleep",
                                "30"
                              ]
                            }
                          }
                        },
                        "resources": {
                          "requests": {
                            "cpu": "{{`{{ cpu }}`}}",
                            "memory": "{{`{{ memory }}`}}Mi"
                          },
                          "limits": {
                            "memory": "{{`{{ memory }}`}}Mi"
                          }
                        },
                        "terminationGracePeriodSeconds": 60
                      }
                    ],
                    "nodeSelector": {
                      "kubernetes.io/arch": "{{`{{ arch }}`}}",
                      "prefect2flows": "true"
                    },
                    "tolerations": [
                      {
                        "key": "prefect2flows",
                        "value": "true",
                        "effect": "NoSchedule"
                      }
                    ]
                  }
                }
              }
            }
          }
        }

The flow used for testing purposes, registered as a deployment on the self-hosted prefect server:

import asyncio
from uuid import uuid4

from CENSORED.logger import logger
from prefect import flow as prefect_flow

from CENSORED.prefect import async_task


@async_task()
async def do_work(wait_time: int):
    logger.info("doing some work")
    await asyncio.sleep(wait_time)
    return {"message": "dummy message"}


@prefect_flow(name="dummy_flow")
async def flow(report_id: str, wait_time: int = 5):
    await do_work(wait_time)


if __name__ == "__main__":
    asyncio.run(flow(report_id=str(uuid4())))

The definition of the custom @async_task decorator:

from functools import wraps

import sentry_sdk
from prefect import task

# cache_key_fn, get_prefect_storage, enable_loguru_support and init_sentry are
# imported from our in-house (censored) package.


def async_task(fn=None, **kwargs):
    if fn is None:
        return lambda fn: async_task(fn, **kwargs)

    cache_function = kwargs.pop("cache_key_fn", cache_key_fn)
    persist_result = kwargs.pop("persist_result", True)

    @task(
        log_prints=True,
        cache_key_fn=cache_function,
        persist_result=persist_result,
        result_storage=get_prefect_storage(),
        **kwargs,
    )
    @wraps(fn)
    async def inner(*args, **kwargs) -> dict | None:
        enable_loguru_support()
        init_sentry()

        try:
            return await fn(*args, **kwargs)
        except Exception as err:
            sentry_sdk.capture_exception(err)
            raise err

    return inner

Edits

  1. Added keywords
  2. Added the custom base job template used by the prefect worker, added some deployment details
  3. Added some details on reproducing the issue
  4. Added a detail screenshot of the events captured at the moment of state change from CRASHED to RUNNING
  5. Attempted to reproduce in another, unrelated environment running Prefect 2 and succeeded; added details about that above
@criskurtin criskurtin added the bug Something isn't working label Dec 17, 2024
criskurtin (Author) commented

If you need any additional info, let me know and I'll update the original comment.
