Skip to content

Commit

Permalink
rerouted restart user services
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Dec 11, 2024
1 parent 62f0089 commit 7a74839
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,19 @@ async def stop_dynamic_service(
timeout_s=timeout_s,
)
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def restart_user_services(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
node_id: NodeID,
timeout_s: NonNegativeInt,
) -> None:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
_RPC_METHOD_NAME_ADAPTER.validate_python("restart_user_services"),
node_id=node_id,
timeout_s=timeout_s,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ async def stop_dynamic_service(
return await scheduler_interface.stop_dynamic_service(
app, dynamic_service_stop=dynamic_service_stop
)


@router.expose()
async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
await scheduler_interface.restart_user_services(app, node_id=node_id)
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ async def list_tracked_dynamic_services(
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())

async def restart_user_services(self, *, node_id: NodeID) -> None:
await self.thin_client.post_restart(node_id=node_id)


def setup_director_v2(app: FastAPI) -> None:
public_client = DirectorV2Client(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,8 @@ async def get_dynamic_services(
"/dynamic_services",
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def post_restart(self, *, node_id: NodeID) -> Response:
return await self.client.post(f"/dynamic_services/{node_id}:restart")
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,12 @@ async def stop_dynamic_service(
)

await set_request_as_stopped(app, dynamic_service_stop)


async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
await director_v2_client.restart_user_services(node_id=node_id)
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,25 @@ async def test_stop_dynamic_service_serializes_generic_errors(
),
timeout_s=5,
)


@pytest.fixture
def mock_director_v2_restart_user_services(node_id: NodeID) -> Iterator[None]:
with respx.mock(
base_url="http://director-v2:8000/v2",
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
mock.post(f"/dynamic_services/{node_id}:restart").respond(
status.HTTP_204_NO_CONTENT
)

yield None


async def test_restart_user_services(
mock_director_v2_restart_user_services: None,
rpc_client: RabbitMQRPCClient,
node_id: NodeID,
):
await services.restart_user_services(rpc_client, node_id=node_id, timeout_s=5)
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,6 @@ async def request_retrieve_dyn_service(
)


@log_decorator(logger=_log)
async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None:
"""Restarts the user service(s) started by the the node_uuid's sidecar
NOTE: this operation will NOT restart
sidecar services (``dy-sidecar`` or ``dy-proxy`` services),
but ONLY user services (the ones defined by the compose spec).
"""
settings: DirectorV2Settings = get_plugin_settings(app)
await request_director_v2(
app,
"POST",
url=settings.base_url / f"dynamic_services/{node_uuid}:restart",
expected_status=web.HTTPOk,
timeout=settings.DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT,
)


@log_decorator(logger=_log)
async def update_dynamic_service_networks_in_project(
app: web.Application, project_id: ProjectID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from ._core_dynamic_services import (
get_project_inactivity,
request_retrieve_dyn_service,
restart_dynamic_service,
retrieve,
update_dynamic_service_networks_in_project,
)
Expand All @@ -39,7 +38,6 @@
"is_healthy",
"is_pipeline_running",
"request_retrieve_dyn_service",
"restart_dynamic_service",
"retrieve",
"set_project_run_policy",
"stop_pipeline",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ def base_url(self) -> URL:
# - Mostly in floats (aiohttp.Client/) but sometimes in ints
# - Typically in seconds but occasionally in ms

DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT: PositiveInt = Field(
1 * _MINUTE,
description="timeout of containers restart",
validation_alias=AliasChoices(
"DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT",
),
)

DIRECTOR_V2_STORAGE_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: PositiveInt = Field(
_HOUR,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,20 @@ async def stop_dynamic_services_in_project(
]

await logged_gather(*services_to_stop)


async def restart_user_services(app: web.Application, *, node_id: NodeID) -> None:
"""Restarts the user service(s) started by the the node_uuid's sidecar
NOTE: this operation will NOT restart
sidecar services (``dy-sidecar`` or ``dy-proxy`` services),
but ONLY user services (the ones defined by the compose spec).
"""
settings: DynamicSchedulerSettings = get_plugin_settings(app)
await services.restart_user_services(
get_rabbitmq_rpc_client(app),
node_id=node_id,
timeout_s=int(
settings.DYNAMIC_SCHEDULER_RESTART_DYNAMIC_SERVICE_TIMEOUT.total_seconds()
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings):
),
)

DYNAMIC_SCHEDULER_RESTART_DYNAMIC_SERVICE_TIMEOUT: datetime.timedelta = Field(
datetime.timedelta(minutes=1),
description="timeout of containers restart",
validation_alias=AliasChoices(
"DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT",
),
)


def get_plugin_settings(app: web.Application) -> DynamicSchedulerSettings:
settings = app[APP_SETTINGS_KEY].WEBSERVER_DYNAMIC_SCHEDULER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ async def restart_node(request: web.Request) -> web.Response:

path_params = parse_request_path_parameters_as(NodePathParams, request)

await director_v2_api.restart_dynamic_service(request.app, f"{path_params.node_id}")
await dynamic_scheduler_api.restart_user_services(
request.app, node_id=path_params.node_id
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)

Expand Down

0 comments on commit 7a74839

Please sign in to comment.