From 95922e5b7a0ab1d2ca2e5c75f619425373cd7150 Mon Sep 17 00:00:00 2001 From: Szymon Szyszkowski Date: Tue, 11 Feb 2025 11:52:17 +0000 Subject: [PATCH] fix: successful run for genetics_etl --- .../dags/config/genetics_etl.yaml | 140 +++++++++--------- src/ot_orchestration/dags/genetics_etl.py | 9 +- 2 files changed, 77 insertions(+), 72 deletions(-) diff --git a/src/ot_orchestration/dags/config/genetics_etl.yaml b/src/ot_orchestration/dags/config/genetics_etl.yaml index f2a37d4..bcd12e5 100644 --- a/src/ot_orchestration/dags/config/genetics_etl.yaml +++ b/src/ot_orchestration/dags/config/genetics_etl.yaml @@ -3,7 +3,7 @@ environment_specs: - name: Staging vars: - gentropy_ref: 2.0.2-rc.4 + gentropy_ref: dev release_dir: gs://ot_orchestration/releases/25.02_freeze1 input_dir: gs://open-targets-pre-data-releases/24.12-uo_test-3/input # NOTE: Patched target index is required, as existing target index does not contain all columns requested by the original gene index - see https://github.com/opentargets/gentropy/pull/946 @@ -23,9 +23,12 @@ env: Staging l2g_gold_standard_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json dataproc: cluster_metadata: - GENTROPY_REF: 'v{gentropy_ref}' + GENTROPY_REF: '{gentropy_ref}' cluster_name: otg-etl - autoscaling_policy: otg-etl + autoscaling_policy: otg-efm + worker_machine_type: n1-highmem-32 + num_workers: 20 + allow_efm: true nodes: - id: biosample_index @@ -102,6 +105,7 @@ nodes: - INVALID_VARIANT_IDENTIFIER - INVALID_CHROMOSOME - TOP_HIT_AND_SUMMARY_STATS + step.session.write_mode: overwrite - id: convert_variants_to_vcf kind: Task @@ -128,7 +132,7 @@ nodes: - convert_variants_to_vcf google_batch: entrypoint: /bin/sh - image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{gentropy_ref} + image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:2.0.2-rc.4 resource_specs: cpu_milli: 2000 memory_mib: 2000 @@ -151,8 +155,7 @@ nodes: step.vep_output_json_path: '{release_dir}/variants/annotated_variants' step.gnomad_variant_annotations_path: '{gnomad}/v4.1/variant_index' step.variant_index_path: '{release_dir}/variant_index' - step.amino_acid_change_annotations: - - gs://otar013-ppp/OTAR2081_foldx/foldx_variant_annotation + step.amino_acid_change_annotations: [] - id: colocalisation_coloc prerequisites: @@ -163,17 +166,20 @@ nodes: step.coloc_path: '{release_dir}/colocalisation/' step.colocalisation_method: Coloc +step.colocalisation_method_params: '{priorc1: 1e-4, priorc2: 1e-4, priorc12: 1e-5}' - +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '16g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '2'}" + +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '2000', spark.executor.memory: '25g',spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}" + step.session.write_mode: overwrite - id: colocalisation_ecaviar prerequisites: - variant_index + - colocalisation_coloc params: step: colocalisation step.credible_set_path: '{release_dir}/credible_set' step.coloc_path: '{release_dir}/colocalisation' step.colocalisation_method: ECaviar - +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}" + +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '2000', spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}" + step.session.write_mode: overwrite - id: l2g_feature_matrix prerequisites: @@ -191,62 +197,62 @@ nodes: +step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}" step.session.write_mode: overwrite -# - id: l2g_train -# prerequisites: -# - l2g_feature_matrix -# params: -# step: locus_to_gene -# step.run_mode: train -# step.wandb_run_name: 24.11_freeze10 -# step.hf_hub_repo_id: opentargets/locus_to_gene -# step.hf_model_commit_message: 'chore: update model based on 24.11_freeze10 run' -# step.model_path: '{release_dir}/locus_to_gene_model/classifier.skops' -# step.credible_set_path: '{release_dir}/credible_set' -# step.variant_index_path: '{release_dir}/variant_index' -# step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix' -# step.gold_standard_curation_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json -# step.gene_interactions_path: gs://genetics_etl_python_playground/static_assets/interaction # OTP 23.12 data -# step.hyperparameters.n_estimators: 100 -# step.hyperparameters.max_depth: 5 -# step.hyperparameters.loss: log_loss -# +step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}" - -# - id: l2g_predict -# prerequisites: -# - l2g_train -# params: -# step: locus_to_gene -# step.run_mode: predict -# step.predictions_path: '{release_dir}/locus_to_gene_predictions' -# step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix' -# step.credible_set_path: '{release_dir}/credible_set' -# step.download_from_hub: false -# step.l2g_threshold: 0.05 -# step.model_path: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops -# step.hf_hub_repo_id: opentargets/locus_to_gene -# step.session.write_mode: overwrite - -# - id: l2g_evidence -# prerequisites: -# - l2g_predict -# params: -# step: locus_to_gene_evidence -# step.evidence_output_path: '{release_dir}/locus_to_gene_evidence -# step.locus_to_gene_predictions_path: '{release_dir}/locus_to_gene_predictions -# step.credible_set_path: '{release_dir}/credible_set -# step.study_index_path: '{release_dir}/study_index -# step.locus_to_gene_threshold: 0.05 -# step.session.write_mode: overwrite - -# - id: l2g_associations -# params: -# step: locus_to_gene_associations -# step.evidence_input_path: '{release_dir}/locus_to_gene_evidence -# step.disease_index_path: gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases -# step.direct_associations_output_path: '{release_dir}/locus_to_gene_associations/direct_associations -# step.indirect_associations_output_path: '{release_dir}/locus_to_gene_associations/indirect_associations -# step.session.spark_uri: yarn -# step.session.write_mode: overwrite - -# prerequisites: -# - l2g_evidence + - id: l2g_train + prerequisites: + - l2g_feature_matrix + params: + step: locus_to_gene + step.run_mode: train + step.wandb_run_name: 24.11_freeze10 + step.hf_hub_repo_id: opentargets/locus_to_gene + step.hf_model_commit_message: 'chore: update model based on 24.11_freeze10 run' + step.model_path: '{release_dir}/locus_to_gene_model/classifier.skops' + step.credible_set_path: '{release_dir}/credible_set' + step.variant_index_path: '{release_dir}/variant_index' + step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix' + step.gold_standard_curation_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json + step.gene_interactions_path: gs://genetics_etl_python_playground/static_assets/interaction # OTP 23.12 data + step.hyperparameters.n_estimators: 100 + step.hyperparameters.max_depth: 5 + step.hyperparameters.loss: log_loss + +step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}" + + - id: l2g_predict + prerequisites: + - l2g_train + params: + step: locus_to_gene + step.run_mode: predict + step.predictions_path: '{release_dir}/locus_to_gene_predictions' + step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix' + step.credible_set_path: '{release_dir}/credible_set' + step.download_from_hub: false + step.l2g_threshold: 0.05 + step.model_path: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops + step.hf_hub_repo_id: opentargets/locus_to_gene + step.session.write_mode: overwrite + + - id: l2g_evidence + prerequisites: + - l2g_predict + params: + step: locus_to_gene_evidence + step.evidence_output_path: '{release_dir}/locus_to_gene_evidence' + step.locus_to_gene_predictions_path: '{release_dir}/locus_to_gene_predictions' + step.credible_set_path: '{release_dir}/credible_sets/' + step.study_index_path: '{release_dir}/study_index' + step.locus_to_gene_threshold: 0.05 + step.session.write_mode: overwrite + + - id: l2g_associations + params: + step: locus_to_gene_associations + step.evidence_input_path: '{release_dir}/locus_to_gene_evidence' + step.disease_index_path: '{output_path}/etl/parquet/diseases' + step.direct_associations_output_path: '{release_dir}/locus_to_gene_associations/direct_associations' + step.indirect_associations_output_path: '{release_dir}/locus_to_gene_associations/indirect_associations' + step.session.spark_uri: yarn + step.session.write_mode: overwrite + + prerequisites: + - l2g_evidence diff --git a/src/ot_orchestration/dags/genetics_etl.py b/src/ot_orchestration/dags/genetics_etl.py index 953efd2..92ffaf8 100644 --- a/src/ot_orchestration/dags/genetics_etl.py +++ b/src/ot_orchestration/dags/genetics_etl.py @@ -18,14 +18,12 @@ find_node_in_config, read_yaml_config, ) -from ot_orchestration.utils.common import ( - shared_dag_args, - shared_dag_kwargs, -) +from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs from ot_orchestration.utils.dataproc import ( generate_dataproc_task_chain, submit_gentropy_step, ) +from ot_orchestration.utils.labels import GentropyDagLabels SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "config" / "genetics_etl.yaml" config = read_yaml_config(SOURCE_CONFIG_FILE_PATH) @@ -42,7 +40,7 @@ description="Open Targets Genetics ETL workflow", default_args=shared_dag_args, **shared_dag_kwargs, -): +) as genetics_etl_dag: with TaskGroup(group_id="genetics_etl") as genetics_etl: task_config = find_node_in_config(nodes, "variant_annotation") if task_config: @@ -74,4 +72,5 @@ generate_dataproc_task_chain( tasks=list(node_map.values()), **config["dataproc"], + labels=GentropyDagLabels(gentropy_dag="genetics_etl", run_id="airflow"), )