feat: pipeline setup
Szymon Szyszkowski committed Jul 31, 2024
1 parent fbe3e31 commit b4615fd
Showing 23 changed files with 1,199 additions and 682 deletions.
5 changes: 5 additions & 0 deletions .env
@@ -4,3 +4,8 @@ GOOGLE_DOCKER_CREDENTIALS_PATH=/.config/gcloud
GOOGLE_APPLICATION_CREDENTIALS=/.config/gcloud/service_account_credentials.json
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://?extra__google_cloud_platform__key_path=/.config/gcloud/service_account_credentials.json'
GCP_PROJECT_ID=open-targets-genetics-dev
AIRFLOW_UID=503
AIRFLOW__CORE__MAX_MAP_LENGTH=50000
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
AIRFLOW__CORE__PARALLELISM=500
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=500
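
The new Airflow settings rely on Airflow's standard environment-variable override convention: a variable named `AIRFLOW__<SECTION>__<KEY>` overrides `<key>` in the `<section>` block of `airflow.cfg`, so `AIRFLOW__CORE__PARALLELISM=500` sets `[core] parallelism`. A small sketch of that mapping (the helper function is hypothetical, not part of this change):

```python
def airflow_override_var(section: str, key: str) -> str:
    """Return the env var name Airflow checks for [section] key overrides."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# AIRFLOW__CORE__PARALLELISM overrides [core] parallelism in airflow.cfg
assert airflow_override_var("core", "parallelism") == "AIRFLOW__CORE__PARALLELISM"
assert airflow_override_var("webserver", "expose_config") == "AIRFLOW__WEBSERVER__EXPOSE_CONFIG"
```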
19 changes: 11 additions & 8 deletions Makefile
@@ -6,6 +6,7 @@ APP_NAME ?= $$(cat pyproject.toml| grep -m 1 "name" | cut -d" " -f3 | sed 's/"/
VERSION := $$(grep '^version' pyproject.toml | sed 's%version = "\(.*\)"%\1%')
BUCKET_NAME=gs://genetics_etl_python_playground/initialisation/${VERSION}/
DOCKER_IMAGE := "Orchestration-Airflow"
TEST_COVERAGE := 40

.PHONY: $(shell sed -n -e '/^$$/ { n ; /^[^ .\#][^ ]*:/ { s/:.*$$// ; p ; } ; }' $(MAKEFILE_LIST))
.DEFAULT_GOAL := help
@@ -29,17 +30,18 @@ format: ## run formatting
@poetry run python -m ruff check --fix src/$(APP_NAME) tests

test: ## run unit tests
@poetry run python -m pytest tests/*.py
@poetry run coverage run -m pytest tests/*.py -s -p no:warnings
@poetry run coverage report --omit="tests/*" --fail-under=$(TEST_COVERAGE)

check: format check-types test ## run all checks

generate-requirements: ## generate requirements.txt from poetry dependencies to install in the docker image
poetry export --without-hashes --with dev --format=requirements.txt > requirements.txt
poetry export --without-hashes --with dev --format=requirements.txt > docker/airflow-dev/requirements.txt

build-airflow-image: generate-requirements ## build local airflow image for the infrastructure
docker build . \
docker build docker/airflow-dev \
--tag extending_airflow:latest \
-f Dockerfile \
-f docker/airflow-dev/Dockerfile \
--no-cache

build-whl: ## build ot-orchestration package wheel
@@ -53,13 +55,14 @@ build-genetics-etl-image: build-whl ## build local genetics-etl image for the te
--build-arg DIST=$(shell find dist -name 'ot_orchestration*')

test-gwas-catalog-batch-script: ## test harmonisation task
# mkdir -p test_batch
# gsutil -m rsync -r gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600 test_batch
mkdir -p test_batch
gsutil -m rsync -r gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600 test_batch
docker run \
-v $(HOME)/.config/gcloud:/root/.config/gcloud \
-e MANIFEST_PATH=gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/manifest.json \
-e GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/service_account_credentials.json \
-ti \
--rm \
genetics_etl:test
# gsutil -m rsync -r test_batch gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600
genetics_etl:test \
-c "ot gwas-catalog-process-in-batch"
gsutil -m rsync -r test_batch gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600
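
The container reads `MANIFEST_PATH` and loads a per-study `manifest.json`. Judging by the keys accessed in `src/ot_orchestration/cli/process_in_batch.py`, the manifest is a flat JSON object along these lines; the paths and values below are illustrative only, the real schema is defined by `GWASCatalogPipelineManifest`:

```python
import json

# Illustrative only: keys taken from how the manifest is read in process_in_batch.py,
# paths are made up for the GCST004600 test study.
example_manifest = {
    "studyId": "GCST004600",
    "manifestPath": "gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/manifest.json",
    "rawPath": "gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/raw/sumstats.tsv.gz",
    "harmonisedPath": "gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/harmonised",
    "qcPath": "gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/qc",
    "clumpingPath": "gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/clumped",
    "passHarmonisation": False,
    "passQC": False,
    "passClumping": False,
}

print(json.dumps(example_manifest, indent=2))
```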
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions docker/genetics_etl/.dockerignore
@@ -0,0 +1,2 @@
# Ignore generated credentials from google-github-actions/auth
gha-creds-*.json
File renamed without changes.
3 changes: 3 additions & 0 deletions docker/genetics_etl/README.md
@@ -0,0 +1,3 @@
# genetics_etl docker image

This is the genetics_etl Docker image currently used for batch processing in the GWAS_Catalog_dag. The image is based on the gentropy image from the [gentropy repository](https://github.com/opentargets/gentropy).
242 changes: 242 additions & 0 deletions docs/gwas_catalog_dag.svg
3 changes: 0 additions & 3 deletions images/genetics_etl/README.md

This file was deleted.

638 changes: 321 additions & 317 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pyproject.toml
@@ -27,9 +27,9 @@ pyyaml = "^6.0.1"
google = "^3.0.0"
pendulum = "^3.0.0"
apache-airflow-providers-apache-beam = "^5.7.1"
pandas = "<2.2"
typing-extensions = "^4.12.2"
requests = "^2.32.3"
pandas = "<2.2"

[tool.poetry.group.dev.dependencies]
ruff = "^0.4.9"
@@ -95,13 +95,14 @@ ignore = [

]


[tool.ruff.lint.flake8-quotes]
docstring-quotes = "double"

[tool.ruff.lint.pydocstyle]
convention = "google"


[tool.pytest]
pythonpath = "src/"

[tool.pytest.ini_options]
markers = ["integration_test"]
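
The `integration_test` marker registered under `[tool.pytest.ini_options]` can then be used to tag and select tests; a minimal sketch (the test name and body are hypothetical):

```python
import pytest


@pytest.mark.integration_test
def test_manifest_round_trip_against_gcs():
    """Placeholder integration test; only meaningful when the marker is selected."""
    ...


# Select only integration tests:    pytest -m integration_test
# Exclude them from unit test runs: pytest -m "not integration_test"
```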
4 changes: 2 additions & 2 deletions src/ot_orchestration/cli/__init__.py
@@ -4,7 +4,7 @@
import logging
from ot_orchestration.cli.fetch_raw_sumstat_paths import fetch_raw_sumstat_paths
from ot_orchestration.cli.generate_dotenv import generate_dotenv
from ot_orchestration.cli.process_in_batch import gwas_catalog_process_in_batch
from ot_orchestration.cli.process_in_batch import gwas_catalog_pipeline

logging.basicConfig(level=logging.INFO)
asci_art = """
@@ -24,7 +24,7 @@ def cli():

cli.add_command(fetch_raw_sumstat_paths)
cli.add_command(generate_dotenv)
cli.add_command(gwas_catalog_process_in_batch)
cli.add_command(gwas_catalog_pipeline)


__all__ = ["cli"]
138 changes: 61 additions & 77 deletions src/ot_orchestration/cli/process_in_batch.py
@@ -1,21 +1,27 @@
"""Process GWAS Catalog summary statistics in batch job."""

import os
from ot_orchestration.utils import GCSIOManager
from ot_orchestration.types import Manifest_Object
from ot_orchestration.utils import IOManager, GWASCatalogPipelineManifest
import logging
import subprocess
import click
import os
import sys

MANIFEST_PATH_ENV_VAR = "MANIFEST_PATH"

def harmonise(manifest: Manifest_Object) -> Manifest_Object:

def harmonise_step(
manifest: GWASCatalogPipelineManifest,
) -> GWASCatalogPipelineManifest:
"""Run Harmonisation."""
raw_path = manifest["rawPath"]
harmonised_path = manifest["harmonisedPath"]
study_id = manifest["studyId"]
manifest_path = manifest["manifestPath"]
pass_harmonisation = manifest["passHarmonisation"]
logging.info("Running %s for %s", "harmonisation", study_id)

command = [
"poetry",
"run",
"gentropy",
"step=gwas_catalog_sumstat_preprocess",
f'step.raw_sumstats_path="{raw_path}"',
@@ -26,34 +32,38 @@ def harmonise(manifest: Manifest_Object) -> Manifest_Object:
"+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
"+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
]
if GCSIOManager().exists(harmonised_path):
logging.info("Harmonisation result exists for %s. Skipping", study_id)
manifest["passHarmonisation"] = True
if IOManager().resolve(harmonised_path).exists():
if not pass_harmonisation:
logging.info("Harmonisation result exists for %s. Skipping", study_id)
manifest["passHarmonisation"] = True
return manifest

logging.info("Running command %s", " ".join(command))
command = ["echo", "RUNNING!"]
result = subprocess.run(args=command, capture_output=True)
logging.info(result)
if result.returncode != 0:
logging.error("Harmonisation for study %s failed!", study_id)
error_msg = result.stderr.decode()
logging.error(error_msg)
manifest["passHarmonisation"] = False
logging.info("Dumping manifest to %s", manifest["manifestPath"])
GCSIOManager().dump(manifest["manifestPath"], manifest)
exit(1)
logging.info("Dumping manifest to %s", manifest_path)
IOManager().resolve(manifest_path).dump(manifest)
sys.exit(1)

logging.info("Harmonisation for study %s succeded!", study_id)
manifest["passHarmonisation"] = True
return manifest
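
harmonise_step, qc_step and clump_step all follow the same shape: skip if the output already exists, otherwise run a gentropy subprocess and, on failure, persist the updated manifest and exit. A hedged sketch of that shared pattern; the helper below is not part of this commit, and the IOManager/manifest APIs are assumed from how they are used in this file:

```python
import logging
import subprocess
import sys


def run_step(manifest, command, output_key, flag_key, io_manager):
    """Skip the step when its output exists; otherwise run it and record the outcome.

    Hypothetical helper, not part of this commit: it only sketches the shape shared
    by harmonise_step, qc_step and clump_step.
    """
    study_id = manifest["studyId"]
    if io_manager.resolve(manifest[output_key]).exists():
        logging.info("Result exists for %s. Skipping", study_id)
        manifest[flag_key] = True
        return manifest

    result = subprocess.run(args=command, capture_output=True)
    if result.returncode != 0:
        logging.error("Step failed for study %s: %s", study_id, result.stderr.decode())
        manifest[flag_key] = False
        io_manager.resolve(manifest["manifestPath"]).dump(manifest)
        sys.exit(1)

    manifest[flag_key] = True
    return manifest
```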


def qc(manifest: Manifest_Object) -> Manifest_Object:
def qc_step(manifest: GWASCatalogPipelineManifest) -> GWASCatalogPipelineManifest:
"""Run QC."""
harmonised_path = manifest["harmonisedPath"]
qc_path = manifest["qcPath"]
study_id = manifest["studyId"]
manifest_path = manifest["manifestPath"]

command = [
"poetry",
"run",
"gentropy",
"step=summary_statistics_qc",
f'step.gwas_path="{harmonised_path}"',
@@ -65,9 +75,8 @@ def qc(manifest: Manifest_Object) -> Manifest_Object:
"+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
"+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
]
result_exists = GCSIOManager().exists(qc_path)
logging.info("Result exists: %s", result_exists)
if GCSIOManager().exists(qc_path):
result_exists = IOManager().resolve(qc_path).exists()
if result_exists:
logging.info("QC result exists for %s. Skipping", study_id)
manifest["passQC"] = True
return manifest
@@ -78,76 +87,51 @@ def qc_consolidation(manifest: Manifest_Object) -> Manifest_Object:
error_msg = result.stderr.decode()
logging.error(error_msg)
manifest["passQC"] = False
logging.info("Dumping manifest to %s", manifest["manifestPath"])
GCSIOManager().dump(manifest["manifestPath"], manifest)
logging.info("Dumping manifest to %s", manifest_path)
IOManager().resolve(manifest_path).dump(manifest)
exit(1)

logging.info("QC for study %s succeded!", study_id)
manifest["passQC"] = True
return manifest
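
The refactor replaces direct `GCSIOManager()` calls with `IOManager().resolve(path)`, which by the look of its call sites returns an object exposing `exists`, `load` and `dump`. A rough, self-contained sketch of that interface as inferred here; the names and dispatch logic are assumptions, not the real ot_orchestration implementation:

```python
import json
import os
from typing import Any, Protocol


class ResolvedPath(Protocol):
    """What IOManager().resolve(path) appears to return, judging by its call sites."""

    def exists(self) -> bool: ...
    def dump(self, obj: Any) -> None: ...


class LocalJsonPath:
    """Minimal local stand-in; the real resolver presumably also handles gs:// URIs."""

    def __init__(self, path: str) -> None:
        self.path = path

    def exists(self) -> bool:
        return os.path.exists(self.path)

    def dump(self, obj: Any) -> None:
        # Assumes a JSON-serialisable, dict-like manifest.
        with open(self.path, "w") as handle:
            json.dump(obj, handle, indent=2)


class IOManagerSketch:
    """Hypothetical dispatcher mirroring IOManager().resolve(path)."""

    def resolve(self, path: str) -> ResolvedPath:
        if path.startswith("gs://"):
            raise NotImplementedError("GCS paths are handled by the real IOManager")
        return LocalJsonPath(path)
```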


def qc_consolidation(manifest: Manifest_Object) -> Manifest_Object:
pass
def qc_consolidation_step(
manifest: GWASCatalogPipelineManifest,
) -> GWASCatalogPipelineManifest:
"""Check if sumstats pass qc thresholds."""
return manifest


def clumping(manifest: Manifest_Object) -> Manifest_Object:
def clump_step(manifest: GWASCatalogPipelineManifest) -> GWASCatalogPipelineManifest:
"""Run Clumping."""
harmonised_path = manifest["harmonisedPath"]
clumping_path = manifest["clumpingPath"]
study_id = manifest["studyId"]
command = [
"poetry",
"run",
"gentropy",
"step=clumping",
f'step.gwas_path="{harmonised_path}"',
f'step.output_path="{clumping_path}"',
f'step.study_id="{study_id}"',
"+step.session.extended_spark_conf={spark.jars:'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar'}",
"+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:'false'}",
"+step.session.extended_spark_conf={spark.driver.memory:'30g'}",
"+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
"+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
]
if GCSIOManager().exists(clumping_path):
logging.info("Clumping result exists for %s. Skipping", study_id)
manifest["passClumping"] = True
return manifest

result = subprocess.run(args=command, capture_output=True)
if result.returncode != 0:
logging.error("Clumping for study %s failed!", study_id)
error_msg = result.stderr.decode()
logging.error(error_msg)
manifest["passClumping"] = False
logging.info("Dumping manifest to %s", manifest["manifestPath"])
GCSIOManager().dump(manifest["manifestPath"], manifest)
exit(1)
return manifest


@click.command()
def gwas_catalog_process_in_batch():
"""Run gwas catalog processing of summary statistics in batch. This includes harmonisation, QC and clumping."""
PROCESSING_ORDER = ["harmonisation"]
MANIFEST_PATH = os.environ.get("MANIFEST_PATH")
if MANIFEST_PATH is None:
logging.error("MANIFEST_PATH not set!")
exit(1)

manifest = GCSIOManager().load(MANIFEST_PATH)
study = manifest["studyId"]
PROCESSING_STEPS = {"harmonisation": harmonise, "qc": qc, "clumping": clumping}
for step in PROCESSING_ORDER:
if manifest[f"pass{step.capitalize()}"]:
logging.info("Skipping %s", step)
continue
logging.info("Running %s for %s", step, study)
manifest = PROCESSING_STEPS[step](manifest)
logging.info("Finished %s for %s", step, study)

GCSIOManager().dump(MANIFEST_PATH, manifest)


__all__ = ["gwas_catalog_process_in_batch"]
def gwas_catalog_pipeline():
"""Run gwas catalog processing of summary statistics in batch.
This includes harmonisation, QC and clumping.
This command requires setting the `MANIFEST_PATH` in the
environment. The variable should be the reference to the path with the
manifest file.
"""
logging.debug("Reading MANIFEST_PATH env variable")
manifest_path = os.getenv(MANIFEST_PATH_ENV_VAR)
if not manifest_path:
logging.error("MANIFEST_PATH environment variable is missing")
sys.exit(1)
logging.debug("MANIFEST_PATH: %s", manifest_path)
manifest = GWASCatalogPipelineManifest.from_file(manifest_path)
logging.debug("MANIFEST: %s", manifest)
# for now, a dummy implementation of the pipeline processing order
manifest = harmonise_step(manifest)
manifest = qc_step(manifest)
manifest = qc_consolidation_step(manifest)
manifest = clump_step(manifest)

IOManager().resolve(manifest_path).dump(manifest)


__all__ = ["gwas_catalog_pipeline"]