From 15fd3658c04dbbd0f1ed514093aeb7bf93de7e19 Mon Sep 17 00:00:00 2001 From: Sebastian Allard Date: Wed, 14 Jun 2023 14:26:19 +0200 Subject: [PATCH] Transfer flow cell data to housekeeper (#2119)(minor) ### Added - Transfer of flow cell data to housekeeper in the new `demultiplex finish flowcell` command --- cg/constants/constants.py | 1 + cg/meta/demultiplex/demux_post_processing.py | 126 ++++++++- .../demultiplex/test_demux_post_processing.py | 258 ++++++++++++++++++ 3 files changed, 378 insertions(+), 7 deletions(-) diff --git a/cg/constants/constants.py b/cg/constants/constants.py index fc4cd3a9e2..568720c7ee 100644 --- a/cg/constants/constants.py +++ b/cg/constants/constants.py @@ -135,6 +135,7 @@ class FileExtensions(StrEnum): BED: str = ".bed" CRAM: str = ".cram" CSV: str = ".csv" + FASTQ: str = ".fastq" GPG: str = ".gpg" GZIP: str = ".gz" JSON: str = ".json" diff --git a/cg/meta/demultiplex/demux_post_processing.py b/cg/meta/demultiplex/demux_post_processing.py index ccb3777ed0..84b32a5e61 100644 --- a/cg/meta/demultiplex/demux_post_processing.py +++ b/cg/meta/demultiplex/demux_post_processing.py @@ -1,24 +1,25 @@ """Post-processing Demultiiplex API.""" -import datetime import logging -import os -import shutil import re +import shutil from contextlib import redirect_stdout from pathlib import Path from typing import Iterable, List, Optional -from cg.apps.sequencing_metrics_parser.api import ( - create_sample_lane_sequencing_metrics_for_flow_cell, -) + +from housekeeper.store.models import Version from cg.apps.cgstats.crud import create from cg.apps.cgstats.stats import StatsAPI from cg.apps.demultiplex.demultiplex_api import DemultiplexingAPI from cg.apps.demultiplex.demux_report import create_demux_report from cg.apps.housekeeper.hk import HousekeeperAPI +from cg.apps.sequencing_metrics_parser.api import ( + create_sample_lane_sequencing_metrics_for_flow_cell, +) from cg.constants.cgstats import STATS_HEADER -from cg.constants.constants import FlowCellStatus +from cg.constants.constants import FileExtensions from cg.constants.demultiplexing import BclConverter, DemultiplexingDirsAndFiles +from cg.constants.housekeeper_tags import SequencingFileTag from cg.exc import FlowCellError from cg.meta.demultiplex import files from cg.meta.transfer import TransferFlowCell @@ -126,10 +127,121 @@ def finish_flow_cell_temp(self, flow_cell_name: str) -> None: self.status_db.session.commit() # 5. Store flow cell data in housekeeper. + self.add_flow_cell_data_to_housekeeper( + flow_cell_name=flow_cell_name, flow_cell_directory=flow_cell_dir + ) # 6. Create sequencing metrics self.add_sample_lane_sequencing_metrics_for_flow_cell(flow_cell_name=flow_cell_name) + def add_flow_cell_data_to_housekeeper( + self, flow_cell_name: str, flow_cell_directory: Path + ) -> None: + """Add flow cell data to Housekeeper.""" + LOG.info(f"Add flow cell data to Housekeeper for {flow_cell_name}") + + self.add_bundle_and_version_if_non_existent(flow_cell_name=flow_cell_name) + + tags: List[str] = [SequencingFileTag.FASTQ, SequencingFileTag.SAMPLE_SHEET, flow_cell_name] + self.add_tags_if_non_existent(tag_names=tags) + + self.add_sample_sheet( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + self.add_sample_fastq_files( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + def add_sample_fastq_files(self, flow_cell_directory: Path, flow_cell_name: str) -> None: + """Add sample fastq files from flow cell to Housekeeper.""" + fastq_file_paths: List[Path] = self.get_sample_fastq_file_paths( + flow_cell_directory=flow_cell_directory + ) + + for fastq_file_path in fastq_file_paths: + sample_id: str = self.get_sample_id_from_sample_fastq_file_path( + fastq_file_path=fastq_file_path + ) + + if sample_id: + self.add_file_if_non_existent( + file_path=fastq_file_path, + flow_cell_name=flow_cell_name, + tag_names=[SequencingFileTag.FASTQ, sample_id], + ) + + def add_sample_sheet(self, flow_cell_directory: Path, flow_cell_name: str) -> None: + """Add sample sheet to Housekeeper.""" + self.add_file_if_non_existent( + file_path=Path(flow_cell_directory, DemultiplexingDirsAndFiles.SAMPLE_SHEET_FILE_NAME), + flow_cell_name=flow_cell_name, + tag_names=[SequencingFileTag.SAMPLE_SHEET, flow_cell_name], + ) + + def is_valid_sample_fastq_filename(self, fastq_file_name: str) -> bool: + """Validate the file name and discard any undetermined fastq files.""" + return "Undetermined" not in fastq_file_name + + def get_sample_fastq_file_paths(self, flow_cell_directory: Path) -> List[Path]: + """Get fastq file paths for flow cell.""" + valid_sample_fastq_file_paths = [ + file_path + for file_path in flow_cell_directory.glob( + f"**/*{FileExtensions.FASTQ}{FileExtensions.GZIP}" + ) + if self.is_valid_sample_fastq_filename(file_path.name) + ] + return valid_sample_fastq_file_paths + + def get_sample_id_from_sample_fastq_file_path(self, fastq_file_path: Path) -> str: + """Extract sample id from fastq file path.""" + sample_directory: str = fastq_file_path.parent.name + directory_parts: str = sample_directory.split("_") + + if len(directory_parts) > 1: + sample_internal_id = directory_parts[1] + else: + sample_internal_id = directory_parts[0] + + return sample_internal_id + + def add_bundle_and_version_if_non_existent(self, flow_cell_name: str) -> None: + """Add bundle if it does not exist.""" + if not self.hk_api.bundle(name=flow_cell_name): + self.hk_api.create_new_bundle_and_version(name=flow_cell_name) + + def add_tags_if_non_existent(self, tag_names: List[str]) -> None: + """Ensure that tags exist in Housekeeper.""" + for tag_name in tag_names: + if self.hk_api.get_tag(name=tag_name) is None: + self.hk_api.add_tag(name=tag_name) + + def add_file_if_non_existent( + self, file_path: Path, flow_cell_name: str, tag_names: List[str] + ) -> None: + """Add file to Housekeeper if it has not already been added.""" + if not file_path.exists(): + LOG.warning(f"File does not exist: {file_path}") + return + + if not self.file_exists_in_latest_version_for_bundle( + file_path=file_path, flow_cell_name=flow_cell_name + ): + self.hk_api.add_and_include_file_to_latest_version( + bundle_name=flow_cell_name, + file=file_path, + tags=tag_names, + ) + + def file_exists_in_latest_version_for_bundle( + self, file_path: Path, flow_cell_name: str + ) -> bool: + """Check if file exists in latest version for bundle.""" + latest_version: Version = self.hk_api.get_latest_bundle_version(bundle_name=flow_cell_name) + return any( + file_path.name == Path(bundle_file.path).name for bundle_file in latest_version.files + ) + def create_flow_cell(self, parsed_flow_cell: FlowCellDirectoryData) -> Flowcell: """Create flow cell from the parsed and validated flow cell data.""" return Flowcell( diff --git a/tests/meta/demultiplex/test_demux_post_processing.py b/tests/meta/demultiplex/test_demux_post_processing.py index b973788af3..d60cfc0eb8 100644 --- a/tests/meta/demultiplex/test_demux_post_processing.py +++ b/tests/meta/demultiplex/test_demux_post_processing.py @@ -2,7 +2,11 @@ from pathlib import Path from typing import Generator +from mock import MagicMock, call + from cg.constants.demultiplexing import DemultiplexingDirsAndFiles, BclConverter +from cg.constants.housekeeper_tags import SequencingFileTag +from cg.meta.demultiplex import demux_post_processing from cg.meta.demultiplex.demux_post_processing import ( DemuxPostProcessingAPI, DemuxPostProcessingHiseqXAPI, @@ -517,3 +521,257 @@ def test_is_not_bcl2fastq_folder_structure( # THEN it should not be a bcl2fastq folder structure assert is_bcl2fastq_folder_structure is False + + +def test_add_flow_cell_data_to_housekeeper(demultiplex_context: CGConfig): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + demux_post_processing_api.add_bundle_and_version_if_non_existent = MagicMock() + demux_post_processing_api.add_tags_if_non_existent = MagicMock() + demux_post_processing_api.add_sample_sheet = MagicMock() + demux_post_processing_api.add_sample_fastq_files = MagicMock() + + flow_cell_name: str = "flow_cell_name" + flow_cell_directory: Path = Path("some/path/to/flow/cell/directory") + + # WHEN the flow cell data is added to housekeeper + demux_post_processing_api.add_flow_cell_data_to_housekeeper( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + # THEN the bundle and version is added + demux_post_processing_api.add_bundle_and_version_if_non_existent.assert_called_once_with( + flow_cell_name=flow_cell_name + ) + + # THEN the correct tags are added + demux_post_processing_api.add_tags_if_non_existent.assert_called_once_with( + tag_names=[SequencingFileTag.FASTQ, SequencingFileTag.SAMPLE_SHEET, flow_cell_name] + ) + + # THEN the sample sheet is added + demux_post_processing_api.add_sample_sheet.assert_called_once_with( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + # THEN the fastq files are added + demux_post_processing_api.add_sample_fastq_files.assert_called_once_with( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + +def test_add_bundle_and_version_if_non_existent(demultiplex_context: CGConfig): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + demux_post_processing_api.hk_api.bundle = MagicMock(return_value=None) + demux_post_processing_api.hk_api.create_new_bundle_and_version = MagicMock() + + # WHEN adding a bundle and version which does not exist + flow_cell_name: str = "flow_cell_name" + demux_post_processing_api.add_bundle_and_version_if_non_existent(flow_cell_name=flow_cell_name) + + # THEN that the expected methods were called with the expected arguments + demux_post_processing_api.hk_api.bundle.assert_called_once_with(name=flow_cell_name) + demux_post_processing_api.hk_api.create_new_bundle_and_version.assert_called_once_with( + name=flow_cell_name + ) + + +def test_add_bundle_and_version_if_already_exists(demultiplex_context: CGConfig): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + mock_bundle = MagicMock() + demux_post_processing_api.hk_api.bundle = MagicMock(return_value=mock_bundle) + demux_post_processing_api.hk_api.create_new_bundle_and_version = MagicMock() + + # WHEN adding a bundle and version which already exists + flow_cell_name: str = "flow_cell_name" + demux_post_processing_api.add_bundle_and_version_if_non_existent(flow_cell_name=flow_cell_name) + + # THEN the bundle was retrieved + demux_post_processing_api.hk_api.bundle.assert_called_once_with(name=flow_cell_name) + + # THEN a new bundle and version was not created + demux_post_processing_api.hk_api.create_new_bundle_and_version.assert_not_called() + + +def test_add_tags_if_non_existent(demultiplex_context: CGConfig): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + # GIVEN that the tags do not exist + demux_post_processing_api.hk_api.get_tag = MagicMock(return_value=None) + demux_post_processing_api.hk_api.add_tag = MagicMock() + + # WHEN adding new tags + tag_names = ["tag1", "tag2"] + demux_post_processing_api.add_tags_if_non_existent(tag_names=tag_names) + + # THEN the expected housekeeper API methods were called to create the tags + demux_post_processing_api.hk_api.get_tag.assert_has_calls( + [call(name="tag1"), call(name="tag2")] + ) + demux_post_processing_api.hk_api.add_tag.assert_has_calls( + [call(name="tag1"), call(name="tag2")] + ) + + +def test_add_tags_if_all_exist(demultiplex_context: CGConfig): + # Given a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + # Mock the methods in the housekeeper API + demux_post_processing_api.hk_api.get_tag = MagicMock(return_value=MagicMock()) + demux_post_processing_api.hk_api.add_tag = MagicMock() + + # Call the add_tags_if_non_existent method with two tag names + tag_names = ["tag1", "tag2"] + demux_post_processing_api.add_tags_if_non_existent(tag_names=tag_names) + + # Assert that the expected methods were called with the expected arguments + demux_post_processing_api.hk_api.get_tag.assert_has_calls( + [call(name="tag1"), call(name="tag2")] + ) + demux_post_processing_api.hk_api.add_tag.assert_not_called() + + +def test_add_sample_sheet(demultiplex_context: CGConfig, tmpdir_factory): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + demux_post_processing_api.add_file_if_non_existent = MagicMock() + + # GIVEN a flow cell directory and name + flow_cell_directory: Path = Path(tmpdir_factory.mktemp("flow_cell_directory")) + flow_cell_name = "flow_cell_name" + + # WHEN a sample sheet is added + demux_post_processing_api.add_sample_sheet( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + # THEN add_file_if_non_existent was called with expected arguments + expected_file_path = Path( + flow_cell_directory, DemultiplexingDirsAndFiles.SAMPLE_SHEET_FILE_NAME + ) + expected_tag_names = [SequencingFileTag.SAMPLE_SHEET, flow_cell_name] + + demux_post_processing_api.add_file_if_non_existent.assert_called_once_with( + file_path=expected_file_path, + flow_cell_name=flow_cell_name, + tag_names=expected_tag_names, + ) + + +def test_add_fastq_files_with_sample_id(demultiplex_context: CGConfig, tmpdir_factory): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + demux_post_processing_api.get_sample_fastq_file_paths = MagicMock() + demux_post_processing_api.get_sample_id_from_sample_fastq_file_path = MagicMock() + demux_post_processing_api.add_file_if_non_existent = MagicMock() + + mock_fastq_paths = [ + Path(tmpdir_factory.mktemp("first_file.fastq.gz")), + Path(tmpdir_factory.mktemp("second_file.fastq.gz")), + ] + demux_post_processing_api.get_sample_fastq_file_paths.return_value = mock_fastq_paths + + sample_id = "sample1" + demux_post_processing_api.get_sample_id_from_sample_fastq_file_path.return_value = sample_id + + # GIVEN a flow cell directory and name + flow_cell_directory: Path = Path(tmpdir_factory.mktemp("flow_cell_directory")) + flow_cell_name = "flow_cell_name" + + # WHEN add_fastq_files is called + demux_post_processing_api.add_sample_fastq_files( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + # THEN add_file_if_non_existent was called with expected arguments for each file + expected_calls = [ + call( + file_path=file_path, + flow_cell_name=flow_cell_name, + tag_names=[SequencingFileTag.FASTQ, sample_id], + ) + for file_path in mock_fastq_paths + ] + + demux_post_processing_api.add_file_if_non_existent.assert_has_calls(expected_calls) + + +def test_add_fastq_files_without_sample_id(demultiplex_context: CGConfig, tmpdir_factory): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + demux_post_processing_api.get_sample_id_from_sample_fastq_file_path = MagicMock() + demux_post_processing_api.get_sample_id_from_sample_fastq_file_path.return_value = None + + demux_post_processing_api.add_file_if_non_existent = MagicMock() + + flow_cell_directory: Path = Path(tmpdir_factory.mktemp("flow_cell_directory")) + flow_cell_name = "flow_cell_name" + + # WHEN add_fastq_files is called + demux_post_processing_api.add_sample_fastq_files( + flow_cell_directory=flow_cell_directory, flow_cell_name=flow_cell_name + ) + + # THEN add_file_if_non_existent was not called + demux_post_processing_api.add_file_if_non_existent.assert_not_called() + + +def test_is_valid_sample_fastq_filename(demultiplex_context: CGConfig): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + # WHEN checking a filename containing "Undetermined" + file_name = "Undetermined_file.fastq" + assert not demux_post_processing_api.is_valid_sample_fastq_filename(file_name) + + # WHEN checking a valid filename + file_name = "valid_file.fastq" + assert demux_post_processing_api.is_valid_sample_fastq_filename(file_name) + + +def test_get_sample_fastq_file_paths(demultiplex_context: CGConfig, tmpdir_factory): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + # GIVEN some files in temporary directory + tmp_dir = Path(tmpdir_factory.mktemp("data")) + valid_file = tmp_dir / "file.fastq.gz" + invalid_file = tmp_dir / "Undetermined_file.fastq.gz" + valid_file.touch() + invalid_file.touch() + + # WHEN we get sample fastq file paths + result = demux_post_processing_api.get_sample_fastq_file_paths(tmp_dir) + + # THEN we should only get the valid file + assert len(result) == 1 + assert valid_file in result + assert invalid_file not in result + + +def test_get_sample_id_from_sample_fastq_file_path(demultiplex_context: CGConfig, tmpdir_factory): + # GIVEN a DemuxPostProcessing API + demux_post_processing_api = DemuxPostProcessingAPI(demultiplex_context) + + # GIVEN a sample directory and file + tmp_dir = Path(tmpdir_factory.mktemp("flow_cell_directory")) + sample_id = "sampleid" + sample_dir = tmp_dir / f"prefix_{sample_id}" + sample_dir.mkdir() + sample_file = sample_dir / "file.fastq.gz" + sample_file.touch() + + # WHEN we get sample id from sample fastq file path + result = demux_post_processing_api.get_sample_id_from_sample_fastq_file_path(sample_file) + + # THEN we should get the correct sample id + assert result == sample_id