Skip to content

Commit

Permalink
Merge pull request #142 from niaid/scale_clust
Browse files Browse the repository at this point in the history
Creates Dask cluster that scales upon demand.
  • Loading branch information
philipmac authored Jan 12, 2023
2 parents 2c71afb + 28bcbc0 commit 4e27bfe
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 8 deletions.
19 changes: 18 additions & 1 deletion docs/development.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,25 @@ Insure `$HOME/code/hedwig` exists. Runs on Linux.
# generates venv qa.
./image_portal_workflows/helper_scripts/generate_venv.sh qa
Updating venvs:
---------------
To update your python virtual environment:
Change into the correct venv directory, e.g. `~/code/hedwig/dev/image_portal_workflows`.
Ensure environment is active and run something like:

`pip install -e . -r requirements.txt --upgrade --find-links https://github.com/niaid/tomojs-pytools/releases/tag/v1.3`


Prefect Agent:
--------------
The prefect agent, the thing that reaches out to the prefect API machine, is daemonized on HPC.
See `image_portal_workflows/helper_scripts/hedwig_reg_listen.sh` and
`image_portal_workflows/helper_scripts/hedwig_listener_prod.service` etc.


Register Workflows:
-------------------
To register a new workflow, or update an existing one (in `qa` environment):
(The workflow needs to be registered every time the source is updated.)
`image_portal_workflows/helper_scripts/hedwig_reg_listen.sh qa register`
Expand All @@ -79,7 +94,9 @@ Currently dask jobqueue is configured with a yaml file.
project: null
walltime: '10:00:00'
- Note, although unused above, BigSky also has Spack available, e.g.::
- Note, although unused above, BigSky also has Spack available.

.. code-block:: sh
$ source /gs1/apps/user/rmlspack/share/spack/setup-env.sh
$ spack load -r python@3.8.6/eg2vaag
Expand Down
2 changes: 1 addition & 1 deletion em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def gen_recon_movie(file_path: FilePath) -> dict:
"""
ffmpeg -f image2 -framerate 8 -i WORKDIR/hedwig/BASENAME_mp4.%04d.jpg -vcodec libx264 -pix_fmt yuv420p -s 1024,1024 WORKDIR/hedwig/keyMov_BASENAME.mp4
"""
# bit of a hack - want to find out if
# bit of a hack - want to find out if
test_p = Path(f"{file_path.working_dir}/{file_path.base}_mp4.1000.jpg")
mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%03d.jpg"
if test_p.exists():
Expand Down
17 changes: 16 additions & 1 deletion em_workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,22 @@


def SLURM_exec():
cluster = SLURMCluster(n_workers=4)
"""
brings up a dynamically sized cluster.
For some reason processes > 1 crash BRT. Be careful optimizing this.
"""
cluster = SLURMCluster(
name="dask-worker",
cores=16,
memory="32G",
processes=1,
death_timeout=121,
local_directory="/gs1/home/macmenaminpe/tmp/",
queue="gpu",
walltime="4:00:00",
job_extra_directives=["--gres=gpu:1"],
)
cluster.adapt(minimum=1, maximum=6)
logging = prefect.context.get("logger")
logging.debug(f"Dask cluster started")
logging.debug(f"see dashboard {cluster.dashboard_link}")
Expand Down
17 changes: 13 additions & 4 deletions em_workflows/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from prefect.tasks.control_flow.filter import FilterTask

filter_results = FilterTask(
filter_func=lambda x: not isinstance(x, (BaseException, TRIGGERFAIL, SKIP, type(None)))
filter_func=lambda x: not isinstance(
x, (BaseException, TRIGGERFAIL, SKIP, type(None))
)
)


Expand Down Expand Up @@ -672,7 +674,9 @@ def add_assets_entry(
"neuroglancerPrecomputed",
]
if asset_type not in valid_typs:
raise signals.FAIL(f"Asset type: {asset_type} is not a valid type. {valid_typs}")
raise signals.FAIL(
f"Asset type: {asset_type} is not a valid type. {valid_typs}"
)
fp_no_mount_point = path.relative_to(Config.assets_dir(env=get_environment()))
if metadata:
asset = {
Expand Down Expand Up @@ -774,7 +778,10 @@ def send_callback_body(
if prefect.context.parameters.get("no_api"):
log("no_api flag used, not interacting with API")
elif callback_url and token:
headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(callback_url, headers=headers, data=json.dumps(data))
log(response.url)
log(response.status_code)
Expand All @@ -786,4 +793,6 @@ def send_callback_body(
log(msg=msg)
raise signals.FAIL(msg)
else:
raise signals.FAIL(f"Invalid state - need callback_url and token, OR set no_api to True.")
raise signals.FAIL(
f"Invalid state - need callback_url and token, OR set no_api to True."
)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="em_workflows",
version='1.0.1',
version='1.0.2',
author="Philip MacMenamin",
author_email="bioinformatics@niaid.nih.gov",
description="Workflows for Microscopy related file processing.",
Expand Down

0 comments on commit 4e27bfe

Please sign in to comment.