Skip to content

Commit

Permalink
fix: successful run for genetics_etl
Browse files Browse the repository at this point in the history
  • Loading branch information
Szymon Szyszkowski committed Feb 11, 2025
1 parent 0f78c80 commit 95922e5
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 72 deletions.
140 changes: 73 additions & 67 deletions src/ot_orchestration/dags/config/genetics_etl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
environment_specs:
- name: Staging
vars:
gentropy_ref: 2.0.2-rc.4
gentropy_ref: dev
release_dir: gs://ot_orchestration/releases/25.02_freeze1
input_dir: gs://open-targets-pre-data-releases/24.12-uo_test-3/input
# NOTE: Patched target index is required, as existing target index does not contain all columns requested by the original gene index - see https://github.com/opentargets/gentropy/pull/946
Expand All @@ -23,9 +23,12 @@ env: Staging
l2g_gold_standard_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json
dataproc:
cluster_metadata:
GENTROPY_REF: 'v{gentropy_ref}'
GENTROPY_REF: '{gentropy_ref}'
cluster_name: otg-etl
autoscaling_policy: otg-etl
autoscaling_policy: otg-efm
worker_machine_type: n1-highmem-32
num_workers: 20
allow_efm: true

nodes:
- id: biosample_index
Expand Down Expand Up @@ -102,6 +105,7 @@ nodes:
- INVALID_VARIANT_IDENTIFIER
- INVALID_CHROMOSOME
- TOP_HIT_AND_SUMMARY_STATS
step.session.write_mode: overwrite

- id: convert_variants_to_vcf
kind: Task
Expand All @@ -128,7 +132,7 @@ nodes:
- convert_variants_to_vcf
google_batch:
entrypoint: /bin/sh
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{gentropy_ref}
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:2.0.2-rc.4
resource_specs:
cpu_milli: 2000
memory_mib: 2000
Expand All @@ -151,8 +155,7 @@ nodes:
step.vep_output_json_path: '{release_dir}/variants/annotated_variants'
step.gnomad_variant_annotations_path: '{gnomad}/v4.1/variant_index'
step.variant_index_path: '{release_dir}/variant_index'
step.amino_acid_change_annotations:
- gs://otar013-ppp/OTAR2081_foldx/foldx_variant_annotation
step.amino_acid_change_annotations: []

- id: colocalisation_coloc
prerequisites:
Expand All @@ -163,17 +166,20 @@ nodes:
step.coloc_path: '{release_dir}/colocalisation/'
step.colocalisation_method: Coloc
+step.colocalisation_method_params: '{priorc1: 1e-4, priorc2: 1e-4, priorc12: 1e-5}'
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '16g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '2'}"
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '2000', spark.executor.memory: '25g',spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
step.session.write_mode: overwrite

- id: colocalisation_ecaviar
prerequisites:
- variant_index
- colocalisation_coloc
params:
step: colocalisation
step.credible_set_path: '{release_dir}/credible_set'
step.coloc_path: '{release_dir}/colocalisation'
step.colocalisation_method: ECaviar
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '2000', spark.executor.memory: '25g', spark.executor.memoryOverhead: '1g', spark.executor.cores: '4'}"
step.session.write_mode: overwrite

- id: l2g_feature_matrix
prerequisites:
Expand All @@ -191,62 +197,62 @@ nodes:
+step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}"
step.session.write_mode: overwrite

# - id: l2g_train
# prerequisites:
# - l2g_feature_matrix
# params:
# step: locus_to_gene
# step.run_mode: train
# step.wandb_run_name: 24.11_freeze10
# step.hf_hub_repo_id: opentargets/locus_to_gene
# step.hf_model_commit_message: 'chore: update model based on 24.11_freeze10 run'
# step.model_path: '{release_dir}/locus_to_gene_model/classifier.skops'
# step.credible_set_path: '{release_dir}/credible_set'
# step.variant_index_path: '{release_dir}/variant_index'
# step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix'
# step.gold_standard_curation_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json
# step.gene_interactions_path: gs://genetics_etl_python_playground/static_assets/interaction # OTP 23.12 data
# step.hyperparameters.n_estimators: 100
# step.hyperparameters.max_depth: 5
# step.hyperparameters.loss: log_loss
# +step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}"

# - id: l2g_predict
# prerequisites:
# - l2g_train
# params:
# step: locus_to_gene
# step.run_mode: predict
# step.predictions_path: '{release_dir}/locus_to_gene_predictions'
# step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix'
# step.credible_set_path: '{release_dir}/credible_set'
# step.download_from_hub: false
# step.l2g_threshold: 0.05
# step.model_path: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops
# step.hf_hub_repo_id: opentargets/locus_to_gene
# step.session.write_mode: overwrite

