Skip to content

feat(BA-1213): Add detection and event notifications for kernel/container mismatches #4252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6edc345
feat: Produce kernel/container dangling events
fregataa Apr 22, 2025
0d698de
add news fragment
fregataa Apr 22, 2025
29060ab
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 25, 2025
885ac1a
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 28, 2025
92b68af
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 28, 2025
f45266e
remove commented codes
fregataa Apr 28, 2025
107a689
add probe
fregataa Apr 28, 2025
3e03d30
left comments
fregataa Apr 28, 2025
3781f50
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 28, 2025
ba28c43
apply probe to agent
fregataa Apr 28, 2025
3e0417f
change log msg
fregataa Apr 28, 2025
33dcc2e
inject kernel/container getter to probe
fregataa Apr 28, 2025
83b2bee
relocate agent probe codes
fregataa Apr 29, 2025
6365cde
fix kernel re-init after agent restart
fregataa Apr 29, 2025
49dd14a
correct exception
fregataa Apr 29, 2025
e41f033
relocate compare_with_container to probe
fregataa Apr 29, 2025
fc7de24
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 29, 2025
161e50e
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 29, 2025
bd6b21f
rename method
fregataa Apr 29, 2025
964ebc0
Merge branch 'main' into feat/produce-dangling-events
fregataa Apr 30, 2025
a772980
remove not useful codes
fregataa Apr 30, 2025
7726961
remove abc kernelprobe class
fregataa May 1, 2025
1911063
Add prode runner with no resource
fregataa May 1, 2025
304b6a6
better AgentProbe
fregataa May 1, 2025
772422a
Merge branch 'main' into feat/produce-dangling-events
fregataa May 1, 2025
893aff2
better name?
fregataa May 1, 2025
c09764b
small change
fregataa May 3, 2025
294ad2d
remove probe_runner_with_no_resource_ctx
fregataa May 3, 2025
526782f
use label name enum
fregataa May 3, 2025
64157ef
Merge branch 'main' into feat/produce-dangling-events
fregataa May 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/4252.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add detection and event notifications for dangling kernel/container mismatches
20 changes: 20 additions & 0 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
from ai.backend.common.metrics.metric import CommonMetricRegistry
from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL
from ai.backend.common.plugin.monitor import ErrorPluginContext, StatsPluginContext
from ai.backend.common.runner import ProbeRunner
from ai.backend.common.service_ports import parse_service_ports
from ai.backend.common.types import (
MODEL_SERVICE_RUNTIME_PROFILES,
Expand Down Expand Up @@ -172,6 +173,7 @@
from .affinity_map import AffinityMap
from .exception import AgentError, ContainerCreationError, ResourceError
from .kernel import AbstractKernel, match_distro_data
from .probe import AgentProbe
from .resources import (
AbstractAllocMap,
AbstractComputeDevice,
Expand Down Expand Up @@ -673,6 +675,7 @@ class AbstractAgent(
_ongoing_exec_batch_tasks: weakref.WeakSet[asyncio.Task]
_ongoing_destruction_tasks: weakref.WeakValueDictionary[KernelId, asyncio.Task]
_metric_registry: CommonMetricRegistry
_probe_runner: ProbeRunner

def __init__(
self,
Expand Down Expand Up @@ -834,6 +837,9 @@ async def _pipeline(r: Redis):
self.last_registry_written_time = time.monotonic()
self.container_lifecycle_handler = loop.create_task(self.process_lifecycle_events())

self._probe_runner = self._init_probe_runner_obj()
await self._probe_runner.run()

# Notify the gateway.
await self.produce_event(AgentStartedEvent(reason="self-started"))

Expand Down Expand Up @@ -874,6 +880,7 @@ async def shutdown(self, stop_signal: signal.Signals) -> None:
It must call this super method in an appropriate order, only once.
"""
await cancel_tasks(self._ongoing_exec_batch_tasks)
await self._probe_runner.close()

async with self.registry_lock:
# Close all pending kernel runners.
Expand Down Expand Up @@ -1412,6 +1419,17 @@ async def reconstruct_resource_usage(self) -> None:
kernel_id,
)

def get_kernel_registry(self) -> Mapping[KernelId, AbstractKernel]:
return self.kernel_registry

def _init_probe_runner_obj(self) -> ProbeRunner:
probe = AgentProbe(
self.enumerate_containers,
self.get_kernel_registry,
self.event_producer,
)
return ProbeRunner.with_nop_resource_ctx(11.0, [probe])

async def sync_container_lifecycles(self, interval: float) -> None:
"""
Periodically synchronize the alive/known container sets,
Expand Down Expand Up @@ -1779,9 +1797,11 @@ async def scan_running_kernels(self) -> None:
pass
for kernel_obj in self.kernel_registry.values():
kernel_obj.agent_config = self.local_config
kernel_obj._event_producer = self.event_producer
if kernel_obj.runner is not None:
kernel_obj.runner.event_producer = self.event_producer
await kernel_obj.runner.__ainit__()
await kernel_obj.init_probe_runner()
async with self.registry_lock:
for kernel_id, container in await self.enumerate_containers(
ACTIVE_STATUS_SET | DEAD_STATUS_SET,
Expand Down
49 changes: 15 additions & 34 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
)
from ..exception import ContainerCreationError, UnsupportedResource
from ..fs import create_scratch_filesystem, destroy_scratch_filesystem
from ..kernel import AbstractKernel
from ..kernel import AbstractKernel, KernelInitArgs
from ..plugin.network import ContainerNetworkCapability, ContainerNetworkInfo, NetworkPluginContext
from ..proxy import DomainSocketProxy, proxy_connection
from ..resources import AbstractComputePlugin, KernelResourceSpec, Mount, known_slot_types
Expand All @@ -111,7 +111,6 @@
KernelOwnershipData,
LifecycleEvent,
MountInfo,
Port,
)
from ..utils import (
closing_async,
Expand All @@ -123,7 +122,7 @@
from .kernel import DockerKernel
from .metadata.server import MetadataServer
from .resources import load_resources, scan_available_resources
from .utils import PersistentServiceContainer
from .utils import PersistentServiceContainer, container_from_docker_container

if TYPE_CHECKING:
from ai.backend.common.auth import PublicKey
Expand All @@ -146,27 +145,6 @@
}


def container_from_docker_container(src: DockerContainer) -> Container:
ports = []
for private_port, host_ports in src["NetworkSettings"]["Ports"].items():
private_port = int(private_port.split("/")[0])
if host_ports is None:
host_ip = "127.0.0.1"
host_port = 0
else:
host_ip = host_ports[0]["HostIp"]
host_port = int(host_ports[0]["HostPort"])
ports.append(Port(host_ip, private_port, host_port))
return Container(
id=src._id,
status=src["State"]["Status"],
image=src["Config"]["Image"],
labels=src["Config"]["Labels"],
ports=ports,
backend_obj=src,
)


async def _clean_scratch(
loop: asyncio.AbstractEventLoop,
scratch_type: str,
Expand Down Expand Up @@ -874,16 +852,19 @@ def chown_idfile(uid: Optional[int], gid: Optional[int]) -> None:
)

kernel_obj = DockerKernel(
self.ownership_data,
self.kernel_config["network_id"],
self.image_ref,
self.kspec_version,
cluster_info["network_config"].get("mode", "bridge"),
agent_config=self.local_config,
service_ports=service_ports,
resource_spec=resource_spec,
environ=environ,
data={},
KernelInitArgs(
ownership_data=self.ownership_data,
network_id=self.kernel_config["network_id"],
image=self.image_ref,
version=self.kspec_version,
agent_config=self.local_config,
service_ports=service_ports,
resource_spec=resource_spec,
environ=environ,
data={},
event_producer=self.event_producer,
),
network_driver=cluster_info["network_config"].get("mode", "bridge"),
)
return kernel_obj

Expand Down
48 changes: 19 additions & 29 deletions src/ai/backend/agent/docker/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import subprocess
import textwrap
from pathlib import Path, PurePosixPath
from typing import Any, Dict, Final, FrozenSet, Mapping, Optional, Sequence, Tuple, cast, override
from typing import Any, Final, FrozenSet, Mapping, Optional, Sequence, Tuple, cast, override

import janus
import pkg_resources
Expand All @@ -21,18 +21,20 @@
from aiotools import TaskGroup

from ai.backend.agent.docker.utils import PersistentServiceContainer
from ai.backend.common.docker import ImageRef
from ai.backend.common.events import EventProducer
from ai.backend.common.lock import FileLock
from ai.backend.common.runner import ProbeRunner
from ai.backend.common.types import CommitStatus, KernelId, Sentinel
from ai.backend.common.utils import current_loop
from ai.backend.logging import BraceStyleAdapter
from ai.backend.plugin.entrypoint import scan_entrypoints

from ..kernel import AbstractCodeRunner, AbstractKernel
from ..resources import KernelResourceSpec
from ..types import AgentEventData, KernelOwnershipData
from ..kernel import AbstractCodeRunner, AbstractKernel, KernelInitArgs
from ..types import (
AgentEventData,
)
from ..utils import closing_async, get_arch_name
from .probe import DockerKernelProbe

log = BraceStyleAdapter(logging.getLogger(__spec__.name))

Expand All @@ -45,35 +47,13 @@ class DockerKernel(AbstractKernel):

def __init__(
self,
ownership_data: KernelOwnershipData,
network_id: str,
image: ImageRef,
version: int,
args: KernelInitArgs,
network_driver: str,
*,
agent_config: Mapping[str, Any],
resource_spec: KernelResourceSpec,
service_ports: Any, # TODO: type-annotation
environ: Mapping[str, Any],
data: Dict[str, Any],
) -> None:
super().__init__(
ownership_data,
network_id,
image,
version,
agent_config=agent_config,
resource_spec=resource_spec,
service_ports=service_ports,
data=data,
environ=environ,
)
super().__init__(args)
Comment on lines -49 to +53
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


self.network_driver = network_driver

async def close(self) -> None:
pass

def __getstate__(self):
props = super().__getstate__()
return props
Expand All @@ -83,6 +63,16 @@ def __setstate__(self, props):
props["network_driver"] = "bridge"
super().__setstate__(props)

@override
def _init_probe_runner_obj(self) -> ProbeRunner:
probe = DockerKernelProbe(
self.kernel_id,
self.get_kernel_lifecycle_state,
self.get_container_id,
self._event_producer,
)
return ProbeRunner.with_nop_resource_ctx(5.0, [probe])

async def create_code_runner(
self, event_producer: EventProducer, *, client_features: FrozenSet[str], api_version: int
) -> AbstractCodeRunner:
Expand Down
69 changes: 69 additions & 0 deletions src/ai/backend/agent/docker/probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

from typing import Callable, Optional

from aiodocker.docker import Docker
from aiodocker.exceptions import DockerError

from ai.backend.common.events import (
DanglingKernelDetected,
EventProducer,
)
from ai.backend.common.types import ContainerId, KernelId

from ..probe import DanglingKernel
from ..types import (
Container,
ContainerStatus,
KernelLifecycleStatus,
)
from ..utils import closing_async
from .utils import container_from_docker_container


class DockerKernelProbe:
def __init__(
self,
kernel_id: KernelId,
kernel_state_getter: Callable[..., KernelLifecycleStatus],
container_id_getter: Callable[..., Optional[ContainerId]],
event_producer: EventProducer,
) -> None:
self._kernel_id = kernel_id
self._container_id_getter = container_id_getter
self._kernel_state_getter = kernel_state_getter
self._event_producer = event_producer

async def _get_container_info(self) -> Optional[Container]:
cid = self._container_id_getter()
if cid is None:
return None
async with closing_async(Docker()) as docker:
try:
container = await docker.containers.get(str(cid))
except DockerError as e:
if e.status == 404:
raise DanglingKernel
return container_from_docker_container(container)

def _compare_with_container(self, container: Optional[Container]) -> None:
kernel_state = self._kernel_state_getter()
match kernel_state:
case KernelLifecycleStatus.PREPARING:
if container is not None:
# container exists but kernel is hanging in PREPARING state
raise DanglingKernel
case KernelLifecycleStatus.RUNNING:
if container is None or container.status != ContainerStatus.RUNNING:
raise DanglingKernel
case KernelLifecycleStatus.TERMINATING:
# There might be a delay in the container status change
# after the kernel is being terminated.
pass

async def probe(self, resource_ctx: None) -> None:
try:
container = await self._get_container_info()
self._compare_with_container(container)
except DanglingKernel:
await self._event_producer.produce_event(DanglingKernelDetected(self._kernel_id))
24 changes: 23 additions & 1 deletion src/ai/backend/agent/docker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from typing import Any, Final, Mapping, Optional, Tuple

import pkg_resources
from aiodocker.docker import Docker
from aiodocker.docker import Docker, DockerContainer
from aiodocker.exceptions import DockerError

from ai.backend.logging import BraceStyleAdapter

from ..exception import InitializationError
from ..types import Container, Port
from ..utils import closing_async, get_arch_name, update_nested_dict

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
Expand Down Expand Up @@ -182,3 +183,24 @@
async with closing_async(Docker()) as docker:
c = docker.containers.container(self.container_name)
await c.start()


def container_from_docker_container(src: DockerContainer) -> Container:
ports = []
for private_port, host_ports in src["NetworkSettings"]["Ports"].items():
private_port = int(private_port.split("/")[0])
if host_ports is None:
host_ip = "127.0.0.1"

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
host_port = 0
else:
host_ip = host_ports[0]["HostIp"]
host_port = int(host_ports[0]["HostPort"])
ports.append(Port(host_ip, private_port, host_port))
return Container(
id=src._id,
status=src["State"]["Status"],
image=src["Config"]["Image"],
labels=src["Config"]["Labels"],
ports=ports,
backend_obj=src,
)
23 changes: 13 additions & 10 deletions src/ai/backend/agent/dummy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
ScanImagesResult,
)
from ..exception import UnsupportedResource
from ..kernel import AbstractKernel
from ..kernel import AbstractKernel, KernelInitArgs
from ..resources import AbstractComputePlugin, KernelResourceSpec, Mount, known_slot_types
from ..types import Container, ContainerStatus, KernelOwnershipData, MountInfo
from .config import DEFAULT_CONFIG_PATH, dummy_local_config
Expand Down Expand Up @@ -175,15 +175,18 @@ async def prepare_container(
delay = self.creation_ctx_config["delay"]["spawn"]
await asyncio.sleep(delay)
return DummyKernel(
self.ownership_data,
self.kernel_config["network_id"],
self.image_ref,
self.kspec_version,
agent_config=self.local_config,
service_ports=service_ports,
resource_spec=resource_spec,
environ=environ,
data={},
KernelInitArgs(
ownership_data=self.ownership_data,
network_id=self.kernel_config["network_id"],
image=self.image_ref,
version=self.kspec_version,
agent_config=self.local_config,
resource_spec=resource_spec,
service_ports=service_ports,
environ=environ,
data={},
event_producer=self.event_producer,
),
dummy_config=self.dummy_config,
)

Expand Down
Loading
Loading