diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/services.py b/packages/models-library/src/models_library/api_schemas_directorv2/services.py index c797c687fd1..3d2fb51f302 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/services.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/services.py @@ -103,3 +103,7 @@ class ServiceExtras(BaseModel): CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME: Final[NonNegativeInt] = 89 + + +DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar" +DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy" diff --git a/packages/models-library/src/models_library/docker.py b/packages/models-library/src/models_library/docker.py index 6e87f06b62e..db5f51ef359 100644 --- a/packages/models-library/src/models_library/docker.py +++ b/packages/models-library/src/models_library/docker.py @@ -37,7 +37,15 @@ def from_key(cls, key: str) -> "DockerLabelKey": str, StringConstraints(pattern=DOCKER_GENERIC_TAG_KEY_RE) ] -DockerPlacementConstraint: TypeAlias = Annotated[str, StringConstraints(strip_whitespace = True, pattern = re.compile(r"^(?!-)(?![.])(?!.*--)(?!.*[.][.])[a-zA-Z0-9.-]*(? "StandardSimcoreDockerLabels": ] }, ) + + +DockerNodeID: TypeAlias = Annotated[ + str, StringConstraints(strip_whitespace=True, pattern=re.compile(r"[a-zA-Z0-9]")) +] diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py new file mode 100644 index 00000000000..2049f0a409f --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py @@ -0,0 +1,37 @@ +import logging +from datetime import timedelta +from typing import Final + +from models_library.docker import DockerNodeID +from models_library.projects_nodes_io import NodeID +from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace +from pydantic import NonNegativeInt, TypeAdapter +from servicelib.logging_utils import log_decorator +from servicelib.rabbitmq import RabbitMQRPCClient + +_logger = logging.getLogger(__name__) + +_REQUEST_TIMEOUT: Final[NonNegativeInt] = int(timedelta(minutes=60).total_seconds()) + + +@log_decorator(_logger, level=logging.DEBUG) +async def force_container_cleanup( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + docker_node_id: DockerNodeID, + swarm_stack_name: str, + node_id: NodeID, +) -> None: + result = await rabbitmq_rpc_client.request( + RPCNamespace.from_entries( + { + "service": "agent", + "docker_node_id": docker_node_id, + "swarm_stack_name": swarm_stack_name, + } + ), + TypeAdapter(RPCMethodName).validate_python("force_container_cleanup"), + node_id=node_id, + timeout_s=_REQUEST_TIMEOUT, + ) + assert result is None # nosec diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py index 043898dcb30..41cf2ffd8b8 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py @@ -2,6 +2,7 @@ from datetime import timedelta from typing import Final +from models_library.docker import DockerNodeID from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace from pydantic import NonNegativeInt, TypeAdapter @@ -17,7 +18,7 @@ async def remove_volumes_without_backup_for_service( rabbitmq_rpc_client: RabbitMQRPCClient, *, - docker_node_id: str, + docker_node_id: DockerNodeID, swarm_stack_name: str, node_id: NodeID, ) -> None: @@ -42,7 +43,7 @@ async def remove_volumes_without_backup_for_service( async def backup_and_remove_volumes_for_all_services( rabbitmq_rpc_client: RabbitMQRPCClient, *, - docker_node_id: str, + docker_node_id: DockerNodeID, swarm_stack_name: str, ) -> None: result = await rabbitmq_rpc_client.request( diff --git a/services/agent/src/simcore_service_agent/api/rpc/_containers.py b/services/agent/src/simcore_service_agent/api/rpc/_containers.py new file mode 100644 index 00000000000..e7d651d6ede --- /dev/null +++ b/services/agent/src/simcore_service_agent/api/rpc/_containers.py @@ -0,0 +1,20 @@ +import logging + +from fastapi import FastAPI +from models_library.projects_nodes_io import NodeID +from servicelib.logging_utils import log_context +from servicelib.rabbitmq import RPCRouter + +from ...services.containers_manager import ContainersManager + +_logger = logging.getLogger(__name__) + +router = RPCRouter() + + +@router.expose() +async def force_container_cleanup(app: FastAPI, *, node_id: NodeID) -> None: + with log_context( + _logger, logging.INFO, f"removing all orphan container for {node_id=}" + ): + await ContainersManager.get_from_app_state(app).force_container_cleanup(node_id) diff --git a/services/agent/src/simcore_service_agent/api/rpc/_volumes.py b/services/agent/src/simcore_service_agent/api/rpc/_volumes.py index 96edb817e62..9d2433a19af 100644 --- a/services/agent/src/simcore_service_agent/api/rpc/_volumes.py +++ b/services/agent/src/simcore_service_agent/api/rpc/_volumes.py @@ -7,7 +7,8 @@ from servicelib.rabbitmq.rpc_interfaces.agent.errors import ( NoServiceVolumesFoundRPCError, ) -from simcore_service_agent.services.volumes_manager import VolumesManager + +from ...services.volumes_manager import VolumesManager _logger = logging.getLogger(__name__) diff --git a/services/agent/src/simcore_service_agent/api/rpc/routes.py b/services/agent/src/simcore_service_agent/api/rpc/routes.py index 7a658ae5280..e8b0cea8f4c 100644 --- a/services/agent/src/simcore_service_agent/api/rpc/routes.py +++ b/services/agent/src/simcore_service_agent/api/rpc/routes.py @@ -4,9 +4,10 @@ from simcore_service_agent.core.settings import ApplicationSettings from ...services.rabbitmq import get_rabbitmq_rpc_server -from . import _volumes +from . import _containers, _volumes ROUTERS: list[RPCRouter] = [ + _containers.router, _volumes.router, ] diff --git a/services/agent/src/simcore_service_agent/core/application.py b/services/agent/src/simcore_service_agent/core/application.py index fe226a33558..b0cfa8720e4 100644 --- a/services/agent/src/simcore_service_agent/core/application.py +++ b/services/agent/src/simcore_service_agent/core/application.py @@ -18,6 +18,7 @@ ) from ..api.rest.routes import setup_rest_api from ..api.rpc.routes import setup_rpc_api_routes +from ..services.containers_manager import setup_containers_manager from ..services.instrumentation import setup_instrumentation from ..services.rabbitmq import setup_rabbitmq from ..services.volumes_manager import setup_volume_manager @@ -28,8 +29,8 @@ def _setup_logger(settings: ApplicationSettings): # SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148 - logging.basicConfig(level=settings.LOGLEVEL.value) # NOSONAR - logging.root.setLevel(settings.LOGLEVEL.value) + logging.basicConfig(level=settings.LOG_LEVEL.value) # NOSONAR + logging.root.setLevel(settings.LOG_LEVEL.value) config_all_loggers( log_format_local_dev_enabled=settings.AGENT_VOLUMES_LOG_FORMAT_LOCAL_DEV_ENABLED, logger_filter_mapping=settings.AGENT_VOLUMES_LOG_FILTER_MAPPING, @@ -58,6 +59,7 @@ def create_app() -> FastAPI: setup_rabbitmq(app) setup_volume_manager(app) + setup_containers_manager(app) setup_rest_api(app) setup_rpc_api_routes(app) diff --git a/services/agent/src/simcore_service_agent/core/settings.py b/services/agent/src/simcore_service_agent/core/settings.py index f37d7c8d263..742d3bf02d1 100644 --- a/services/agent/src/simcore_service_agent/core/settings.py +++ b/services/agent/src/simcore_service_agent/core/settings.py @@ -1,6 +1,7 @@ from datetime import timedelta from models_library.basic_types import BootModeEnum, LogLevel +from models_library.docker import DockerNodeID from pydantic import AliasChoices, AnyHttpUrl, Field, field_validator from servicelib.logging_utils_filtering import LoggerName, MessageSubstring from settings_library.base import BaseCustomSettings @@ -11,7 +12,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): - LOGLEVEL: LogLevel = Field( + LOG_LEVEL: LogLevel = Field( LogLevel.WARNING, validation_alias=AliasChoices( "AGENT_LOGLEVEL", @@ -79,7 +80,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): AGENT_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True - AGENT_DOCKER_NODE_ID: str = Field(..., description="used by the rabbitmq module") + AGENT_DOCKER_NODE_ID: DockerNodeID = Field( + ..., description="used by the rabbitmq module" + ) AGENT_RABBITMQ: RabbitSettings = Field( description="settings for service/rabbitmq", @@ -91,7 +94,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): json_schema_extra={"auto_default_from_env": True}, ) - @field_validator("LOGLEVEL") + @field_validator("LOG_LEVEL") @classmethod def valid_log_level(cls, value) -> LogLevel: return LogLevel(cls.validate_log_level(value)) diff --git a/services/agent/src/simcore_service_agent/services/containers_manager.py b/services/agent/src/simcore_service_agent/services/containers_manager.py new file mode 100644 index 00000000000..ca2317e156e --- /dev/null +++ b/services/agent/src/simcore_service_agent/services/containers_manager.py @@ -0,0 +1,71 @@ +import logging +from dataclasses import dataclass, field + +from aiodocker import Docker +from fastapi import FastAPI +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_PROXY_SERVICE_PREFIX, + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) +from models_library.projects_nodes_io import NodeID +from servicelib.fastapi.app_state import SingletonInAppStateMixin + +from .docker_utils import get_containers_with_prefixes, remove_container_forcefully + +_logger = logging.getLogger(__name__) + + +@dataclass +class ContainersManager(SingletonInAppStateMixin): + app_state_name: str = "containers_manager" + + docker: Docker = field(default_factory=Docker) + + async def force_container_cleanup(self, node_id: NodeID) -> None: + # compose all possible used container prefixes + proxy_prefix = f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}" + dy_sidecar_prefix = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}" + user_service_prefix = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}" + + orphan_containers = await get_containers_with_prefixes( + self.docker, {proxy_prefix, dy_sidecar_prefix, user_service_prefix} + ) + _logger.debug( + "Detected orphan containers for node_id='%s': %s", + node_id, + orphan_containers, + ) + + unexpected_orphans = { + orphan + for orphan in orphan_containers + if orphan.startswith(user_service_prefix) + } + if unexpected_orphans: + _logger.warning( + "Unexpected orphans detected for node_id='%s': %s", + node_id, + unexpected_orphans, + ) + + # avoids parallel requests to docker engine + for container in orphan_containers: + await remove_container_forcefully(self.docker, container) + + async def shutdown(self) -> None: + await self.docker.close() + + +def get_containers_manager(app: FastAPI) -> ContainersManager: + return ContainersManager.get_from_app_state(app) + + +def setup_containers_manager(app: FastAPI) -> None: + async def _on_startup() -> None: + ContainersManager().set_to_app_state(app) + + async def _on_shutdown() -> None: + await ContainersManager.get_from_app_state(app).shutdown() + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/agent/src/simcore_service_agent/services/docker_utils.py b/services/agent/src/simcore_service_agent/services/docker_utils.py index 83656783b55..1390a5b12df 100644 --- a/services/agent/src/simcore_service_agent/services/docker_utils.py +++ b/services/agent/src/simcore_service_agent/services/docker_utils.py @@ -106,3 +106,27 @@ async def remove_volume( get_instrumentation(app).agent_metrics.remove_volumes( settings.AGENT_DOCKER_NODE_ID ) + + +async def get_containers_with_prefixes(docker: Docker, prefixes: set[str]) -> set[str]: + """Returns a set of container names matching any of the given prefixes""" + all_containers = await docker.containers.list(all=True) + + result: set[str] = set() + for container in all_containers: + container_info = await container.show() + container_name = container_info.get("Name", "").lstrip("/") + if any(container_name.startswith(prefix) for prefix in prefixes): + result.add(container_name) + + return result + + +async def remove_container_forcefully(docker: Docker, container_id: str) -> None: + """Removes a container regardless of it's state""" + try: + container = await docker.containers.get(container_id) + await container.delete(force=True) + except DockerError as e: + if e.status != status.HTTP_404_NOT_FOUND: + raise diff --git a/services/agent/src/simcore_service_agent/services/instrumentation/_models.py b/services/agent/src/simcore_service_agent/services/instrumentation/_models.py index bf554374595..2c49859e897 100644 --- a/services/agent/src/simcore_service_agent/services/instrumentation/_models.py +++ b/services/agent/src/simcore_service_agent/services/instrumentation/_models.py @@ -1,6 +1,7 @@ from dataclasses import dataclass, field from typing import Final +from models_library.docker import DockerNodeID from prometheus_client import CollectorRegistry, Counter from servicelib.instrumentation import MetricsBase, get_metrics_namespace @@ -34,10 +35,10 @@ def __post_init__(self) -> None: registry=self.registry, ) - def remove_volumes(self, docker_node_id: str) -> None: + def remove_volumes(self, docker_node_id: DockerNodeID) -> None: self.volumes_removed.labels(docker_node_id=docker_node_id).inc() - def backedup_volumes(self, docker_node_id: str) -> None: + def backedup_volumes(self, docker_node_id: DockerNodeID) -> None: self.volumes_backedup.labels(docker_node_id=docker_node_id).inc() diff --git a/services/agent/tests/conftest.py b/services/agent/tests/conftest.py index 14e8cd1d9e3..97df58d4e5a 100644 --- a/services/agent/tests/conftest.py +++ b/services/agent/tests/conftest.py @@ -5,6 +5,7 @@ import pytest from faker import Faker from models_library.basic_types import BootModeEnum +from models_library.docker import DockerNodeID from moto.server import ThreadedMotoServer from pydantic import HttpUrl, TypeAdapter from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict @@ -25,8 +26,8 @@ def swarm_stack_name() -> str: @pytest.fixture -def docker_node_id() -> str: - return "test-node-id" +def docker_node_id() -> DockerNodeID: + return TypeAdapter(DockerNodeID).validate_python("testnodeid") @pytest.fixture @@ -40,7 +41,7 @@ def mock_environment( mocked_s3_server_url: HttpUrl, bucket: str, swarm_stack_name: str, - docker_node_id: str, + docker_node_id: DockerNodeID, ) -> EnvVarsDict: return setenvs_from_dict( monkeypatch, diff --git a/services/agent/tests/unit/test_api_rpc__containers.py b/services/agent/tests/unit/test_api_rpc__containers.py new file mode 100644 index 00000000000..201acf5d218 --- /dev/null +++ b/services/agent/tests/unit/test_api_rpc__containers.py @@ -0,0 +1,55 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from collections.abc import Awaitable, Callable +from unittest.mock import AsyncMock + +import pytest +import pytest_mock +from faker import Faker +from fastapi import FastAPI +from models_library.docker import DockerNodeID +from models_library.projects_nodes_io import NodeID +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.agent import containers + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +async def rpc_client( + initialized_app: FastAPI, + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], +) -> RabbitMQRPCClient: + return await rabbitmq_rpc_client("client") + + +@pytest.fixture +def mocked_force_container_cleanup(mocker: pytest_mock.MockerFixture) -> AsyncMock: + return mocker.patch( + "simcore_service_agent.services.containers_manager.ContainersManager.force_container_cleanup" + ) + + +async def test_force_container_cleanup( + rpc_client: RabbitMQRPCClient, + swarm_stack_name: str, + docker_node_id: DockerNodeID, + node_id: NodeID, + mocked_force_container_cleanup: AsyncMock, +): + assert mocked_force_container_cleanup.call_count == 0 + await containers.force_container_cleanup( + rpc_client, + docker_node_id=docker_node_id, + swarm_stack_name=swarm_stack_name, + node_id=node_id, + ) + assert mocked_force_container_cleanup.call_count == 1 diff --git a/services/agent/tests/unit/test_api_rpc__volumes.py b/services/agent/tests/unit/test_api_rpc__volumes.py index df7121d1418..6e7eeb76485 100644 --- a/services/agent/tests/unit/test_api_rpc__volumes.py +++ b/services/agent/tests/unit/test_api_rpc__volumes.py @@ -8,6 +8,7 @@ import pytest import pytest_mock from fastapi import FastAPI +from models_library.docker import DockerNodeID from servicelib.rabbitmq import RabbitMQRPCClient from servicelib.rabbitmq.rpc_interfaces.agent import volumes @@ -41,7 +42,7 @@ def mocked_remove_all_volumes(mocker: pytest_mock.MockerFixture) -> AsyncMock: async def test_backup_and_remove_volumes_for_all_services( rpc_client: RabbitMQRPCClient, swarm_stack_name: str, - docker_node_id: str, + docker_node_id: DockerNodeID, mocked_remove_all_volumes: AsyncMock, ): assert mocked_remove_all_volumes.call_count == 0 diff --git a/services/agent/tests/unit/test_services_containers_manager.py b/services/agent/tests/unit/test_services_containers_manager.py new file mode 100644 index 00000000000..4489d975ab3 --- /dev/null +++ b/services/agent/tests/unit/test_services_containers_manager.py @@ -0,0 +1,107 @@ +# pylint: disable=redefined-outer-name + + +import logging +from collections.abc import AsyncIterable, Awaitable, Callable +from enum import Enum + +import pytest +from aiodocker import Docker, DockerError +from asgi_lifespan import LifespanManager +from faker import Faker +from fastapi import FastAPI, status +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_PROXY_SERVICE_PREFIX, + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) +from models_library.projects_nodes_io import NodeID +from simcore_service_agent.services.containers_manager import ( + get_containers_manager, + setup_containers_manager, +) + + +@pytest.fixture +async def app() -> AsyncIterable[FastAPI]: + app = FastAPI() + setup_containers_manager(app) + + async with LifespanManager(app): + yield app + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +async def docker() -> AsyncIterable[Docker]: + async with Docker() as docker: + yield docker + + +class _ContainerMode(Enum): + CREATED = "CREATED" + RUNNING = "RUNNING" + STOPPED = "STOPPED" + + +@pytest.fixture +async def create_container( + docker: Docker, +) -> AsyncIterable[Callable[[str, _ContainerMode], Awaitable[str]]]: + created_containers: set[str] = set() + + async def _(name: str, container_mode: _ContainerMode) -> str: + container = await docker.containers.create( + config={ + "Image": "alpine", + "Cmd": ["sh", "-c", "while true; do sleep 1; done"], + }, + name=name, + ) + + if container_mode in (_ContainerMode.RUNNING, _ContainerMode.STOPPED): + await container.start() + if container_mode == _ContainerMode.STOPPED: + await container.stop() + + created_containers.add(container.id) + return container.id + + yield _ + + # cleanup containers + for container_id in created_containers: + try: + container = await docker.containers.get(container_id) + await container.delete(force=True) + except DockerError as e: + if e.status != status.HTTP_404_NOT_FOUND: + raise + + +async def test_force_container_cleanup( + app: FastAPI, + node_id: NodeID, + create_container: Callable[[str, _ContainerMode], Awaitable[str]], + faker: Faker, + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(logging.DEBUG) + caplog.clear() + + proxy_name = f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}{faker.pystr()}" + dynamic_sidecar_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}{faker.pystr()}" + user_service_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}{faker.pystr()}" + + await create_container(proxy_name, _ContainerMode.CREATED) + await create_container(dynamic_sidecar_name, _ContainerMode.RUNNING) + await create_container(user_service_name, _ContainerMode.STOPPED) + + await get_containers_manager(app).force_container_cleanup(node_id) + + assert proxy_name in caplog.text + assert dynamic_sidecar_name in caplog.text + assert user_service_name in caplog.text diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json index 63418baabe5..51e5f191b27 100644 --- a/services/director-v2/openapi.json +++ b/services/director-v2/openapi.json @@ -2018,7 +2018,8 @@ "docker_node_id": { "anyOf": [ { - "type": "string" + "type": "string", + "pattern": "[a-zA-Z0-9]" }, { "type": "null" diff --git a/services/director-v2/src/simcore_service_director_v2/constants.py b/services/director-v2/src/simcore_service_director_v2/constants.py index b84865745df..194425d0328 100644 --- a/services/director-v2/src/simcore_service_director_v2/constants.py +++ b/services/director-v2/src/simcore_service_director_v2/constants.py @@ -1,8 +1,11 @@ from typing import Final -# dynamic services -DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar" -DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy" +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_PROXY_SERVICE_PREFIX, + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) + +# dynamic services # label storing scheduler_data to allow service # monitoring recovery after director-v2 reboots diff --git a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py index 7e9b3ebeac6..560517b538c 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py @@ -14,8 +14,13 @@ from models_library.api_schemas_directorv2.dynamic_services_service import ( CommonServiceDetails, ) +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_PROXY_SERVICE_PREFIX, + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) from models_library.basic_types import PortInt from models_library.callbacks_mapping import CallbacksMapping +from models_library.docker import DockerNodeID from models_library.generated_models.docker_rest_api import ContainerState, Status2 from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo, PricingInfo @@ -39,9 +44,7 @@ from servicelib.exception_utils import DelayedExceptionHandler from ..constants import ( - DYNAMIC_PROXY_SERVICE_PREFIX, DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL, - DYNAMIC_SIDECAR_SERVICE_PREFIX, REGEX_DY_SERVICE_PROXY, REGEX_DY_SERVICE_SIDECAR, ) @@ -297,7 +300,7 @@ def compose_spec_submitted(self) -> bool: default=None, description="used for starting the proxy" ) - docker_node_id: str | None = Field( + docker_node_id: DockerNodeID | None = Field( default=None, description=( "contains node id of the docker node where all services " diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py index 32bb114f095..37dc914451c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py @@ -1,7 +1,8 @@ +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) from models_library.projects_nodes_io import NodeID -from ...constants import DYNAMIC_SIDECAR_SERVICE_PREFIX - def get_compose_namespace(node_uuid: NodeID) -> str: # To avoid collisions for started docker resources a unique identifier is computed: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py index 1e05524b48d..350c406c1eb 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py @@ -9,7 +9,10 @@ from common_library.json_serialization import json_dumps from fastapi.encoders import jsonable_encoder from models_library.aiodocker_api import AioDockerServiceSpec -from models_library.docker import to_simcore_runtime_docker_label_key +from models_library.api_schemas_directorv2.services import ( + DYNAMIC_SIDECAR_SERVICE_PREFIX, +) +from models_library.docker import DockerNodeID, to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_networks import DockerNetworkName from models_library.projects_nodes_io import NodeID @@ -22,10 +25,7 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_exponential, wait_random_exponential -from ....constants import ( - DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL, - DYNAMIC_SIDECAR_SERVICE_PREFIX, -) +from ....constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL from ....core.dynamic_services_settings.scheduler import ( DynamicServicesSchedulerSettings, ) @@ -170,7 +170,7 @@ async def _get_service_latest_task(service_id: str) -> Mapping[str, Any]: async def get_dynamic_sidecar_placement( service_id: str, dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings, -) -> str: +) -> DockerNodeID: """ Waits until the service has a task in `running` state and returns it's `docker_node_id`. @@ -205,7 +205,7 @@ async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, A task = await _get_task_data_when_service_running(service_id=service_id) - docker_node_id: None | str = task.get("NodeID", None) + docker_node_id: DockerNodeID | None = task.get("NodeID", None) if not docker_node_id: msg = f"Could not find an assigned NodeID for service_id={service_id}. Last task inspect result: {task}" raise DynamicSidecarError(msg=msg) @@ -494,7 +494,9 @@ async def update_scheduler_data_label(scheduler_data: SchedulerData) -> None: ) -async def constrain_service_to_node(service_name: str, docker_node_id: str) -> None: +async def constrain_service_to_node( + service_name: str, docker_node_id: DockerNodeID +) -> None: await _update_service_spec( service_name, update_in_service_spec={ diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index e3b6d024bf8..4a127e59e51 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -28,6 +28,7 @@ from servicelib.logging_utils import log_context from servicelib.rabbitmq import RabbitMQClient from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.agent.containers import force_container_cleanup from servicelib.rabbitmq.rpc_interfaces.agent.errors import ( NoServiceVolumesFoundRPCError, ) @@ -210,6 +211,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( set_were_state_and_outputs_saved: bool | None = None, ) -> None: scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid) + rabbit_rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client if set_were_state_and_outputs_saved is not None: scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True @@ -221,7 +223,14 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( node_uuid=scheduler_data.node_uuid, swarm_stack_name=swarm_stack_name, ) - # remove network + if scheduler_data.dynamic_sidecar.docker_node_id: + await force_container_cleanup( + rabbit_rpc_client, + docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id, + swarm_stack_name=swarm_stack_name, + node_id=scheduler_data.node_uuid, + ) + task_progress.update(message="removing network", percent=ProgressPercent(0.2)) await remove_dynamic_sidecar_network(scheduler_data.dynamic_sidecar_network_name) @@ -237,7 +246,6 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( message="removing volumes", percent=ProgressPercent(0.3) ) with log_context(_logger, logging.DEBUG, f"removing volumes '{node_uuid}'"): - rabbit_rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client try: await remove_volumes_without_backup_for_service( rabbit_rpc_client, diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py index 5410c37f203..fd328bd66aa 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py @@ -14,6 +14,8 @@ import respx from faker import Faker from fastapi import FastAPI +from models_library.docker import DockerNodeID +from pydantic import TypeAdapter from pytest_mock import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict @@ -72,7 +74,9 @@ def mock_env( @pytest.fixture def scheduler_data(scheduler_data_from_http_request: SchedulerData) -> SchedulerData: - scheduler_data_from_http_request.docker_node_id = "test_docker_node_id" + scheduler_data_from_http_request.dynamic_sidecar.docker_node_id = TypeAdapter( + DockerNodeID + ).validate_python("testdockernodeid") return scheduler_data_from_http_request @@ -211,8 +215,10 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: @pytest.fixture -def mock_remove_calls(mocker: MockerFixture) -> None: +def mock_rpc_calls(mocker: MockerFixture, minimal_app: FastAPI) -> None: + minimal_app.state.rabbitmq_rpc_client = AsyncMock() mocker.patch.object(_events_utils, "remove_volumes_without_backup_for_service") + mocker.patch.object(_events_utils, "force_container_cleanup") @pytest.fixture(params=[True, False]) @@ -241,8 +247,9 @@ async def test_skip_observation_cycle_after_error( mocked_dynamic_scheduler_events: ACounter, error_raised_by_saving_state: bool, use_case: UseCase, - mock_remove_calls: None, + mock_rpc_calls: None, ): + # add a task, emulate an error make sure no observation cycle is # being triggered again assert mocked_dynamic_scheduler_events.count == 0 diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py index f7423b3944c..278b386eb86 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py @@ -14,11 +14,12 @@ import pytest from aiodocker.utils import clean_filters from faker import Faker -from models_library.docker import to_simcore_runtime_docker_label_key +from models_library.docker import DockerNodeID, to_simcore_runtime_docker_label_key from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_enums import ServiceState from models_library.users import UserID +from pydantic import TypeAdapter from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict from simcore_service_director_v2.constants import ( DYNAMIC_PROXY_SERVICE_PREFIX, @@ -763,16 +764,16 @@ async def test_regression_update_service_update_out_of_sequence( @pytest.fixture -async def target_node_id(async_docker_client: aiodocker.Docker) -> str: +async def target_node_id(async_docker_client: aiodocker.Docker) -> DockerNodeID: # get a node's ID docker_nodes = await async_docker_client.nodes.list() - return docker_nodes[0]["ID"] + return TypeAdapter(DockerNodeID).validate_python(docker_nodes[0]["ID"]) async def test_constrain_service_to_node( async_docker_client: aiodocker.Docker, mock_service: str, - target_node_id: str, + target_node_id: DockerNodeID, docker_swarm: None, ): await docker_api.constrain_service_to_node( diff --git a/services/web/server/tests/unit/isolated/test_tracing.py b/services/web/server/tests/unit/isolated/test_tracing.py index c236e446ab9..c356a31053c 100644 --- a/services/web/server/tests/unit/isolated/test_tracing.py +++ b/services/web/server/tests/unit/isolated/test_tracing.py @@ -18,14 +18,13 @@ def mock_webserver_service_environment( monkeypatch: pytest.MonkeyPatch, mock_webserver_service_environment: EnvVarsDict ) -> EnvVarsDict: monkeypatch.delenv("WEBSERVER_TRACING") - envs = mock_webserver_service_environment | setenvs_from_dict( + return mock_webserver_service_environment | setenvs_from_dict( monkeypatch, { "TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT": "http://opentelemetry-collector", "TRACING_OPENTELEMETRY_COLLECTOR_PORT": "4318", }, ) - return envs def test_middleware_restrictions_opentelemetry_is_second_middleware(