From 9e9ca757b34b45f3a607ac3cb6a2b5fb00838639 Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Tue, 9 Jul 2024 16:15:56 +0100 Subject: [PATCH 01/10] feat: template for creating finemapping jobs --- .../dags/ukb_ppp_finemapping.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/ot_orchestration/dags/ukb_ppp_finemapping.py diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py new file mode 100644 index 0000000..3bf774b --- /dev/null +++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py @@ -0,0 +1,63 @@ +"""Airflow DAG that uses Google Cloud Batch to run the SuSie Finemapper step for UKB PPP.""" + +from __future__ import annotations + +import time +from pathlib import Path + +import common_airflow as common +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.cloud_batch import ( + CloudBatchSubmitJobOperator, +) +from templates.finemapping import finemapping_batch_job + +STUDY_LOCUS_BASE_PATH = ( + "gs://gentropy-tmp/tskir/ukb_ppp_eur_data_collected_patched_2024_07_09" +) +OUTPUT_BASE_PATH = "gs://gentropy-tmp/tskir/finemapping_2024_07_29" +STUDY_INDEX_PATH = "gs://ukb_ppp_eur_data/study_index" +OUTPUT_PATH = "gs://gentropy-tmp/test_finemapped_out" + +# Temporary: defining 10 loci in order to test the DAG. +LOCI = [ + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=-4875420583494530062/part-00132-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=2449331910204577420/part-00058-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=2882125006476788651/part-00077-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=5149163189737967785/part-00069-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=7530607523033270690/part-00045-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=7817416827048695229/part-00095-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-3603332164695210634/part-00061-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-3727530566487910400/part-00029-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-771229199423266821/part-00067-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", + "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=7906770497215611142/part-00074-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", +] +INPUT_PATHS = [f"{STUDY_LOCUS_BASE_PATH}/{L}" for L in LOCI] +OUTPUT_PATHS = [f"{OUTPUT_BASE_PATH}/{L}" for L in LOCI] + +@task(task_id="finemapping_task") +def finemapping_task() -> CloudBatchSubmitJobOperator: + """Submit a Batch job to run fine-mapping on a list of study loci.""" + return CloudBatchSubmitJobOperator( + task_id="finemapping_batch_job", + project_id=common.GCP_PROJECT, + region=common.GCP_REGION, + job_name=f"finemapping-job-{time.strftime('%Y%m%d-%H%M%S')}", + job=finemapping_batch_job( + study_locus_paths = [], + output_paths = [], + study_index_path = STUDY_INDEX_PATH + ), + deferrable=False + ) + +with DAG( + 
dag_id=Path(__file__).stem, + description="Open Targets Genetics — finemap study loci with SuSie", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, +) as dag: + ( + finemapping_task() + ) From 0a24cfc6be009d0cedf7bfd5f2fc81408efc2b6b Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Tue, 9 Jul 2024 16:17:08 +0100 Subject: [PATCH 02/10] feat: example DAG for creating finemapping jobs --- src/ot_orchestration/templates/finemapping.py | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/ot_orchestration/templates/finemapping.py diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py new file mode 100644 index 0000000..8c5be51 --- /dev/null +++ b/src/ot_orchestration/templates/finemapping.py @@ -0,0 +1,124 @@ +from google.cloud.batch_v1 import ( + AllocationPolicy, + ComputeResource, + Environment, + LifecyclePolicy, + LogsPolicy, + TaskSpec, + TaskGroup, + Runnable, + Job, +) + +import common_airflow as common + + +def finemapping_batch_job( + study_locus_paths: list[str], + output_paths: list[str], + study_index_path: str, + docker_image_url: str = common.GENTROPY_DOCKER_IMAGE, +) -> Job: + """Create a Batch job to run fine-mapping on a list of study loci. + + Args: + study_locus_paths (list[str]): The list of study loci (full gs:// paths) to fine-map. + study_index_path (str): The path to the study index. + output_path (str): The path to store the output. + output_path_log (str): The path to store the finemapping logs. + docker_image_url (str): The URL of the Docker image to use for the job. By default, use a project wide image. + + Returns: + Job: A Batch job to run fine-mapping on the given study loci. + """ + # Check that the input parameters make sense. + assert len(study_locus_paths) == len( + output_paths + ), "The length of study_locus_paths and output_paths must be the same." + + # Define runnable: container and parameters to use. + runnable = Runnable( + container=Runnable.Container( + image_url=docker_image_url, + entrypoint="/bin/sh", + commands=[ + "-c", + ( + "poetry run gentropy " + "step=susie_finemapping " + 'step.study_locus_to_finemap="$INPUTPATH" ' + 'step.output_path="$OUTPUTPATH" ' + f"step.study_index_path={study_index_path} " + "step.max_causal_snps=10 " + "step.primary_signal_pval_threshold=1 " + "step.secondary_signal_pval_threshold=1 " + "step.purity_mean_r2_threshold=0 " + "step.purity_min_r2_threshold=0 " + "step.cs_lbf_thr=2 step.sum_pips=0.99 " + "step.susie_est_tausq=False " + "step.run_carma=False " + "step.run_sumstat_imputation=False " + "step.carma_time_limit=600 " + "step.imputed_r2_threshold=0.9 " + "step.ld_score_threshold=5 " + "+step.session.extended_spark_conf={spark.jars:https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar} " + "+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:false} " + "+step.session.extended_spark_conf={spark.driver.memory:30g} " + "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:500m} " + "+step.session.extended_spark_conf={spark.driver.maxResultSize:5g} " + "step.session.write_mode=overwrite" + ), + ], + options="-e HYDRA_FULL_ERROR=1", + ) + ) + + # Define task spec: runnable, compute resources, retry and lifecycle policies; shared between all tasks. 
+ task_spec = TaskSpec( + runnables=[runnable], + resources=ComputeResource(cpu_milli=4000, memory_mib=25000), + max_run_duration="7200s", + max_retry_count=5, + lifecycle_policies=[ + LifecyclePolicy( + action=LifecyclePolicy.Action.FAIL_TASK, + action_condition=LifecyclePolicy.ActionCondition( + exit_codes=[50005] # Execution time exceeded. + ), + ) + ], + ) + + # Define task environments: individual configuration for each of the tasks. + task_environments = [ + Environment(variables={"INPUTPATH": input_path, "OUTPUTPATH": output_path}) + for input_path, output_path in zip(study_locus_paths, output_paths) + ] + + # Define task group: collection of parameterised tasks. + task_group = TaskGroup( + task_spec=task_spec, + task_environments=task_environments, + parallelism=2000, + task_count=len(study_locus_paths), + ) + + # Define allocation policy: method of mapping a task group to compute resources. + allocation_policy = AllocationPolicy( + instances=[ + AllocationPolicy.InstancePolicyOrTemplate( + policy=AllocationPolicy.InstancePolicy( + machine_type="n2-highmem-4", + provisioning_model=AllocationPolicy.ProvisioningModel.SPOT, + boot_disk=AllocationPolicy.Disk(size_gb=60), + ) + ) + ] + ) + + # Define and return job: a complete description of the workload, ready to be submitted to Google Batch. + return Job( + task_groups=[task_group], + allocation_policy=allocation_policy, + logs_policy=LogsPolicy(destination=LogsPolicy.Destination.CLOUD_LOGGING), + ) From c64baa2fa4508bacbfeba4a40534b12181b1fe9a Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Tue, 9 Jul 2024 17:03:49 +0100 Subject: [PATCH 03/10] fix: quote parameters containing = for Hydra --- src/ot_orchestration/templates/finemapping.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py index 8c5be51..44dc0b9 100644 --- a/src/ot_orchestration/templates/finemapping.py +++ b/src/ot_orchestration/templates/finemapping.py @@ -91,7 +91,12 @@ def finemapping_batch_job( # Define task environments: individual configuration for each of the tasks. 
task_environments = [ - Environment(variables={"INPUTPATH": input_path, "OUTPUTPATH": output_path}) + Environment( + variables={ + "INPUTPATH": '"' + input_path + '"', + "OUTPUTPATH": '"' + output_path + '"', + } + ) for input_path, output_path in zip(study_locus_paths, output_paths) ] From 6662014360450c441c6e43b042b2e93a85f08cee Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Wed, 18 Sep 2024 10:17:26 +0100 Subject: [PATCH 04/10] chore: add GENTROPY_DOCKER_IMAGE to common layer --- src/ot_orchestration/dags/ukb_ppp_finemapping.py | 5 +++-- src/ot_orchestration/templates/finemapping.py | 14 +++++++------- src/ot_orchestration/utils/common.py | 5 +++++ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py index 3bf774b..23eaf5a 100644 --- a/src/ot_orchestration/dags/ukb_ppp_finemapping.py +++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py @@ -5,13 +5,14 @@ import time from pathlib import Path -import common_airflow as common from airflow.decorators import task from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.cloud_batch import ( CloudBatchSubmitJobOperator, ) -from templates.finemapping import finemapping_batch_job + +from ot_orchestration.templates.finemapping import finemapping_batch_job +from ot_orchestration.utils import common STUDY_LOCUS_BASE_PATH = ( "gs://gentropy-tmp/tskir/ukb_ppp_eur_data_collected_patched_2024_07_09" diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py index 44dc0b9..2ab4bc3 100644 --- a/src/ot_orchestration/templates/finemapping.py +++ b/src/ot_orchestration/templates/finemapping.py @@ -1,16 +1,17 @@ +"""A reusable template for finemapping jobs.""" + from google.cloud.batch_v1 import ( AllocationPolicy, ComputeResource, Environment, + Job, LifecyclePolicy, LogsPolicy, - TaskSpec, - TaskGroup, Runnable, - Job, + TaskGroup, + TaskSpec, ) - -import common_airflow as common +from ot_orchestration.utils import common def finemapping_batch_job( @@ -23,9 +24,8 @@ def finemapping_batch_job( Args: study_locus_paths (list[str]): The list of study loci (full gs:// paths) to fine-map. + output_paths (list[str]): The list of output locations, corresponding to study locus paths. study_index_path (str): The path to the study index. - output_path (str): The path to store the output. - output_path_log (str): The path to store the finemapping logs. docker_image_url (str): The URL of the Docker image to use for the job. By default, use a project wide image. Returns: diff --git a/src/ot_orchestration/utils/common.py b/src/ot_orchestration/utils/common.py index 76f66a6..421b259 100644 --- a/src/ot_orchestration/utils/common.py +++ b/src/ot_orchestration/utils/common.py @@ -16,6 +16,11 @@ GCP_DATAPROC_IMAGE = "2.1" GCP_AUTOSCALING_POLICY = "otg-etl" +# Image configuration. +GENTROPY_DOCKER_IMAGE = ( + "europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/gentropy:dev" +) + # Cluster init configuration. 
INITIALISATION_BASE_PATH = ( f"gs://genetics_etl_python_playground/initialisation/{GENTROPY_VERSION}" From 260ab6a60896a988561ef5961ad031d420b206ea Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Wed, 18 Sep 2024 10:28:23 +0100 Subject: [PATCH 05/10] feat: always use a list of jobs in the DAG --- .../dags/ukb_ppp_finemapping.py | 33 ++++++++++--------- src/ot_orchestration/templates/finemapping.py | 12 +++---- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py index 23eaf5a..1df160f 100644 --- a/src/ot_orchestration/dags/ukb_ppp_finemapping.py +++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py @@ -11,7 +11,7 @@ CloudBatchSubmitJobOperator, ) -from ot_orchestration.templates.finemapping import finemapping_batch_job +from ot_orchestration.templates.finemapping import finemapping_batch_jobs from ot_orchestration.utils import common STUDY_LOCUS_BASE_PATH = ( @@ -38,20 +38,23 @@ OUTPUT_PATHS = [f"{OUTPUT_BASE_PATH}/{L}" for L in LOCI] @task(task_id="finemapping_task") -def finemapping_task() -> CloudBatchSubmitJobOperator: - """Submit a Batch job to run fine-mapping on a list of study loci.""" - return CloudBatchSubmitJobOperator( - task_id="finemapping_batch_job", - project_id=common.GCP_PROJECT, - region=common.GCP_REGION, - job_name=f"finemapping-job-{time.strftime('%Y%m%d-%H%M%S')}", - job=finemapping_batch_job( - study_locus_paths = [], - output_paths = [], +def finemapping_tasks() -> list[CloudBatchSubmitJobOperator]: + """Generate a list of Batch job operators to submit finemapping processing.""" + return [ + CloudBatchSubmitJobOperator( + task_id="finemapping_batch_job_{i}", + project_id=common.GCP_PROJECT, + region=common.GCP_REGION, + job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}", + job=batch_job, + deferrable=False + ) + for i, batch_job in enumerate(finemapping_batch_jobs( + study_locus_paths = INPUT_PATHS, + output_paths = OUTPUT_PATHS, study_index_path = STUDY_INDEX_PATH - ), - deferrable=False - ) + )) + ] with DAG( dag_id=Path(__file__).stem, @@ -60,5 +63,5 @@ def finemapping_task() -> CloudBatchSubmitJobOperator: **common.shared_dag_kwargs, ) as dag: ( - finemapping_task() + finemapping_tasks() ) diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py index 2ab4bc3..66ff731 100644 --- a/src/ot_orchestration/templates/finemapping.py +++ b/src/ot_orchestration/templates/finemapping.py @@ -14,13 +14,13 @@ from ot_orchestration.utils import common -def finemapping_batch_job( +def finemapping_batch_jobs( study_locus_paths: list[str], output_paths: list[str], study_index_path: str, docker_image_url: str = common.GENTROPY_DOCKER_IMAGE, -) -> Job: - """Create a Batch job to run fine-mapping on a list of study loci. +) -> list[Job]: + """Create a list of Batch jobs to run fine-mapping based on a list of study loci. Args: study_locus_paths (list[str]): The list of study loci (full gs:// paths) to fine-map. @@ -29,7 +29,7 @@ def finemapping_batch_job( docker_image_url (str): The URL of the Docker image to use for the job. By default, use a project wide image. Returns: - Job: A Batch job to run fine-mapping on the given study loci. + list[Job]: A Batch job to run fine-mapping on the given study loci. """ # Check that the input parameters make sense. 
assert len(study_locus_paths) == len( @@ -122,8 +122,8 @@ def finemapping_batch_job( ) # Define and return job: a complete description of the workload, ready to be submitted to Google Batch. - return Job( + return [Job( task_groups=[task_group], allocation_policy=allocation_policy, logs_policy=LogsPolicy(destination=LogsPolicy.Destination.CLOUD_LOGGING), - ) + )] From 4943cc9ebd95b79538760db7b38d7762a7c9b068 Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Wed, 18 Sep 2024 10:38:54 +0100 Subject: [PATCH 06/10] refactor: use manifest as input --- src/ot_orchestration/templates/finemapping.py | 44 ++++++------------- 1 file changed, 13 insertions(+), 31 deletions(-) diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py index 66ff731..0e8a013 100644 --- a/src/ot_orchestration/templates/finemapping.py +++ b/src/ot_orchestration/templates/finemapping.py @@ -3,7 +3,6 @@ from google.cloud.batch_v1 import ( AllocationPolicy, ComputeResource, - Environment, Job, LifecyclePolicy, LogsPolicy, @@ -14,28 +13,23 @@ from ot_orchestration.utils import common -def finemapping_batch_jobs( - study_locus_paths: list[str], - output_paths: list[str], +def finemapping_batch_job( study_index_path: str, + study_locus_manifest_path: str, + task_count: int, docker_image_url: str = common.GENTROPY_DOCKER_IMAGE, -) -> list[Job]: - """Create a list of Batch jobs to run fine-mapping based on a list of study loci. +) -> Job: + """Create a Batch job to run fine-mapping based on an input-output manifest. Args: - study_locus_paths (list[str]): The list of study loci (full gs:// paths) to fine-map. - output_paths (list[str]): The list of output locations, corresponding to study locus paths. study_index_path (str): The path to the study index. + study_locus_manifest_path (str): Path to the CSV manifest containing all study locus input and output locations. Should contain two columns: study_locus_input and study_locus_output + task_count (int): Total number of tasks in a job to run. docker_image_url (str): The URL of the Docker image to use for the job. By default, use a project wide image. Returns: - list[Job]: A Batch job to run fine-mapping on the given study loci. + Job: A Batch job to run fine-mapping on the given study loci. """ - # Check that the input parameters make sense. - assert len(study_locus_paths) == len( - output_paths - ), "The length of study_locus_paths and output_paths must be the same." - # Define runnable: container and parameters to use. runnable = Runnable( container=Runnable.Container( @@ -46,9 +40,9 @@ def finemapping_batch_jobs( ( "poetry run gentropy " "step=susie_finemapping " - 'step.study_locus_to_finemap="$INPUTPATH" ' - 'step.output_path="$OUTPUTPATH" ' f"step.study_index_path={study_index_path} " + f"step.study_locus_manifest_path={study_locus_manifest_path} " + "step.study_locus_index=$BATCH_TASK_INDEX " "step.max_causal_snps=10 " "step.primary_signal_pval_threshold=1 " "step.secondary_signal_pval_threshold=1 " @@ -89,23 +83,11 @@ def finemapping_batch_jobs( ], ) - # Define task environments: individual configuration for each of the tasks. - task_environments = [ - Environment( - variables={ - "INPUTPATH": '"' + input_path + '"', - "OUTPUTPATH": '"' + output_path + '"', - } - ) - for input_path, output_path in zip(study_locus_paths, output_paths) - ] - # Define task group: collection of parameterised tasks. 
task_group = TaskGroup( task_spec=task_spec, - task_environments=task_environments, parallelism=2000, - task_count=len(study_locus_paths), + task_count=task_count, ) # Define allocation policy: method of mapping a task group to compute resources. @@ -122,8 +104,8 @@ def finemapping_batch_jobs( ) # Define and return job: a complete description of the workload, ready to be submitted to Google Batch. - return [Job( + return Job( task_groups=[task_group], allocation_policy=allocation_policy, logs_policy=LogsPolicy(destination=LogsPolicy.Destination.CLOUD_LOGGING), - )] + ) From 300425b0162030ebf75bfc3599f25850e61f2d2d Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Wed, 18 Sep 2024 11:27:50 +0100 Subject: [PATCH 07/10] feat: implement generate_manifests_for_finemapping --- .../dags/ukb_ppp_finemapping.py | 4 +- src/ot_orchestration/templates/finemapping.py | 79 +++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py index 1df160f..a7012f1 100644 --- a/src/ot_orchestration/dags/ukb_ppp_finemapping.py +++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py @@ -11,7 +11,7 @@ CloudBatchSubmitJobOperator, ) -from ot_orchestration.templates.finemapping import finemapping_batch_jobs +from ot_orchestration.templates.finemapping import finemapping_batch_job from ot_orchestration.utils import common STUDY_LOCUS_BASE_PATH = ( @@ -49,7 +49,7 @@ def finemapping_tasks() -> list[CloudBatchSubmitJobOperator]: job=batch_job, deferrable=False ) - for i, batch_job in enumerate(finemapping_batch_jobs( + for i, batch_job in enumerate(finemapping_batch_job( study_locus_paths = INPUT_PATHS, output_paths = OUTPUT_PATHS, study_index_path = STUDY_INDEX_PATH diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py index 0e8a013..ddef22f 100644 --- a/src/ot_orchestration/templates/finemapping.py +++ b/src/ot_orchestration/templates/finemapping.py @@ -1,5 +1,7 @@ """A reusable template for finemapping jobs.""" + +from google.cloud import storage from google.cloud.batch_v1 import ( AllocationPolicy, ComputeResource, @@ -109,3 +111,80 @@ def finemapping_batch_job( allocation_policy=allocation_policy, logs_policy=LogsPolicy(destination=LogsPolicy.Destination.CLOUD_LOGGING), ) + + +def upload_strings_to_gcs(strings_list: list[str], csv_upload_path: str) -> None: + """Upload a list of strings directly to Google Cloud Storage as a single blob. + + Args: + strings_list (List[str]): The list of strings to be uploaded. + csv_upload_path (str): The full Google Storage path (gs://bucket_name/path/to/file.csv) where the data will be uploaded. + + Returns: + None + """ + # Join the list of strings with newlines to form the content. + content = "\n".join(strings_list) + + # Extract bucket and path from csv_upload_path (format: gs://bucket_name/path/to/file.csv). + bucket_name, file_path = csv_upload_path.replace("gs://", "").split("/", 1) + + # Initialise the Google Cloud Storage client. + client = storage.Client() + bucket = client.get_bucket(bucket_name) + blob = bucket.blob(file_path) + + # Upload the joined content directly. 
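+    # A single call uploads the whole manifest from memory; no temporary local file is needed.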
+    blob.upload_from_string(content, content_type="text/plain")
+
+
+def generate_manifests_for_finemapping(
+    collected_loci: str,
+    manifest_prefix: str,
+    output_path: str,
+    max_records_per_chunk: int = 100_000,
+) -> list[tuple[str, int]]:
+    """Starting from collected_loci, generate manifests for finemapping, splitting into chunks of at most 100,000 records.
+
+    Args:
+        collected_loci (str): Google Storage path for collected loci.
+        manifest_prefix (str): Google Storage path prefix for uploading the manifests.
+        output_path (str): Google Storage path to store the finemapping results.
+        max_records_per_chunk (int): Maximum number of records per chunk. Defaults to 100,000, which is the maximum number of tasks per job that Google Batch supports.
+
+    Returns:
+        list[tuple[str, int]]: List of tuples, where the first value is the path to a manifest, and the second is the number of records in that manifest.
+    """
+    # Get list of loci from the input Google Storage path.
+    client = storage.Client()
+    bucket_name, prefix = collected_loci.replace("gs://", "").split("/", 1)
+    bucket = client.get_bucket(bucket_name)
+    blobs = bucket.list_blobs(prefix=prefix)
+    all_loci = [
+        blob.name[:-1]
+        for blob in blobs
+        if "studyLocusId" in blob.name and blob.name.endswith("/")
+    ]
+
+    # Generate full list of input-output file paths.
+    inputs_outputs = [
+        f"{collected_loci}/{locus},{output_path}/{locus}" for locus in all_loci
+    ]
+
+    # Split into chunks of max size, as specified.
+    split_inputs_outputs = [
+        inputs_outputs[i : i + max_records_per_chunk]
+        for i in range(0, len(inputs_outputs), max_records_per_chunk)
+    ]
+
+    # Generate and upload manifests.
+    all_manifests = []
+    for i, chunk in enumerate(split_inputs_outputs):
+        lines = ["study_locus_input,study_locus_output"] + chunk
+        manifest_path = f"{manifest_prefix}/chunk_{i}"
+        upload_strings_to_gcs(lines, manifest_path)
+        all_manifests.append(
+            (manifest_path, len(chunk)),
+        )
+
+    return all_manifests

From c877191fe983effc71249ae30be02d07d90fd054 Mon Sep 17 00:00:00 2001
From: Kirill Tsukanov
Date: Wed, 18 Sep 2024 11:39:14 +0100
Subject: [PATCH 08/10] refactor: rewrite the DAG to use new functions

---
 .../dags/ukb_ppp_finemapping.py | 67 +++++++------------
 1 file changed, 25 insertions(+), 42 deletions(-)

diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py
index a7012f1..45cc088 100644
--- a/src/ot_orchestration/dags/ukb_ppp_finemapping.py
+++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py
@@ -5,56 +5,24 @@
 
 import time
 from pathlib import Path
 
-from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.cloud_batch import (
     CloudBatchSubmitJobOperator,
 )
 
-from ot_orchestration.templates.finemapping import finemapping_batch_job
+from ot_orchestration.templates.finemapping import (
+    finemapping_batch_job,
+    generate_manifests_for_finemapping,
+)
 from ot_orchestration.utils import common
 
-STUDY_LOCUS_BASE_PATH = (
-    "gs://gentropy-tmp/tskir/ukb_ppp_eur_data_collected_patched_2024_07_09"
+COLLECTED_LOCI = (
+    "gs://genetics-portal-dev-analysis/dc16/output/ukb_ppp/clean_loci.parquet"
 )
-OUTPUT_BASE_PATH = "gs://gentropy-tmp/tskir/finemapping_2024_07_29"
+MANIFEST_PREFIX = "gs://gentropy-tmp/ukb/manifest"
+OUTPUT_BASE_PATH = "gs://gentropy-tmp/ukb/output"
 STUDY_INDEX_PATH = "gs://ukb_ppp_eur_data/study_index"
-OUTPUT_PATH = "gs://gentropy-tmp/test_finemapped_out"
-
-# Temporary: defining 10 loci in order to test the DAG.
-LOCI = [ - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=-4875420583494530062/part-00132-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=2449331910204577420/part-00058-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=2882125006476788651/part-00077-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=5149163189737967785/part-00069-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=7530607523033270690/part-00045-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_A1BG_P04217_OID30771_v1/studyLocusId=7817416827048695229/part-00095-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-3603332164695210634/part-00061-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-3727530566487910400/part-00029-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=-771229199423266821/part-00067-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", - "studyId=UKB_PPP_EUR_AAMDC_Q9H7C9_OID30236_v1/studyLocusId=7906770497215611142/part-00074-a492c618-2414-44cc-81bb-853f3cf40ce7.c000.snappy.parquet", -] -INPUT_PATHS = [f"{STUDY_LOCUS_BASE_PATH}/{L}" for L in LOCI] -OUTPUT_PATHS = [f"{OUTPUT_BASE_PATH}/{L}" for L in LOCI] -@task(task_id="finemapping_task") -def finemapping_tasks() -> list[CloudBatchSubmitJobOperator]: - """Generate a list of Batch job operators to submit finemapping processing.""" - return [ - CloudBatchSubmitJobOperator( - task_id="finemapping_batch_job_{i}", - project_id=common.GCP_PROJECT, - region=common.GCP_REGION, - job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}", - job=batch_job, - deferrable=False - ) - for i, batch_job in enumerate(finemapping_batch_job( - study_locus_paths = INPUT_PATHS, - output_paths = OUTPUT_PATHS, - study_index_path = STUDY_INDEX_PATH - )) - ] with DAG( dag_id=Path(__file__).stem, @@ -62,6 +30,21 @@ def finemapping_tasks() -> list[CloudBatchSubmitJobOperator]: default_args=common.shared_dag_args, **common.shared_dag_kwargs, ) as dag: - ( - finemapping_tasks() + manifests = generate_manifests_for_finemapping( + collected_loci=COLLECTED_LOCI, + manifest_prefix=MANIFEST_PREFIX, + output_path=OUTPUT_BASE_PATH, ) + for i, (manifest_path, num_of_tasks) in enumerate(manifests): + task = CloudBatchSubmitJobOperator( + task_id="finemapping_batch_job_{i}", + project_id=common.GCP_PROJECT, + region=common.GCP_REGION, + job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}", + job=finemapping_batch_job( + study_index_path=STUDY_INDEX_PATH, + study_locus_manifest_path=manifest_path, + task_count=num_of_tasks, + ), + deferrable=False + ) From a7ff30a08ec936a76fab89c04242d8e5fa0d92b8 Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Wed, 18 Sep 2024 12:10:30 +0100 Subject: [PATCH 09/10] fix: import errors in DAG --- src/ot_orchestration/dags/ukb_ppp_finemapping.py | 4 ++-- src/ot_orchestration/templates/finemapping.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py index 45cc088..1c92719 100644 --- 
a/src/ot_orchestration/dags/ukb_ppp_finemapping.py
+++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py
@@ -37,7 +37,7 @@
     )
     for i, (manifest_path, num_of_tasks) in enumerate(manifests):
         task = CloudBatchSubmitJobOperator(
-            task_id="finemapping_batch_job_{i}",
+            task_id=f"finemapping_batch_job_{i}",
             project_id=common.GCP_PROJECT,
             region=common.GCP_REGION,
             job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}",
             job=finemapping_batch_job(
@@ -46,5 +46,5 @@
                 study_locus_manifest_path=manifest_path,
                 task_count=num_of_tasks,
             ),
-            deferrable=False
+            deferrable=False,
         )
diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py
index ddef22f..20e8192 100644
--- a/src/ot_orchestration/templates/finemapping.py
+++ b/src/ot_orchestration/templates/finemapping.py
@@ -1,6 +1,5 @@
 """A reusable template for finemapping jobs."""
 
-
 from google.cloud import storage
 from google.cloud.batch_v1 import (
     AllocationPolicy,
@@ -35,7 +34,7 @@ def finemapping_batch_job(
     # Define runnable: container and parameters to use.
     runnable = Runnable(
         container=Runnable.Container(
-            image_url=docker_image_url,
+            image_uri=docker_image_url,
             entrypoint="/bin/sh",
             commands=[
                 "-c",
@@ -72,7 +71,7 @@ def finemapping_batch_job(
     # Define task spec: runnable, compute resources, retry and lifecycle policies; shared between all tasks.
     task_spec = TaskSpec(
         runnables=[runnable],
-        resources=ComputeResource(cpu_milli=4000, memory_mib=25000),
+        compute_resource=ComputeResource(cpu_milli=4000, memory_mib=25000),
         max_run_duration="7200s",
         max_retry_count=5,
         lifecycle_policies=[

From 080cc786e1858a5760f44d8fe64ca75763a03b39 Mon Sep 17 00:00:00 2001
From: Kirill Tsukanov
Date: Wed, 18 Sep 2024 15:24:32 +0100
Subject: [PATCH 10/10] fix: multiple fixes following test runs

---
 .../dags/ukb_ppp_finemapping.py | 38 ++++++++----------
 src/ot_orchestration/templates/finemapping.py | 31 +++++++++++++--
 2 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/src/ot_orchestration/dags/ukb_ppp_finemapping.py b/src/ot_orchestration/dags/ukb_ppp_finemapping.py
index 1c92719..36e1d9b 100644
--- a/src/ot_orchestration/dags/ukb_ppp_finemapping.py
+++ b/src/ot_orchestration/dags/ukb_ppp_finemapping.py
@@ -2,16 +2,13 @@
 
 from __future__ import annotations
 
-import time
 from pathlib import Path
 
+from airflow.decorators import task
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.cloud_batch import (
-    CloudBatchSubmitJobOperator,
-)
 
 from ot_orchestration.templates.finemapping import (
-    finemapping_batch_job,
+    FinemappingBatchOperator,
     generate_manifests_for_finemapping,
 )
 from ot_orchestration.utils import common
@@ -24,27 +21,24 @@
 STUDY_INDEX_PATH = "gs://ukb_ppp_eur_data/study_index"
 
 
+@task
+def generate_manifests() -> list[tuple[int, str, int]]:
+    """Generate chunked finemapping manifests at DAG run time."""
+    return generate_manifests_for_finemapping(
+        collected_loci=COLLECTED_LOCI,
+        manifest_prefix=MANIFEST_PREFIX,
+        output_path=OUTPUT_BASE_PATH,
+        max_records_per_chunk=100_000,
+    )
+
+
 with DAG(
     dag_id=Path(__file__).stem,
     description="Open Targets Genetics — finemap study loci with SuSie",
     default_args=common.shared_dag_args,
     **common.shared_dag_kwargs,
 ) as dag:
-    manifests = generate_manifests_for_finemapping(
-        collected_loci=COLLECTED_LOCI,
-        manifest_prefix=MANIFEST_PREFIX,
-        output_path=OUTPUT_BASE_PATH,
+    (
+        FinemappingBatchOperator.partial(
+            task_id="finemapping_batch_job", study_index_path=STUDY_INDEX_PATH
+        ).expand(manifest=generate_manifests())
     )
-    for i, (manifest_path, num_of_tasks) in enumerate(manifests):
-        task = CloudBatchSubmitJobOperator(
-            task_id=f"finemapping_batch_job_{i}",
-            project_id=common.GCP_PROJECT,
-            region=common.GCP_REGION,
-            job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}",
-            job=finemapping_batch_job(
-                study_index_path=STUDY_INDEX_PATH,
-                study_locus_manifest_path=manifest_path,
-                task_count=num_of_tasks,
-            ),
-            deferrable=False,
-        )
diff --git a/src/ot_orchestration/templates/finemapping.py b/src/ot_orchestration/templates/finemapping.py
index 20e8192..5e24b6d 100644
--- a/src/ot_orchestration/templates/finemapping.py
+++ b/src/ot_orchestration/templates/finemapping.py
@@ -1,5 +1,10 @@
 """A reusable template for finemapping jobs."""
 
+import time
+
+from airflow.providers.google.cloud.operators.cloud_batch import (
+    CloudBatchSubmitJobOperator,
+)
 from google.cloud import storage
 from google.cloud.batch_v1 import (
     AllocationPolicy,
@@ -56,6 +61,7 @@ def finemapping_batch_job(
                     "step.carma_time_limit=600 "
                     "step.imputed_r2_threshold=0.9 "
                     "step.ld_score_threshold=5 "
+                    "step.carma_tau=0.15 "
                     "+step.session.extended_spark_conf={spark.jars:https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar} "
                     "+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:false} "
                     "+step.session.extended_spark_conf={spark.driver.memory:30g} "
@@ -142,7 +148,7 @@ def generate_manifests_for_finemapping(
     manifest_prefix: str,
     output_path: str,
     max_records_per_chunk: int = 100_000,
-) -> list[tuple[str, int]]:
+) -> list[tuple[int, str, int]]:
     """Starting from collected_loci, generate manifests for finemapping, splitting into chunks of at most 100,000 records.
 
     Args:
@@ -152,7 +158,7 @@
         max_records_per_chunk (int): Maximum number of records per chunk. Defaults to 100,000, which is the maximum number of tasks per job that Google Batch supports.
 
     Returns:
-        list[tuple[str, int]]: List of tuples, where the first value is the path to a manifest, and the second is the number of records in that manifest.
+        list[tuple[int, str, int]]: List of tuples, where the first value is the index of the manifest, the second is the path to it, and the third is the number of records in that manifest.
     """
     # Get list of loci from the input Google Storage path.
     client = storage.Client()
@@ -160,7 +166,7 @@
     bucket = client.get_bucket(bucket_name)
     blobs = bucket.list_blobs(prefix=prefix)
     all_loci = [
-        blob.name[:-1]
+        blob.name[:-1].split("/")[-1]
         for blob in blobs
         if "studyLocusId" in blob.name and blob.name.endswith("/")
     ]
@@ -183,7 +189,24 @@
         lines = ["study_locus_input,study_locus_output"] + chunk
         manifest_path = f"{manifest_prefix}/chunk_{i}"
         upload_strings_to_gcs(lines, manifest_path)
         all_manifests.append(
-            (manifest_path, len(chunk)),
+            (i, manifest_path, len(chunk)),
         )
 
     return all_manifests
+
+
+class FinemappingBatchOperator(CloudBatchSubmitJobOperator):
+    """Submit a single finemapping Batch job, parameterised by one manifest chunk."""
+
+    def __init__(self, manifest: tuple[int, str, int], study_index_path: str, **kwargs):
+        """Initialise the operator from a (manifest index, manifest path, task count) tuple."""
+        i, manifest_path, num_of_tasks = manifest
+        super().__init__(
+            project_id=common.GCP_PROJECT,
+            region=common.GCP_REGION,
+            job_name=f"finemapping-job-{i}-{time.strftime('%Y%m%d-%H%M%S')}",
+            job=finemapping_batch_job(
+                study_index_path=study_index_path,
+                study_locus_manifest_path=manifest_path,
+                task_count=num_of_tasks,
+            ),
+            deferrable=False,
+            **kwargs,
+        )