Skip to content

Commit

Permalink
cleanup create omics workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jerowe committed Jan 22, 2024
1 parent 0680303 commit 5a59298
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 37 deletions.
6 changes: 3 additions & 3 deletions bioanalyze_omics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def create_ecr_repos(
help="Output manifest file",
default="container_image_manifest.json",
),
],
] = "container_image_manifest.json",
output_config_file: Annotated[
Optional[str], typer.Option(help="Output config file", default="omics.config")
],
] = "omics.config",
nf_workflow: Annotated[
str,
typer.Option(help="Nextflow workflow", default=os.getcwd()),
],
] = os.getcwd(),
aws_region: Annotated[
Optional[str],
typer.Option(help="AWS Region", default=AWS_REGION),
Expand Down
89 changes: 55 additions & 34 deletions bioanalyze_omics/resources/workflows.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import boto3
import tempfile
import time
import argparse
import json
Expand Down Expand Up @@ -38,80 +39,100 @@ def __init__(self, aws_region="us-east-1", client=None):
else:
self.omics_client = client

def parse_nextflow_schema(self, nextflow_dir: str) -> Dict:
def parse_nextflow_schema(self, nextflow_dir: str = os.getcwd()) -> Dict:
"""
parse the nextflow_schema.json and return parameters in the format expected by omics.create_workflow
:param nextflow_dir:
:return:
"""
omics_parameters_definitions = {}
omics_parameters_defaults = {}
# omics_parameters_defaults = {}
additional_parameters = """"""
additional_parameters = [p.strip() for p in additional_parameters.split(",")]

for param_name in additional_parameters:
if len(param_name):
omics_parameters_definitions[param_name] = {
"optional": True,
"description": param_name,
}
# for param_name in additional_parameters:
# if len(param_name):
# omics_parameters_definitions[param_name] = {
# "optional": True,
# "description": param_name,
# }
omics_parameters_definitions["omics"] = {
"optional": True,
"description": "Include omics config.",
"default": True,
# "default": True,
}

nextflow_schema = json.loads(
open(os.path.join(nextflow_dir, "nextflow_schema.json")).read()
)
ignore_keys = ['institutional_config_options']
for key in nextflow_schema["definitions"].keys():
properties = nextflow_schema["definitions"][key]["properties"]
for param_name in properties.keys():
omics_parameters_definitions[param_name] = {"optional": True}
if "description" in properties[param_name]:
description = properties[param_name]["description"]
description = description.replace("(", " ")
description = description.replace(")", " ")
description = description.replace("\n", " ")
if not len(description):
description = param_name
omics_parameters_definitions[param_name][
"description"
] = description
if "default" in properties[param_name]:
default = properties[param_name]["default"]
omics_parameters_defaults[param_name] = default

return {
"omics_parameters_definition": omics_parameters_definitions,
"omics_parameters_defaults": omics_parameters_defaults,
}
if key not in ignore_keys:
for param_name in properties.keys():
if param_name:
hidden = properties[param_name].get('hidden', False)
omics_parameters_definitions[param_name] = {"optional": True}
if "description" in properties[param_name]:
description = properties[param_name]["description"]
description = description.replace("(", " ")
description = description.replace(")", " ")
description = description.replace("\n", " ")
if not len(description):
description = param_name
omics_parameters_definitions[param_name][
"description"
] = description
else:
description = param_name
omics_parameters_definitions[param_name][
"description"
] = description
# if "default" in properties[param_name]:
# default = properties[param_name]["default"]
# omics_parameters_definitions[param_name] = default

# return {
# "omics_parameters_definition": omics_parameters_definitions,
# "omics_parameters_defaults": omics_parameters_defaults,
# }
# log.info(omics_parameters_definitions)
return omics_parameters_definitions

def create_omics_workflow(
self,
workflow_root_dir,
workflow_root_dir=os.getcwd(),
parameters={"param_name": {"description": "param_desc"}},
name=None,
description=None,
main=None,
main="main.nf",
):
if not description:
description = name
buffer = io.BytesIO()
print("creating zip file:")
ignore_dirs = ['.github', '.devcontainer', 'docs']
with ZipFile(buffer, mode="w", compression=ZIP_DEFLATED) as zf:
for file in glob.iglob(
os.path.join(workflow_root_dir, "**/*"), recursive=True
):
if os.path.isfile(file):
arcname = file.replace(os.path.join(workflow_root_dir, ""), "")
# print(f".. adding: {file} -> {arcname}")
zf.write(file, arcname=arcname)
i = False
for ignore_dir in ignore_dirs:
if ignore_dir in file:
i = True
if not i:
arcname = file.replace(os.path.join(workflow_root_dir, ""), "")
print(f".. adding: {file} -> {arcname}")
zf.write(file, arcname=arcname)

response = self.omics_client.create_workflow(
name=name,
description=description,
definitionZip=buffer.getvalue(), # this argument needs bytes
main=main,
parameterTemplate=parameters,
engine="NEXTFLOW",
)

workflow_id = response["id"]
Expand Down

0 comments on commit 5a59298

Please sign in to comment.