Skip to content

Commit

Permalink
Transfer flow cell data to housekeeper (#2119)(minor)
Browse files Browse the repository at this point in the history
### Added
- Transfer of flow cell data to housekeeper in the new `demultiplex finish flowcell` command
  • Loading branch information
seallard authored Jun 14, 2023
1 parent 652f5c5 commit 15fd365
Show file tree
Hide file tree
Showing 3 changed files with 378 additions and 7 deletions.
1 change: 1 addition & 0 deletions cg/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class FileExtensions(StrEnum):
BED: str = ".bed"
CRAM: str = ".cram"
CSV: str = ".csv"
FASTQ: str = ".fastq"
GPG: str = ".gpg"
GZIP: str = ".gz"
JSON: str = ".json"
Expand Down
126 changes: 119 additions & 7 deletions cg/meta/demultiplex/demux_post_processing.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
"""Post-processing Demultiplex API."""
import datetime
import logging
import os
import shutil
import re
import shutil
from contextlib import redirect_stdout
from pathlib import Path
from typing import Iterable, List, Optional
from cg.apps.sequencing_metrics_parser.api import (
create_sample_lane_sequencing_metrics_for_flow_cell,
)

from housekeeper.store.models import Version

from cg.apps.cgstats.crud import create
from cg.apps.cgstats.stats import StatsAPI
from cg.apps.demultiplex.demultiplex_api import DemultiplexingAPI
from cg.apps.demultiplex.demux_report import create_demux_report
from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.apps.sequencing_metrics_parser.api import (
create_sample_lane_sequencing_metrics_for_flow_cell,
)
from cg.constants.cgstats import STATS_HEADER
from cg.constants.constants import FlowCellStatus
from cg.constants.constants import FileExtensions
from cg.constants.demultiplexing import BclConverter, DemultiplexingDirsAndFiles
from cg.constants.housekeeper_tags import SequencingFileTag
from cg.exc import FlowCellError
from cg.meta.demultiplex import files
from cg.meta.transfer import TransferFlowCell
Expand Down Expand Up @@ -126,10 +127,121 @@ def finish_flow_cell_temp(self, flow_cell_name: str) -> None:
self.status_db.session.commit()

# 5. Store flow cell data in housekeeper.
self.add_flow_cell_data_to_housekeeper(
flow_cell_name=flow_cell_name, flow_cell_directory=flow_cell_dir
)

# 6. Create sequencing metrics
self.add_sample_lane_sequencing_metrics_for_flow_cell(flow_cell_name=flow_cell_name)

def add_flow_cell_data_to_housekeeper(
    self, flow_cell_name: str, flow_cell_directory: Path
) -> None:
    """Store the sample sheet and sample fastq files of a flow cell in Housekeeper.

    The bundle (with a version) and the tags are set up first; after that the
    sample sheet and then the sample fastq files are added.
    """
    LOG.info(f"Add flow cell data to Housekeeper for {flow_cell_name}")

    self.add_bundle_and_version_if_non_existent(flow_cell_name=flow_cell_name)
    self.add_tags_if_non_existent(
        tag_names=[SequencingFileTag.FASTQ, SequencingFileTag.SAMPLE_SHEET, flow_cell_name]
    )

    # Both file-adding steps take the same pair of keyword arguments.
    flow_cell_arguments = {
        "flow_cell_directory": flow_cell_directory,
        "flow_cell_name": flow_cell_name,
    }
    self.add_sample_sheet(**flow_cell_arguments)
    self.add_sample_fastq_files(**flow_cell_arguments)

def add_sample_fastq_files(self, flow_cell_directory: Path, flow_cell_name: str) -> None:
    """Add the sample fastq files found in the flow cell directory to Housekeeper.

    Each file is tagged with the fastq tag and the sample id derived from its
    path. Files for which the derived sample id is empty are skipped.
    """
    for sample_fastq_path in self.get_sample_fastq_file_paths(
        flow_cell_directory=flow_cell_directory
    ):
        sample_internal_id: str = self.get_sample_id_from_sample_fastq_file_path(
            fastq_file_path=sample_fastq_path
        )
        if not sample_internal_id:
            continue

        self.add_file_if_non_existent(
            file_path=sample_fastq_path,
            flow_cell_name=flow_cell_name,
            tag_names=[SequencingFileTag.FASTQ, sample_internal_id],
        )

def add_sample_sheet(self, flow_cell_directory: Path, flow_cell_name: str) -> None:
    """Add the sample sheet in the flow cell directory to Housekeeper.

    The file is tagged with the sample sheet tag and the flow cell name.
    """
    sample_sheet_path = Path(
        flow_cell_directory, DemultiplexingDirsAndFiles.SAMPLE_SHEET_FILE_NAME
    )
    sample_sheet_tags = [SequencingFileTag.SAMPLE_SHEET, flow_cell_name]
    self.add_file_if_non_existent(
        file_path=sample_sheet_path,
        flow_cell_name=flow_cell_name,
        tag_names=sample_sheet_tags,
    )

def is_valid_sample_fastq_filename(self, fastq_file_name: str) -> bool:
    """Return False for fastq file names containing "Undetermined", True otherwise.

    The check is a case-sensitive substring match.
    """
    is_undetermined: bool = "Undetermined" in fastq_file_name
    return not is_undetermined

def get_sample_fastq_file_paths(self, flow_cell_directory: Path) -> List[Path]:
    """Return the paths of sample fastq files found below the flow cell directory.

    The directory is searched recursively for files with the fastq + gzip
    extensions; paths whose file name fails the sample fastq file name check
    are left out.
    """
    fastq_pattern: str = f"**/*{FileExtensions.FASTQ}{FileExtensions.GZIP}"
    return [
        candidate_path
        for candidate_path in flow_cell_directory.glob(fastq_pattern)
        if self.is_valid_sample_fastq_filename(candidate_path.name)
    ]

def get_sample_id_from_sample_fastq_file_path(self, fastq_file_path: Path) -> str:
    """Derive the sample id from the name of the directory holding the fastq file.

    The directory name is split on underscores. When there is more than one
    part the second part is returned (e.g. "Sample_ACC123" gives "ACC123"),
    otherwise the whole directory name is returned.
    """
    name_parts: List[str] = fastq_file_path.parent.name.split("_")
    return name_parts[1] if len(name_parts) > 1 else name_parts[0]

def add_bundle_and_version_if_non_existent(self, flow_cell_name: str) -> None:
    """Create a Housekeeper bundle and version named after the flow cell if missing.

    Nothing is done when the bundle lookup returns a truthy value.
    """
    if self.hk_api.bundle(name=flow_cell_name):
        return
    self.hk_api.create_new_bundle_and_version(name=flow_cell_name)

def add_tags_if_non_existent(self, tag_names: List[str]) -> None:
    """Add each named tag to Housekeeper when the tag lookup returns None.

    Tags are looked up and added one at a time, in the order given.
    """
    for name in tag_names:
        if self.hk_api.get_tag(name=name) is not None:
            continue
        self.hk_api.add_tag(name=name)

def add_file_if_non_existent(
    self, file_path: Path, flow_cell_name: str, tag_names: List[str]
) -> None:
    """Add and include the file in the bundle's latest version unless already there.

    A file that does not exist on disk is logged with a warning and skipped.
    """
    if not file_path.exists():
        LOG.warning(f"File does not exist: {file_path}")
        return

    already_in_bundle: bool = self.file_exists_in_latest_version_for_bundle(
        file_path=file_path, flow_cell_name=flow_cell_name
    )
    if already_in_bundle:
        return

    self.hk_api.add_and_include_file_to_latest_version(
        bundle_name=flow_cell_name,
        file=file_path,
        tags=tag_names,
    )

def file_exists_in_latest_version_for_bundle(
    self, file_path: Path, flow_cell_name: str
) -> bool:
    """Tell whether the bundle's latest version holds a file with the same file name.

    Only the final path component is compared, not the full path.
    """
    version: Version = self.hk_api.get_latest_bundle_version(bundle_name=flow_cell_name)
    wanted_name: str = file_path.name
    for stored_file in version.files:
        if Path(stored_file.path).name == wanted_name:
            return True
    return False

def create_flow_cell(self, parsed_flow_cell: FlowCellDirectoryData) -> Flowcell:
"""Create flow cell from the parsed and validated flow cell data."""
return Flowcell(
Expand Down
Loading

0 comments on commit 15fd365

Please sign in to comment.