Skip to content

Commit

Permalink
Revert "Revert "Attempt removing `test_failing_workflow_JobExecutionError`""

Browse files Browse the repository at this point in the history

This reverts commit fc14e24.
  • Loading branch information
tcompa committed Feb 4, 2025
1 parent fc14e24 commit a16f312
Showing 1 changed file with 0 additions and 156 deletions.
156 changes: 0 additions & 156 deletions tests/v2/08_full_workflow/test_full_workflow_slurm_v2.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import logging
import shlex
import subprocess

from common_functions import failing_workflow_UnknownError
from common_functions import full_workflow
from common_functions import full_workflow_TaskExecutionError
from common_functions import non_executable_task_command
from common_functions import PREFIX
from devtools import debug

from fractal_server.app.runner.executors.slurm.sudo._subprocess_run_as_user import ( # noqa
_run_command_as_user,
Expand Down Expand Up @@ -116,158 +112,6 @@ async def test_full_workflow_TaskExecutionError_slurm(
_reset_permissions_for_user_folder(project_dir)


# Tested with 'slurm' backend only.
async def test_failing_workflow_JobExecutionError(
    client,
    MockCurrentUser,
    testdata_path,
    tmp777_path,
    project_factory_v2,
    dataset_factory_v2,
    workflow_factory_v2,
    override_settings_factory,
    tmp_path_factory,
    fractal_tasks_mock_db,
    relink_python_interpreter_v2,  # before 'monkey_slurm' (#1462)
    monkey_slurm,
    tmp_path,
):
    """
    Verify that cancelling SLURM jobs mid-run marks the job as failed.

    The workflow has two tasks: a short non-parallel one expected to
    succeed, and a long parallel one (``sleep_time=200``) that is cancelled
    via ``scancel`` while it is running. The test then asserts on the job
    status, the contents of its log, and the per-workflowtask statuses
    (first task ``done``, second task ``failed``).
    """
    # Use a session-scoped FRACTAL_TASKS_DIR folder
    override_settings_factory(
        FRACTAL_RUNNER_BACKEND=FRACTAL_RUNNER_BACKEND,
        FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / "artifacts",
        FRACTAL_TASKS_DIR=tmp_path_factory.getbasetemp() / "FRACTAL_TASKS_DIR",
        FRACTAL_SLURM_CONFIG_FILE=testdata_path / "slurm_config.json",
    )

    project_dir = str(tmp777_path / "user_project_dir-slurm")
    user_kwargs = dict(is_verified=True)
    async with MockCurrentUser(
        user_kwargs=user_kwargs,
        user_settings_dict=dict(
            slurm_user=SLURM_USER,
            slurm_accounts=[],
            project_dir=project_dir,
        ),
    ) as user:
        project = await project_factory_v2(user)
        project_id = project.id
        dataset = await dataset_factory_v2(
            project_id=project_id,
            name="dataset",
        )
        dataset_id = dataset.id

        # Create workflow
        workflow = await workflow_factory_v2(
            name="test_wf", project_id=project_id
        )
        workflow_id = workflow.id

        # Retrieve relevant task IDs
        task_id_0 = fractal_tasks_mock_db["create_ome_zarr_compound"].id
        task_id_1 = fractal_tasks_mock_db["generic_task_parallel"].id

        # Add a short task, which will be run successfully
        res = await client.post(
            f"{PREFIX}/project/{project_id}/workflow/{workflow_id}/wftask/"
            f"?task_id={task_id_0}",
            json=dict(
                args_non_parallel=dict(
                    image_dir="/fake-path",
                    num_images=3,
                )
            ),
        )
        assert res.status_code == 201
        wftask0_id = res.json()["id"]

        # Add a long *parallel* task, which will be stopped while running
        res = await client.post(
            f"{PREFIX}/project/{project_id}/workflow/{workflow_id}/wftask/"
            f"?task_id={task_id_1}",
            json=dict(
                args_parallel=dict(
                    sleep_time=200,
                )
            ),
        )
        assert res.status_code == 201
        wftask1_id = res.json()["id"]

        # NOTE: the client.post call below is blocking, due to the way we are
        # running tests. For this reason, we call the scancel function from a
        # `subprocess.Popen`, so that we can make it happen during execution.
        scancel_sleep_time = 12
        slurm_user = SLURM_USER

        # Write a helper script that waits, then cancels all of the SLURM
        # user's jobs while the (blocking) job submission below is running.
        tmp_script = (tmp_path / "script.sh").as_posix()
        debug(tmp_script)
        with open(tmp_script, "w") as f:
            f.write(f"sleep {scancel_sleep_time}\n")
            f.write(
                (
                    f"sudo --non-interactive -u {slurm_user} "
                    f"scancel -u {slurm_user} -v"
                    "\n"
                )
            )

        # Open the Popen output files in a context manager, so that they are
        # closed even if one of the assertions below fails (previously they
        # were closed manually at the very end and leaked on failure). Popen
        # duplicates the descriptors into the child process, so closing the
        # parent's copies right after spawning is safe.
        with open((tmp_path / "stdout").as_posix(), "w") as tmp_stdout, open(
            (tmp_path / "stderr").as_posix(), "w"
        ) as tmp_stderr:
            subprocess.Popen(
                shlex.split(f"bash {tmp_script}"),
                stdout=tmp_stdout,
                stderr=tmp_stderr,
            )

        # Submit the workflow
        res = await client.post(
            f"{PREFIX}/project/{project_id}/job/submit/"
            f"?{workflow_id=}&{dataset_id=}",
            json={},
        )
        job_data = res.json()
        debug(job_data)
        assert res.status_code == 202
        job_id = job_data["id"]
        debug(job_id)

        # Query status of the job
        rs = await client.get(f"{PREFIX}/project/{project_id}/job/{job_id}/")
        assert rs.status_code == 200
        job_status_data = rs.json()
        debug(job_status_data)
        print(job_status_data["log"])
        assert job_status_data["status"] == "failed"
        assert job_status_data["end_timestamp"]
        # "id: None" would indicate a SLURM job that never got an ID
        assert "id: None" not in job_status_data["log"]
        assert "JOB ERROR" in job_status_data["log"]
        assert "CANCELLED" in job_status_data["log"]
        # Log must contain real newlines, not escaped "\n" sequences
        assert "\\n" not in job_status_data["log"]

        # Test get_workflowtask_status endpoint
        res = await client.get(
            (
                f"{PREFIX}/project/{project_id}/status/?"
                f"dataset_id={dataset_id}&workflow_id={workflow_id}"
            )
        )
        debug(res.status_code)
        assert res.status_code == 200
        statuses = res.json()["status"]
        debug(statuses)
        assert statuses == {
            str(wftask0_id): "done",
            str(wftask1_id): "failed",
        }

    _reset_permissions_for_user_folder(project_dir)


async def test_non_executable_task_command_slurm(
client,
MockCurrentUser,
Expand Down

0 comments on commit a16f312

Please sign in to comment.