From a16f3128f7da439c5133601310c77ecf8a831602 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 4 Feb 2025 12:12:15 +0100 Subject: [PATCH] Revert "Revert "Attempt removing `test_failing_workflow_JobExecutionError`"" This reverts commit fc14e240b8356567dfeea86228a7094c1bc4f697. --- .../test_full_workflow_slurm_v2.py | 156 ------------------ 1 file changed, 156 deletions(-) diff --git a/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py b/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py index fddfc3e115..ff6c868b80 100644 --- a/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py +++ b/tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py @@ -1,13 +1,9 @@ import logging -import shlex -import subprocess from common_functions import failing_workflow_UnknownError from common_functions import full_workflow from common_functions import full_workflow_TaskExecutionError from common_functions import non_executable_task_command -from common_functions import PREFIX -from devtools import debug from fractal_server.app.runner.executors.slurm.sudo._subprocess_run_as_user import ( # noqa _run_command_as_user, @@ -116,158 +112,6 @@ async def test_full_workflow_TaskExecutionError_slurm( _reset_permissions_for_user_folder(project_dir) -# Tested with 'slurm' backend only. -async def test_failing_workflow_JobExecutionError( - client, - MockCurrentUser, - testdata_path, - tmp777_path, - project_factory_v2, - dataset_factory_v2, - workflow_factory_v2, - override_settings_factory, - tmp_path_factory, - fractal_tasks_mock_db, - relink_python_interpreter_v2, # before 'monkey_slurm' (#1462) - monkey_slurm, - tmp_path, -): - # Use a session-scoped FRACTAL_TASKS_DIR folder - override_settings_factory( - FRACTAL_RUNNER_BACKEND=FRACTAL_RUNNER_BACKEND, - FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / "artifacts", - FRACTAL_TASKS_DIR=tmp_path_factory.getbasetemp() / "FRACTAL_TASKS_DIR", - FRACTAL_SLURM_CONFIG_FILE=testdata_path / "slurm_config.json", - ) - - project_dir = str(tmp777_path / "user_project_dir-slurm") - user_kwargs = dict(is_verified=True) - async with MockCurrentUser( - user_kwargs=user_kwargs, - user_settings_dict=dict( - slurm_user=SLURM_USER, - slurm_accounts=[], - project_dir=project_dir, - ), - ) as user: - project = await project_factory_v2(user) - project_id = project.id - dataset = await dataset_factory_v2( - project_id=project_id, - name="dataset", - ) - dataset_id = dataset.id - - # Create workflow - workflow = await workflow_factory_v2( - name="test_wf", project_id=project_id - ) - workflow_id = workflow.id - - # Retrieve relevant task ID - task_id_0 = fractal_tasks_mock_db["create_ome_zarr_compound"].id - task_id_1 = fractal_tasks_mock_db["generic_task_parallel"].id - - # Add a short task, which will be run successfully - res = await client.post( - f"{PREFIX}/project/{project_id}/workflow/{workflow_id}/wftask/" - f"?task_id={task_id_0}", - json=dict( - args_non_parallel=dict( - image_dir="/fake-path", - num_images=3, - ) - ), - ) - assert res.status_code == 201 - wftask0_id = res.json()["id"] - - # Add a long *parallel* task, which will be stopped while running - res = await client.post( - f"{PREFIX}/project/{project_id}/workflow/{workflow_id}/wftask/" - f"?task_id={task_id_1}", - json=dict( - args_parallel=dict( - sleep_time=200, - ) - ), - ) - assert res.status_code == 201 - wftask1_id = res.json()["id"] - - # NOTE: the client.post call below is blocking, due to the way we are - # running tests. For this reason, we call the scancel function from a - # `subprocess.Popen`, so that we can make it happen during execution. - scancel_sleep_time = 12 - slurm_user = SLURM_USER - - tmp_script = (tmp_path / "script.sh").as_posix() - debug(tmp_script) - with open(tmp_script, "w") as f: - f.write(f"sleep {scancel_sleep_time}\n") - f.write( - ( - f"sudo --non-interactive -u {slurm_user} " - f"scancel -u {slurm_user} -v" - "\n" - ) - ) - - tmp_stdout = open((tmp_path / "stdout").as_posix(), "w") - tmp_stderr = open((tmp_path / "stderr").as_posix(), "w") - subprocess.Popen( - shlex.split(f"bash {tmp_script}"), - stdout=tmp_stdout, - stderr=tmp_stderr, - ) - - # Submit the workflow - res = await client.post( - f"{PREFIX}/project/{project_id}/job/submit/" - f"?{workflow_id=}&{dataset_id=}", - json={}, - ) - job_data = res.json() - debug(job_data) - assert res.status_code == 202 - job_id = job_data["id"] - debug(job_id) - - # Query status of the job - rs = await client.get(f"{PREFIX}/project/{project_id}/job/{job_id}/") - assert rs.status_code == 200 - job_status_data = rs.json() - debug(job_status_data) - print(job_status_data["log"]) - assert job_status_data["status"] == "failed" - assert job_status_data["end_timestamp"] - assert "id: None" not in job_status_data["log"] - assert "JOB ERROR" in job_status_data["log"] - assert "CANCELLED" in job_status_data["log"] - assert "\\n" not in job_status_data["log"] - - # Test get_workflowtask_status endpoint - res = await client.get( - ( - f"{PREFIX}/project/{project_id}/status/?" - f"dataset_id={dataset_id}&workflow_id={workflow_id}" - ) - ) - debug(res.status_code) - assert res.status_code == 200 - statuses = res.json()["status"] - debug(statuses) - assert statuses == { - str(wftask0_id): "done", - str(wftask1_id): "failed", - } - - tmp_stdout.close() - tmp_stderr.close() - - _reset_permissions_for_user_folder(project_dir) - - async def test_non_executable_task_command_slurm( client, MockCurrentUser,