From caad88dc7d8b2d9596c663230aea0c2d51af04b8 Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Wed, 14 Dec 2022 16:16:09 +0000 Subject: [PATCH 1/8] switch off zipping to speed test --- em_workflows/utils/neuroglancer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/em_workflows/utils/neuroglancer.py b/em_workflows/utils/neuroglancer.py index e49a6c58..ff3e723a 100644 --- a/em_workflows/utils/neuroglancer.py +++ b/em_workflows/utils/neuroglancer.py @@ -54,6 +54,7 @@ def gen_pyramids(fp_in: FilePath) -> Dict: "volume-to-precomputed-pyramid", "--downscaling-method=average", "--flat", + "--no-gzip", nifti.as_posix(), outdir.as_posix(), ] From 8c6ee0a5c821b65cff2fa2eb2943368c513b7ef2 Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Thu, 15 Dec 2022 17:32:31 +0000 Subject: [PATCH 2/8] Adds no_api mode - can run without API now --- em_workflows/brt/flow.py | 8 ++- em_workflows/dm_conversion/flow.py | 5 +- em_workflows/sem_tomo/flow.py | 8 ++- em_workflows/utils/utils.py | 85 +++++++++++++++++------------- test/test_brt.py | 3 +- test/test_dm.py | 3 +- test/test_sem.py | 3 +- 7 files changed, 63 insertions(+), 52 deletions(-) diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py index 135b9462..c84a9366 100644 --- a/em_workflows/brt/flow.py +++ b/em_workflows/brt/flow.py @@ -364,9 +364,13 @@ def list_paired_files(fnames: List[Path]) -> List[Path]: adoc_template = Parameter("adoc_template", default="plastic_brt") input_dir = Parameter("input_dir") - callback_url = Parameter("callback_url")() - token = Parameter("token")() + callback_url = Parameter("callback_url", default=None)() + token = Parameter("token", default=None)() file_name = Parameter("file_name", default=None) + + # run workflow without an api. + no_api = Parameter("no_api", default=False)() + # a single input_dir will have n tomograms input_dir_fp = utils.get_input_dir(input_dir=input_dir) # input_dir_fp = utils.get_input_dir(input_dir=input_dir) diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py index def3d5ac..4404bd39 100644 --- a/em_workflows/dm_conversion/flow.py +++ b/em_workflows/dm_conversion/flow.py @@ -218,8 +218,9 @@ def get_environment() -> str: """ input_dir = Parameter("input_dir") file_name = Parameter("file_name", default=None) - callback_url = Parameter("callback_url")() - token = Parameter("token")() + callback_url = Parameter("callback_url", default=None)() + token = Parameter("token", default=None)() + no_api = Parameter("no_api", default=None)() input_dir_fp = utils.get_input_dir(input_dir=input_dir) input_fps = utils.list_files( diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index 1b59a939..881a0364 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -1,12 +1,9 @@ from em_workflows.file_path import FilePath -import subprocess import glob import math from typing import List, Dict -from pathlib import Path from prefect import Flow, task, Parameter, unmapped from prefect.run_configs import LocalRun -from prefect.tasks.control_flow import merge from em_workflows.config import Config from em_workflows.shell_task_echo import ShellTaskEcho from em_workflows.utils import utils @@ -250,9 +247,10 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict: ) as flow: input_dir = Parameter("input_dir") file_name = Parameter("file_name", default=None) - callback_url = Parameter("callback_url")() - token = Parameter("token")() + callback_url = Parameter("callback_url", default=None)() + token = Parameter("token", default=None)() tilt_angle = Parameter("tilt_angle", default=None)() + no_api = Parameter("no_api", default=False)() # dir to read from. input_dir_fp = utils.get_input_dir(input_dir=input_dir) diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py index 3902fc7f..2f473431 100644 --- a/em_workflows/utils/utils.py +++ b/em_workflows/utils/utils.py @@ -480,16 +480,19 @@ def notify_api_running(flow: Flow, old_state, new_state) -> State: tells API the workflow has started to run. """ if new_state.is_running(): - callback_url = prefect.context.parameters.get("callback_url") - token = prefect.context.parameters.get("token") - headers = { - "Authorization": "Bearer " + token, - "Content-Type": "application/json", - } - response = requests.post( - callback_url, headers=headers, data=json.dumps({"status": "running"}) - ) - log(response.text) + if prefect.context.parameters.get("no_api"): + log("no_api flag used, not interacting with API") + else: + callback_url = prefect.context.parameters.get("callback_url") + token = prefect.context.parameters.get("token") + headers = { + "Authorization": "Bearer " + token, + "Content-Type": "application/json", + } + response = requests.post( + callback_url, headers=headers, data=json.dumps({"status": "running"}) + ) + log(response.text) return new_state @@ -523,10 +526,13 @@ def custom_terminal_state_handler( else: message = "error" ns = state - response = requests.post( - callback_url, headers=headers, data=json.dumps({"status": message}) - ) - log(f"Pipeline status is:{message}, {response.text}") + if prefect.context.parameters.get("no_api"): + log(f"no_api flag used, terminal: {message}") + else: + response = requests.post( + callback_url, headers=headers, data=json.dumps({"status": message}) + ) + log(f"Pipeline status is:{message}, {response.text}") return ns @@ -541,22 +547,24 @@ def notify_api_completion(flow: Flow, old_state, new_state) -> State: """ if new_state.is_finished(): - status = "" if new_state.is_successful(): status = "success" else: status = "error" - callback_url = prefect.context.parameters.get("callback_url") - token = prefect.context.parameters.get("token") - headers = { - "Authorization": "Bearer " + token, - "Content-Type": "application/json", - } - response = requests.post( - callback_url, headers=headers, data=json.dumps({"status": status}) - ) - log(f"Pipeline status is:{status}") - log(response.text) + if prefect.context.parameters.get("no_api"): + log(f"no_api flag used, completion: {status}") + else: + callback_url = prefect.context.parameters.get("callback_url") + token = prefect.context.parameters.get("token") + headers = { + "Authorization": "Bearer " + token, + "Content-Type": "application/json", + } + response = requests.post( + callback_url, headers=headers, data=json.dumps({"status": status}) + ) + log(f"Pipeline status is:{status}") + log(response.text) return new_state @@ -748,14 +756,17 @@ def send_callback_body( } """ data = {"files": files_elts} - headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"} - response = requests.post(callback_url, headers=headers, data=json.dumps(data)) - log(response.url) - log(response.status_code) - log(json.dumps(data)) - log(response.text) - log(response.headers) - if response.status_code != 204: - msg = f"Bad response code on callback: {response}" - log(msg=msg) - raise ValueError(msg) + if prefect.context.parameters.get("no_api"): + log("no_api flag used, not interacting with API") + else: + headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"} + response = requests.post(callback_url, headers=headers, data=json.dumps(data)) + log(response.url) + log(response.status_code) + log(json.dumps(data)) + log(response.text) + log(response.headers) + if response.status_code != 204: + msg = f"Bad response code on callback: {response}" + log(msg=msg) + raise ValueError(msg) diff --git a/test/test_brt.py b/test/test_brt.py index 0d604c71..43c1bd60 100644 --- a/test/test_brt.py +++ b/test/test_brt.py @@ -78,7 +78,6 @@ def test_brt(hpc_env): LocalAlignments=0, THICKNESS=30, input_dir="test/input_files/brt_inputs/Projects/", - token="the_token", - callback_url="https://ptsv2.com/t/", + no_api=True ) assert result.is_successful() diff --git a/test/test_dm.py b/test/test_dm.py index 77a21786..6c1daa80 100644 --- a/test/test_dm.py +++ b/test/test_dm.py @@ -47,8 +47,7 @@ def test_input_fname(mock_nfs_mount): state = flow.run( input_dir="/test/input_files/dm_inputs/Projects/Lab/PI", file_name="20210525_1416_A000_G000.dm4", - token="the_token", - callback_url="https://ptsv2.com/t/", + no_api=True, ) assert state.is_successful() diff --git a/test/test_sem.py b/test/test_sem.py index 65cf378e..01cb6d4e 100644 --- a/test/test_sem.py +++ b/test/test_sem.py @@ -32,7 +32,6 @@ def test_sem(mock_nfs_mount): result = flow.run( input_dir="/test/input_files/sem_inputs/Projects/", tilt_angle="30.2", - token="the_token", - callback_url="https://ptsv2.com/t/", + no_api=True ) assert result.is_successful() From 1dfaa2dd2bc6b850d98f28a864b37b3839d2ff3d Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Thu, 15 Dec 2022 19:49:36 +0000 Subject: [PATCH 3/8] Appears adoc binning should be set as 4, not 1. Implies need to change binvol to 2. --- em_workflows/brt/flow.py | 2 +- em_workflows/templates/cryo_brt.adoc | 2 +- em_workflows/templates/plastic_brt.adoc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py index c84a9366..18d061a7 100644 --- a/em_workflows/brt/flow.py +++ b/em_workflows/brt/flow.py @@ -273,7 +273,7 @@ def gen_ave_8_vol(file_path: FilePath) -> dict: """ ave_8_mrc = f"{file_path.working_dir}/avebin8_{file_path.base}.mrc" ave_mrc = f"{file_path.working_dir}/ave_{file_path.base}.mrc" - cmd = [Config.binvol, "-binning", "8", ave_mrc, ave_8_mrc] + cmd = [Config.binvol, "-binning", "2", ave_mrc, ave_8_mrc] log_file = f"{file_path.working_dir}/ave_8_mrc.log" FilePath.run(cmd=cmd, log_file=log_file) asset_fp = file_path.copy_to_assets_dir(fp_to_cp=Path(ave_8_mrc)) diff --git a/em_workflows/templates/cryo_brt.adoc b/em_workflows/templates/cryo_brt.adoc index 29badf08..315d587c 100644 --- a/em_workflows/templates/cryo_brt.adoc +++ b/em_workflows/templates/cryo_brt.adoc @@ -94,7 +94,7 @@ runtime.Positioning.any.thickness = {{ rpa_thickness }} #Aligned Stack Choices #Aligned Stack Parameters runtime.AlignedStack.any.linearInterpolation = 0 -runtime.AlignedStack.any.binByFactor = 1 +runtime.AlignedStack.any.binByFactor = 4 comparam.golderaser.ccderaser.ExpandCircleIterations = 3 #CTF Correction Parameters diff --git a/em_workflows/templates/plastic_brt.adoc b/em_workflows/templates/plastic_brt.adoc index 41cfebf1..1c970140 100644 --- a/em_workflows/templates/plastic_brt.adoc +++ b/em_workflows/templates/plastic_brt.adoc @@ -93,7 +93,7 @@ runtime.Positioning.any.thickness = {{ rpa_thickness }} #Aligned Stack Choices #Aligned Stack Parameters runtime.AlignedStack.any.linearInterpolation = 0 -runtime.AlignedStack.any.binByFactor = 1 +runtime.AlignedStack.any.binByFactor = 4 comparam.golderaser.ccderaser.ExpandCircleIterations = 3 #CTF Correction Parameters From cb71ddcc9e3c8f9c75ce434c88090d66af6c058e Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Mon, 19 Dec 2022 16:34:26 +0000 Subject: [PATCH 4/8] Adds filter step to remove fails from callback --- em_workflows/dm_conversion/flow.py | 2 ++ em_workflows/sem_tomo/flow.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py index 4404bd39..41d17afc 100644 --- a/em_workflows/dm_conversion/flow.py +++ b/em_workflows/dm_conversion/flow.py @@ -252,6 +252,8 @@ def get_environment() -> str: callback_with_keyimgs = utils.add_asset.map( prim_fp=callback_with_thumbs, asset=keyimg_assets ) + # finally filter error states, and convert to JSON and send. + filtered_callback = utils.filter_results(callback_with_keyimgs) cp_wd_to_assets = utils.copy_workdirs.map( fps, upstream_tasks=[callback_with_keyimgs] ) diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index 881a0364..18d7db25 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -309,6 +309,8 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict: prim_fp=callback_with_pyramids, asset=corrected_mrc_assets ) + # finally filter error states, and convert to JSON and send. + filtered_callback = utils.filter_results(callback_with_corr_mrcs) cp_wd_to_assets = utils.copy_workdirs.map( fps, upstream_tasks=[callback_with_corr_mrcs] ) From 685831cdff47f562dbcbc439ecc7df9a573bca2a Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Mon, 19 Dec 2022 18:10:48 +0000 Subject: [PATCH 5/8] send filtered callbacks --- em_workflows/dm_conversion/flow.py | 2 +- em_workflows/sem_tomo/flow.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py index 41d17afc..3113ef64 100644 --- a/em_workflows/dm_conversion/flow.py +++ b/em_workflows/dm_conversion/flow.py @@ -261,5 +261,5 @@ def get_environment() -> str: callback_sent = utils.send_callback_body( token=token, callback_url=callback_url, - files_elts=callback_with_keyimgs, + files_elts=filtered_callback, ) diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index 18d7db25..4b07a15f 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -315,5 +315,5 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict: fps, upstream_tasks=[callback_with_corr_mrcs] ) cb = utils.send_callback_body( - token=token, callback_url=callback_url, files_elts=callback_with_corr_mrcs + token=token, callback_url=callback_url, files_elts=filtered_callback ) From 96efbf34a8ed291f6a059bbf98ee0c9ebcd0a729 Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Mon, 19 Dec 2022 18:44:06 +0000 Subject: [PATCH 6/8] close up edge case for no_api --- em_workflows/utils/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py index 2f473431..5713e972 100644 --- a/em_workflows/utils/utils.py +++ b/em_workflows/utils/utils.py @@ -725,9 +725,9 @@ def copy_to_assets_dir(fp: Path, assets_dir: Path, prim_fp: Path = None) -> Path @task(max_retries=3, retry_delay=datetime.timedelta(minutes=1), trigger=any_successful) def send_callback_body( - token: str, - callback_url: str, files_elts: List[Dict], + token: str = None, + callback_url: str = None, ) -> None: """ Upon completion of file conversion a callback is made to the calling @@ -758,7 +758,7 @@ def send_callback_body( data = {"files": files_elts} if prefect.context.parameters.get("no_api"): log("no_api flag used, not interacting with API") - else: + elif callback_url and token: headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"} response = requests.post(callback_url, headers=headers, data=json.dumps(data)) log(response.url) @@ -770,3 +770,5 @@ def send_callback_body( msg = f"Bad response code on callback: {response}" log(msg=msg) raise ValueError(msg) + else: + raise signals.FAIL(f"Invalid state - need callback_url and token, OR set no_api to True.") From c39d71a5436f52e0ff61c24d8fb6c8f8030ed153 Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Mon, 19 Dec 2022 18:59:12 +0000 Subject: [PATCH 7/8] close up edge case for no_api - log for now --- em_workflows/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py index 5713e972..7ce40344 100644 --- a/em_workflows/utils/utils.py +++ b/em_workflows/utils/utils.py @@ -771,4 +771,4 @@ def send_callback_body( log(msg=msg) raise ValueError(msg) else: - raise signals.FAIL(f"Invalid state - need callback_url and token, OR set no_api to True.") + log(f"Invalid state - need callback_url and token, OR set no_api to True.") From 8f40be306611eb58e803a9f1df79135906aa76b5 Mon Sep 17 00:00:00 2001 From: Philip MacMenamin Date: Mon, 19 Dec 2022 19:28:21 +0000 Subject: [PATCH 8/8] close up edge case for no_api in terminal state handler --- em_workflows/utils/utils.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py index 7ce40344..e3ded4fb 100644 --- a/em_workflows/utils/utils.py +++ b/em_workflows/utils/utils.py @@ -509,12 +509,6 @@ def custom_terminal_state_handler( for task_state in reference_task_states: if task_state.is_successful(): success = True - callback_url = prefect.context.parameters.get("callback_url") - token = prefect.context.parameters.get("token") - headers = { - "Authorization": "Bearer " + token, - "Content-Type": "application/json", - } if success: message = "success" ns = Success( @@ -527,8 +521,14 @@ def custom_terminal_state_handler( message = "error" ns = state if prefect.context.parameters.get("no_api"): - log(f"no_api flag used, terminal: {message}") + log(f"no_api flag used, terminal: success is {message}") else: + callback_url = prefect.context.parameters.get("callback_url") + token = prefect.context.parameters.get("token") + headers = { + "Authorization": "Bearer " + token, + "Content-Type": "application/json", + } response = requests.post( callback_url, headers=headers, data=json.dumps({"status": message}) ) @@ -771,4 +771,4 @@ def send_callback_body( log(msg=msg) raise ValueError(msg) else: - log(f"Invalid state - need callback_url and token, OR set no_api to True.") + raise signals.FAIL(f"Invalid state - need callback_url and token, OR set no_api to True.")