# - id: l2g_evidence
# prerequisites:
# - l2g_predict
# params:
# step: locus_to_gene_evidence
# step.evidence_output_path: '{release_dir}/locus_to_gene_evidence
# step.locus_to_gene_predictions_path: '{release_dir}/locus_to_gene_predictions
# step.credible_set_path: '{release_dir}/credible_set
# step.study_index_path: '{release_dir}/study_index
# step.locus_to_gene_threshold: 0.05
# step.session.write_mode: overwrite

# - id: l2g_associations
# params:
# step: locus_to_gene_associations
# step.evidence_input_path: '{release_dir}/locus_to_gene_evidence
# step.disease_index_path: gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases
# step.direct_associations_output_path: '{release_dir}/locus_to_gene_associations/direct_associations
# step.indirect_associations_output_path: '{release_dir}/locus_to_gene_associations/indirect_associations
# step.session.spark_uri: yarn
# step.session.write_mode: overwrite

# prerequisites:
# - l2g_evidence
- id: l2g_train
prerequisites:
- l2g_feature_matrix
params:
step: locus_to_gene
step.run_mode: train
step.wandb_run_name: 24.11_freeze10
step.hf_hub_repo_id: opentargets/locus_to_gene
step.hf_model_commit_message: 'chore: update model based on 24.11_freeze10 run'
step.model_path: '{release_dir}/locus_to_gene_model/classifier.skops'
step.credible_set_path: '{release_dir}/credible_set'
step.variant_index_path: '{release_dir}/variant_index'
step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix'
step.gold_standard_curation_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json
step.gene_interactions_path: gs://genetics_etl_python_playground/static_assets/interaction # OTP 23.12 data
step.hyperparameters.n_estimators: 100
step.hyperparameters.max_depth: 5
step.hyperparameters.loss: log_loss
+step.session.extended_spark_conf: "{spark.kryoserializer.buffer.max:500m, spark.sql.autoBroadcastJoinThreshold:'-1'}"

- id: l2g_predict
prerequisites:
- l2g_train
params:
step: locus_to_gene
step.run_mode: predict
step.predictions_path: '{release_dir}/locus_to_gene_predictions'
step.feature_matrix_path: '{release_dir}/locus_to_gene_feature_matrix'
step.credible_set_path: '{release_dir}/credible_set'
step.download_from_hub: false
step.l2g_threshold: 0.05
step.model_path: gs://ot_orchestration/benchmarks/l2g/fm0/v5.1_best_cv/locus_to_gene_model/classifier.skops
step.hf_hub_repo_id: opentargets/locus_to_gene
step.session.write_mode: overwrite

- id: l2g_evidence
prerequisites:
- l2g_predict
params:
step: locus_to_gene_evidence
step.evidence_output_path: '{release_dir}/locus_to_gene_evidence'
step.locus_to_gene_predictions_path: '{release_dir}/locus_to_gene_predictions'
step.credible_set_path: '{release_dir}/credible_sets/'
step.study_index_path: '{release_dir}/study_index'
step.locus_to_gene_threshold: 0.05
step.session.write_mode: overwrite

- id: l2g_associations
params:
step: locus_to_gene_associations
step.evidence_input_path: '{release_dir}/locus_to_gene_evidence'
step.disease_index_path: '{output_path}/etl/parquet/diseases'
step.direct_associations_output_path: '{release_dir}/locus_to_gene_associations/direct_associations'
step.indirect_associations_output_path: '{release_dir}/locus_to_gene_associations/indirect_associations'
step.session.spark_uri: yarn
step.session.write_mode: overwrite

prerequisites:
- l2g_evidence
9 changes: 4 additions & 5 deletions src/ot_orchestration/dags/genetics_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
find_node_in_config,
read_yaml_config,
)
from ot_orchestration.utils.common import (
shared_dag_args,
shared_dag_kwargs,
)
from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs
from ot_orchestration.utils.dataproc import (
generate_dataproc_task_chain,
submit_gentropy_step,
)
from ot_orchestration.utils.labels import GentropyDagLabels

SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "config" / "genetics_etl.yaml"
config = read_yaml_config(SOURCE_CONFIG_FILE_PATH)
Expand All @@ -42,7 +40,7 @@
description="Open Targets Genetics ETL workflow",
default_args=shared_dag_args,
**shared_dag_kwargs,
):
) as genetics_etl_dag:
with TaskGroup(group_id="genetics_etl") as genetics_etl:
task_config = find_node_in_config(nodes, "variant_annotation")
if task_config:
Expand Down Expand Up @@ -74,4 +72,5 @@
generate_dataproc_task_chain(
tasks=list(node_map.values()),
**config["dataproc"],
labels=GentropyDagLabels(gentropy_dag="genetics_etl", run_id="airflow"),
)

0 comments on commit 95922e5

Please sign in to comment.