diff --git a/covigator/__init__.py b/covigator/__init__.py index 159a5729..86da1bff 100644 --- a/covigator/__init__.py +++ b/covigator/__init__.py @@ -1,5 +1,5 @@ -VERSION = "v0.9.0" -ANALYSIS_PIPELINE_VERSION = "v0.13.0" +VERSION = "v1.0.0.dev1" +ANALYSIS_PIPELINE_VERSION = "v0.14.0" MISSENSE_VARIANT = "missense_variant" SYNONYMOUS_VARIANT = "synonymous_variant" diff --git a/covigator/accessor/abstract_accessor.py b/covigator/accessor/abstract_accessor.py new file mode 100644 index 00000000..7f4337c0 --- /dev/null +++ b/covigator/accessor/abstract_accessor.py @@ -0,0 +1,47 @@ +from datetime import date, datetime +from typing import Union +import requests +from covigator.misc.country_parser import CountryParser +from covigator.database.model import SampleEna +from covigator.accessor import MINIMUM_DATE +from covigator.exceptions import CovigatorExcludedSampleTooEarlyDateException +from covigator.misc import backoff_retrier + +NUMBER_RETRIES = 5 + + +class SampleCovid19: + pass + + +class AbstractAccessor: + + def __init__(self): + self.country_parser = CountryParser() + + # this ensures there is a retry mechanism in place with a limited number of retries + self.session = requests.session() + self.get_with_retries = backoff_retrier.wrapper(self.session.get, NUMBER_RETRIES) + + def _parse_country(self, sample: Union[SampleEna, SampleCovid19]): + parsed_country = self.country_parser.parse_country( + sample.country.split(":")[0] if sample.country else "") + sample.country_raw = sample.country + sample.country = parsed_country.country + sample.country_alpha_2 = parsed_country.country_alpha_2 + sample.country_alpha_3 = parsed_country.country_alpha_3 + sample.continent_alpha_2 = parsed_country.continent_alpha_2 + sample.continent = parsed_country.continent + + def _parse_dates(self, sample: Union[SampleEna, SampleCovid19]): + sample.collection_date = self._parse_abstract(sample.collection_date, date.fromisoformat) + sample.first_created = self._parse_abstract(sample.first_created, date.fromisoformat) + if sample.collection_date is not None and sample.collection_date < MINIMUM_DATE: + raise CovigatorExcludedSampleTooEarlyDateException + + def _parse_abstract(self, value, type): + try: + value = type(value) + except (ValueError, TypeError): + value = None + return value diff --git a/covigator/accessor/covid19_portal_accessor.py b/covigator/accessor/covid19_portal_accessor.py new file mode 100644 index 00000000..fd6c4234 --- /dev/null +++ b/covigator/accessor/covid19_portal_accessor.py @@ -0,0 +1,306 @@ +import gzip +import os +import pathlib +import shutil +from datetime import datetime +from io import StringIO +from json import JSONDecodeError +from Bio import SeqIO +from covigator.accessor import MINIMUM_DATE +from covigator.configuration import Configuration +from requests import Response +from sqlalchemy.orm import Session +import covigator +from covigator.accessor.abstract_accessor import AbstractAccessor, SampleCovid19 +from covigator.exceptions import CovigatorExcludedSampleTooEarlyDateException, CovigatorExcludedFailedDownload, \ + CovigatorExcludedTooManyEntries, CovigatorExcludedEmptySequence, CovigatorExcludedHorizontalCoverage, \ + CovigatorExcludedBadBases, CovigatorExcludedSampleException, CovigatorExcludedMissingDateException +from covigator.database.model import DataSource, Log, CovigatorModule, SampleCovid19Portal, JobStatus +from covigator.database.database import Database +from logzero import logger + +NUMBER_RETRIES = 5 +BATCH_SIZE = 1000 +THRESHOLD_NON_VALID_BASES_RATIO = 0.2 +THRESHOLD_GENOME_COVERAGE = 0.2 +GENOME_LENGTH = 29903 + + +class Covid19PortalAccessor(AbstractAccessor): + + API_URL_BASE = "https://www.covid19dataportal.org/api/backend/viral-sequences/sequences" + FASTA_URL_BASE = "https://www.ebi.ac.uk/ena/browser/api/fasta" + # while ENA API is in principle organism agnostic, this is SARS-CoV-2 specific, thus taxon is hard-coded + HOST = "Homo sapiens" + TAX_ID = "2697049" + + def __init__(self, database: Database, storage_folder): + super().__init__() + logger.info("Initialising Covid19 portal accessor") + self.start_time = datetime.now() + self.has_error = False + self.error_message = None + self.storage_folder = storage_folder + + self.database = database + assert self.database is not None, "Empty database" + + self.excluded_samples_by_host_tax_id = {} + self.excluded_samples_by_tax_id = {} + self.excluded_existing = 0 + self.included = 0 + self.excluded = 0 + self.excluded_by_date = 0 + self.excluded_missing_date = 0 + self.excluded_failed_download = 0 + self.excluded_empty_sequence = 0 + self.excluded_too_many_entries = 0 + self.excluded_horizontal_coverage = 0 + self.excluded_bad_bases = 0 + + def access(self): + logger.info("Starting Covid19 portal accessor") + session = self.database.get_database_session() + # NOTE: holding in memory the whole list of existing ids is much faster than querying every time + # it assumes there will be no repetitions + existing_sample_ids = [value for value, in session.query(SampleCovid19Portal.run_accession).all()] + try: + logger.info("Reading...") + page = 1 # it has to start with 1 + list_runs = self._get_page(page=page, size=BATCH_SIZE) + num_entries = len(list_runs.get('entries')) + count = num_entries + # gets total expected number of results from first page + logger.info("Processing {} Covid19 Portal samples...".format(num_entries)) + self._process_runs(list_runs, existing_sample_ids, session) + + while True: + page += 1 + list_runs = self._get_page(page=page, size=BATCH_SIZE) + entries = list_runs.get('entries') + if entries is None or entries == []: + break + num_entries = len(entries) + count += num_entries + self._process_runs(list_runs, existing_sample_ids, session) + logger.info("Processed {} of Covid19 Portal samples...".format(count)) + + logger.info("All samples processed!") + except Exception as e: + logger.exception(e) + session.rollback() + self.has_error = True + self.error_message = str(e) + finally: + self._write_execution_log(session) + session.close() + self._log_results() + logger.info("Finished Covid19 Portal accessor") + + def _get_page(self, page, size) -> dict: + # as communicated by ENA support we use limit=0 and offset=0 to get all records in one query + response: Response = self.get_with_retries( + "{url_base}?page={page}&size={size}".format(url_base=self.API_URL_BASE, page=page, size=size)) + try: + json = response.json() + except JSONDecodeError as e: + logger.exception(e) + raise e + return json + + def _process_runs(self, list_samples, existing_sample_ids, session: Session): + + included_samples = [] + for sample_dict in list_samples.get('entries'): + if isinstance(sample_dict, dict): + if sample_dict.get("acc") in existing_sample_ids: + self.excluded_existing += 1 + continue # skips runs already registered in the database + if not self._complies_with_inclusion_criteria(sample_dict): + continue # skips runs not complying with inclusion criteria + # NOTE: this parse operation is costly + try: + # parses sample into DB model + sample = self._parse_covid19_portal_sample(sample_dict) + # downloads FASTA file + sample = self._download_fasta(sample=sample) + self.included += 1 + included_samples.append(sample) + except CovigatorExcludedSampleTooEarlyDateException: + self.excluded_by_date += 1 + self.excluded += 1 + except CovigatorExcludedMissingDateException: + self.excluded_missing_date += 1 + self.excluded += 1 + except CovigatorExcludedFailedDownload: + self.excluded_failed_download += 1 + self.excluded += 1 + except CovigatorExcludedEmptySequence: + self.excluded_empty_sequence += 1 + self.excluded += 1 + except CovigatorExcludedTooManyEntries: + self.excluded_too_many_entries += 1 + self.excluded += 1 + except CovigatorExcludedHorizontalCoverage: + self.excluded_horizontal_coverage += 1 + self.excluded += 1 + except CovigatorExcludedBadBases: + self.excluded_bad_bases += 1 + self.excluded += 1 + except CovigatorExcludedSampleException: + self.excluded += 1 + else: + logger.error("Sample without the expected format") + + if len(included_samples) > 0: + session.add_all(included_samples) + session.commit() + + def _parse_covid19_portal_sample(self, sample: dict) -> SampleCovid19Portal: + sample = SampleCovid19Portal( + run_accession=sample.get('acc'), + first_created=next(iter(sample.get('fields').get('creation_date')), None), + collection_date=next(iter(sample.get('fields').get('collection_date')), None), + last_modification_date=next(iter(sample.get('fields').get('last_modification_date')), None), + center_name=next(iter(sample.get('fields').get('center_name')), None), + isolate=next(iter(sample.get('fields').get('isolate')), None), + molecule_type=next(iter(sample.get('fields').get('molecule_type')), None), + country=next(iter(sample.get('fields').get('country')), None), + # build FASTA URL here + fasta_url="{base}/{acc}".format(base=self.FASTA_URL_BASE, acc=sample.get('acc')) + ) + self._parse_country(sample) + self._parse_dates(sample) + sample.covigator_accessor_version = covigator.VERSION + return sample + + def _complies_with_inclusion_criteria(self, sample: dict): + # NOTE: this uses the original dictionary instead of the parsed SampleEna class for performance reasons + included = True + + host = next(iter(sample.get('fields').get('host')), None) + if host is None or host.strip() == "" or host != self.HOST: + included = False # skips runs where the host is empty or does not match + self.excluded_samples_by_host_tax_id[str(host)] = \ + self.excluded_samples_by_host_tax_id.get(str(host), 0) + 1 + + taxon = next(iter(sample.get('fields').get('TAXON')), None) + if taxon is None or taxon.strip() == "" or taxon != self.TAX_ID: + included = False # skips runs where the host is empty or does not match + self.excluded_samples_by_tax_id[str(taxon)] = \ + self.excluded_samples_by_tax_id.get(str(taxon), 0) + 1 + + if not included: + self.excluded += 1 + return included + + def _log_results(self): + logger.info("Included new runs = {}".format(self.included)) + logger.info("Excluded already existing samples = {}".format(self.excluded_existing)) + logger.info("Total excluded runs by selection criteria = {}".format(self.excluded)) + logger.info("Excluded by host = {}".format(self.excluded_samples_by_host_tax_id)) + logger.info("Excluded by host = {}".format(self.excluded_samples_by_tax_id)) + logger.info("Excluded failed download = {}".format(self.excluded_failed_download)) + + def _write_execution_log(self, session: Session): + end_time = datetime.now() + session.add(Log( + start=self.start_time, + end=end_time, + source=DataSource.COVID19_PORTAL, + module=CovigatorModule.ACCESSOR, + has_error=self.has_error, + error_message=self.error_message, + processed=self.included, + data={ + "included": self.included, + "excluded": { + "existing": self.excluded_existing, + "excluded_by_criteria": self.excluded, + "excluded_early_date": self.excluded_by_date, + "excluded_missing_date": self.excluded_missing_date, + "excluded_failed_download": self.excluded_failed_download, + "excluded_bad_bases": self.excluded_bad_bases, + "excluded_horizontal_coverage": self.excluded_horizontal_coverage, + "excluded_too_many_entries": self.excluded_too_many_entries, + "excluded_empty_sequence": self.excluded_empty_sequence, + "host": self.excluded_samples_by_host_tax_id, + "taxon": self.excluded_samples_by_tax_id + } + } + )) + session.commit() + + def _download_fasta(self, sample: SampleCovid19Portal) -> SampleCovid19Portal: + + local_filename = sample.fasta_url.split('/')[-1] + ".fasta" # URL comes without extension + local_folder = sample.get_sample_folder(self.storage_folder) + local_full_path = os.path.join(local_folder, local_filename) + + # avoids downloading the same files over and over + if not os.path.exists(local_full_path): + pathlib.Path(local_folder).mkdir(parents=True, exist_ok=True) + try: + fasta_str = self.get_with_retries(sample.fasta_url).content.decode('utf-8') + fasta_io = StringIO(fasta_str) + records = list(SeqIO.parse(fasta_io, "fasta")) + except Exception as e: + raise CovigatorExcludedFailedDownload(e) + else: + records = list(SeqIO.parse(open(local_full_path, "r"), "fasta")) + + # checks the validity of the FASTA sequence + if len(records) == 0: + raise CovigatorExcludedEmptySequence() + if len(records) > 1: + raise CovigatorExcludedTooManyEntries() + record = records[0] + sequence_length = len(record.seq) + if float(sequence_length) / GENOME_LENGTH < THRESHOLD_GENOME_COVERAGE: + raise CovigatorExcludedHorizontalCoverage() + count_n_bases = record.seq.count("N") + count_ambiguous_bases = sum([record.seq.count(b) for b in "RYWSMKHBVD"]) + if float(count_n_bases + count_ambiguous_bases) / sequence_length > THRESHOLD_NON_VALID_BASES_RATIO: + raise CovigatorExcludedBadBases() + + # compress and writes the file after all checks + if not os.path.exists(local_full_path): + with open(local_full_path, "w") as f: + SeqIO.write(sequences=records, handle=f, format="fasta") + + # stores the reference to the file in the DB and metadata about the sequence + sample.fasta_path = local_full_path + sample.sequence_length = sequence_length + sample.count_n_bases = count_n_bases + sample.count_ambiguous_bases = count_ambiguous_bases + sample.status = JobStatus.DOWNLOADED + + return sample + + def _compress_file(self, uncompressed_file, compressed_file): + with open(uncompressed_file, 'rb') as f_in: + with gzip.open(compressed_file, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + os.remove(uncompressed_file) + + def _parse_dates(self, sample: SampleCovid19): + format = "%Y%m%d" + try: + sample.collection_date = datetime.strptime(sample.collection_date, format).date() + except Exception: + sample.collection_date = None + try: + sample.first_created = datetime.strptime(sample.first_created, format).date() + except Exception: + sample.first_created = None + + if sample.collection_date is None: + raise CovigatorExcludedMissingDateException() + if sample.collection_date is not None and sample.collection_date < MINIMUM_DATE: + raise CovigatorExcludedSampleTooEarlyDateException() + + +if __name__ == '__main__': + config = Configuration() + covigator.configuration.initialise_logs(config.logfile_accesor) + Covid19PortalAccessor(database=Database(config=config, initialize=True), storage_folder=config.storage_folder).access() \ No newline at end of file diff --git a/covigator/accessor/ena_accessor.py b/covigator/accessor/ena_accessor.py old mode 100644 new mode 100755 index bfc72b9f..2aebf4d6 --- a/covigator/accessor/ena_accessor.py +++ b/covigator/accessor/ena_accessor.py @@ -1,30 +1,25 @@ -from datetime import date, datetime +from datetime import datetime from json import JSONDecodeError - -import requests from requests import Response from sqlalchemy.orm import Session - import covigator -from covigator.accessor import MINIMUM_DATE +from covigator.accessor.abstract_accessor import AbstractAccessor from covigator.exceptions import CovigatorExcludedSampleTooEarlyDateException -from covigator.misc import backoff_retrier from covigator.database.model import SampleEna, DataSource, Log, CovigatorModule from covigator.database.database import Database from logzero import logger -from covigator.misc.country_parser import CountryParser -NUMBER_RETRIES = 5 BATCH_SIZE = 1000 -class EnaAccessor: +class EnaAccessor(AbstractAccessor): ENA_API_URL_BASE = "https://www.ebi.ac.uk/ena/portal/api" # see https://www.ebi.ac.uk/ena/portal/api/returnFields?result=read_run&format=json for all possible fields ENA_FIELDS = [ # data on run "scientific_name", + "sample_accession", "study_accession", "experiment_accession", "first_created", @@ -64,6 +59,8 @@ class EnaAccessor: ] def __init__(self, tax_id: str, host_tax_id: str, database: Database, maximum=None): + + super().__init__() logger.info("Initialising ENA accessor") self.start_time = datetime.now() self.has_error = False @@ -86,10 +83,6 @@ def __init__(self, tax_id: str, host_tax_id: str, database: Database, maximum=No self.included = 0 self.excluded = 0 self.excluded_by_date = 0 - self.country_parser = CountryParser() - - # this ensures there is a retry mechanism in place with a limited number of retries - self.get_with_retries = backoff_retrier.wrapper(requests.get, NUMBER_RETRIES) def access(self): logger.info("Starting ENA accessor") @@ -163,22 +156,6 @@ def _process_runs(self, list_runs, existing_sample_ids, session: Session): logger.info("Added {} new ENA samples".format(len(included_samples))) logger.info("Processed {} ENA samples".format(len(list_runs))) - def _parse_country(self, sample: SampleEna): - parsed_country = self.country_parser.parse_country( - sample.country.split(":")[0] if sample.country else "") - sample.country_raw = sample.country - sample.country = parsed_country.country - sample.country_alpha_2 = parsed_country.country_alpha_2 - sample.country_alpha_3 = parsed_country.country_alpha_3 - sample.continent_alpha_2 = parsed_country.continent_alpha_2 - sample.continent = parsed_country.continent - - def _parse_dates(self, ena_run): - ena_run.collection_date = self._parse_abstract(ena_run.collection_date, date.fromisoformat) - ena_run.first_created = self._parse_abstract(ena_run.first_created, date.fromisoformat) - if ena_run.collection_date is not None and ena_run.collection_date < MINIMUM_DATE: - raise CovigatorExcludedSampleTooEarlyDateException - def _parse_ena_run(self, run): sample = SampleEna(**run) self._parse_country(sample) @@ -197,13 +174,6 @@ def _parse_numeric_fields(self, ena_run): ena_run.lat = self._parse_abstract(ena_run.lat, float) ena_run.lon = self._parse_abstract(ena_run.lon, float) - def _parse_abstract(self, value, type): - try: - value = type(value) - except (ValueError, TypeError): - value = None - return value - def _complies_with_inclusion_criteria(self, ena_run: dict): # NOTE: this uses the original dictionary instead of the parsed SampleEna class for performance reasons included = True diff --git a/covigator/command_line.py b/covigator/command_line.py index c538d43d..4dc3c679 100644 --- a/covigator/command_line.py +++ b/covigator/command_line.py @@ -1,5 +1,6 @@ from argparse import ArgumentParser +from covigator.accessor.covid19_portal_accessor import Covid19PortalAccessor from covigator.database.model import DataSource from covigator.precomputations.load_cooccurrences import CooccurrenceMatrixLoader @@ -12,6 +13,7 @@ from covigator.configuration import Configuration from covigator.database.database import Database from covigator.pipeline.ena_pipeline import Pipeline +from covigator.processor.covid19portal_processor import Covid19PortalProcessor from covigator.processor.ena_downloader import EnaDownloader from covigator.processor.ena_processor import EnaProcessor from logzero import logger @@ -41,9 +43,24 @@ def ena_accessor(): EnaAccessor(tax_id=tax_id, host_tax_id=host_tax_id, database=Database(config=config, initialize=True)).access() +def covid19_portal_accessor(): + parser = ArgumentParser( + description="Covigator {} CoVid19 portal accessor".format(covigator.VERSION)) + + config = Configuration() + covigator.configuration.initialise_logs(config.logfile_accesor) + Covid19PortalAccessor(database=Database(config=config, initialize=True), storage_folder=config.storage_folder).access() + + def processor(): parser = ArgumentParser( description="Covigator {} processor".format(covigator.VERSION)) + parser.add_argument( + "--source", + dest="data_source", + help="Specify data source. This can be either ENA or COVID19_PORTAL", + required=True + ) parser.add_argument( "--num-jobs", dest="num_jobs", @@ -64,25 +81,20 @@ def processor(): help="number of CPUs to be used by the processor when running locally", default=1 ) - parser.add_argument( - "--download", - dest="download", - help="if set it tries to download ENA FASTQs if they are not already in place.", - action='store_true', - default=False - ) args = parser.parse_args() config = Configuration() covigator.configuration.initialise_logs(config.logfile_processor) if args.local: - _start_dask_processor(args, config, args.download, num_local_cpus=int(args.num_local_cpus)) + logger.info("Local processing") + _start_dask_processor(args, config, num_local_cpus=int(args.num_local_cpus)) else: + logger.info("Processing in Slurm cluster") with SLURMCluster( walltime='72:00:00', # hard codes maximum time to 72 hours scheduler_options={"dashboard_address": ':{}'.format(config.dask_port)}) as cluster: cluster.scale(jobs=int(args.num_jobs)) - _start_dask_processor(args, config, args.download, cluster=cluster) + _start_dask_processor(args, config, cluster=cluster) def ena_downloader(): @@ -94,11 +106,21 @@ def ena_downloader(): EnaDownloader(database=Database(config=config, initialize=True), config=config).process() -def _start_dask_processor(args, config, download, cluster=None, num_local_cpus=1): +def _start_dask_processor(args, config, cluster=None, num_local_cpus=1): with Client(cluster) if cluster is not None else Client(n_workers=num_local_cpus, threads_per_worker=1) as client: - EnaProcessor(database=Database(initialize=True, config=config), - dask_client=client, config=config, download=download) \ - .process() + # NOTE: the comparison with DataSource.ENA.name fails for some reason... + if args.data_source == "ENA": + EnaProcessor( + database=Database(initialize=True, config=config), + dask_client=client, config=config) \ + .process() + elif args.data_source == "COVID19_PORTAL": + Covid19PortalProcessor( + database=Database(initialize=True, config=config), + dask_client=client, config=config) \ + .process() + else: + logger.error("Unknown data source. Please choose either ENA or COVID19_PORTAL") def pipeline(): diff --git a/covigator/dashboard/assets/CV19DP_logo_oneliner2.svg b/covigator/dashboard/assets/CV19DP_logo_oneliner2.svg new file mode 100644 index 00000000..0d11ca51 --- /dev/null +++ b/covigator/dashboard/assets/CV19DP_logo_oneliner2.svg @@ -0,0 +1,425 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.png b/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.png new file mode 100644 index 00000000..de89d27b Binary files /dev/null and b/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.png differ diff --git a/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.xcf b/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.xcf new file mode 100644 index 00000000..648b2b3c Binary files /dev/null and b/covigator/dashboard/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.xcf differ diff --git a/covigator/dashboard/dashboard.py b/covigator/dashboard/dashboard.py index f4a4d3d7..81350561 100644 --- a/covigator/dashboard/dashboard.py +++ b/covigator/dashboard/dashboard.py @@ -12,6 +12,7 @@ from covigator.configuration import Configuration from covigator.dashboard.tabs.acknowledgements import get_tab_acknowledgements from covigator.dashboard.tabs.dataset_ena import get_tab_dataset_ena +from covigator.dashboard.tabs.dataset_covid19_portal import get_tab_dataset_covid19_portal from covigator.dashboard.tabs.download import set_callbacks_download_tab, get_tab_download from covigator.dashboard.tabs.footer import get_footer from covigator.dashboard.tabs.lineages import set_callbacks_lineages_tab, get_tab_lineages @@ -36,6 +37,7 @@ SAMPLES_TAB_ID = "samples" LINEAGES_TAB_ID = "lineages" MUTATIONS_TAB_ID = "mutation-stats" +COVID19_PORTAL_DATASET_TAB_ID = "covid19-portal-dataset" ENA_DATASET_TAB_ID = "ena-dataset" OVERVIEW_TAB_ID = "overview" @@ -100,6 +102,9 @@ def serve_layout(self): dbc.DropdownMenuItem( "ENA dataset", href="/ena", style={'font-size' : '150%', "color": "#003c78"}), + dbc.DropdownMenuItem( + "Covid19 Data Portal sequences dataset", href="/covid19-portal", + style={'font-size': '150%', "color": "#003c78"}), dbc.DropdownMenuItem( "Documentation", href="https://covigator.readthedocs.io/en/latest", target="_blank", @@ -200,11 +205,14 @@ def set_callbacks(app, session: Session, content_folder): MAIN_PAGE = "main" ENA_PAGE = DataSource.ENA + COVID19_PORTAL_PAGE = DataSource.COVID19_PORTAL ACKNOWLEDGEMENTS_PAGE = "acknowledgements" def _get_page(url): if url in ["", "/"]: return MAIN_PAGE + elif url == "/covid19-portal": + return COVID19_PORTAL_PAGE elif url == "/ena": return ENA_PAGE elif url == "/acknowledgements": @@ -221,6 +229,15 @@ def switch_page(url): if page == MAIN_PAGE: # show overview with links return None, None + elif page == COVID19_PORTAL_PAGE: + # show gisaid tabs + return [ + dbc.Tab(label="Overview", tab_id=COVID19_PORTAL_DATASET_TAB_ID, label_style=TAB_STYLE), + dbc.Tab(label="Samples", tab_id=SAMPLES_TAB_ID, label_style=TAB_STYLE), + dbc.Tab(label="Lineages", tab_id=LINEAGES_TAB_ID, label_style=TAB_STYLE), + dbc.Tab(label="Mutation statistics", tab_id=MUTATIONS_TAB_ID, label_style=TAB_STYLE), + dbc.Tab(label="Recurrent mutations", tab_id=RECURRENT_MUTATIONS_TAB_ID, label_style=TAB_STYLE), + ], COVID19_PORTAL_DATASET_TAB_ID elif page == ENA_PAGE: # show ena tabs return [ @@ -236,6 +253,18 @@ def switch_page(url): return [ dbc.Tab(label="Acknowledgements", tab_id=HELP_TAB_ID, label_style={"color": "#003c78", 'display': 'none'})], HELP_TAB_ID + @app.callback( + Output('logo', "children"), + [Input("url", "pathname")]) + def switch_logo(url): + page = _get_page(url) + if page == MAIN_PAGE or page == ACKNOWLEDGEMENTS_PAGE: + return html.A(html.Img(src="/assets/CoVigator_logo_txt_reg_no_bg.png", height="80px"), href="/") + elif page == COVID19_PORTAL_PAGE: + return html.A(html.Img(src="/assets/CoVigator_logo_txt_reg_no_bg_covid19_portal.png", height="80px"), href="/") + elif page == ENA_PAGE: + return html.A(html.Img(src="/assets/CoVigator_logo_ENA.png", height="80px"), href="/") + @app.callback( Output('top-right-logo', "children"), [Input("url", "pathname")]) @@ -243,6 +272,12 @@ def switch_lat_update(url): page = _get_page(url) if page == MAIN_PAGE: return None + elif page == COVID19_PORTAL_PAGE: + return dbc.Button( + "last updated {date}".format(date=queries.get_last_update(DataSource.COVID19_PORTAL)), + outline=True, color="dark", className="me-1", + # 'background-color': '#b71300', + style={"margin-right": "15px", 'font-size': '85%'}) elif page == ENA_PAGE: return dbc.Button( "last updated {date}".format(date=queries.get_last_update(DataSource.ENA)), @@ -259,6 +294,8 @@ def switch_tab(at, url): return get_tab_overview() elif at == ENA_DATASET_TAB_ID: return get_tab_dataset_ena(queries=queries) + elif at == COVID19_PORTAL_DATASET_TAB_ID: + return get_tab_dataset_covid19_portal(queries=queries) elif at == SAMPLES_TAB_ID: return get_tab_samples(queries=queries, data_source=page) elif at == MUTATIONS_TAB_ID: diff --git a/covigator/dashboard/tabs/acknowledgements.py b/covigator/dashboard/tabs/acknowledgements.py index f67ec7e9..cde90311 100644 --- a/covigator/dashboard/tabs/acknowledgements.py +++ b/covigator/dashboard/tabs/acknowledgements.py @@ -43,9 +43,8 @@ def get_tab_acknowledgements(): ), html.Br(), - html.P(html.B("ENA Data")), html.P([ - """We gratefully acknowledge the European Nucleotide Archive""", + """We gratefully acknowledge the European Nucleotide Archive, the Covid19 Data portal""", html.Sup("2"), " and all data contributors for sharing the raw reads on which this research is based."]), html.P([ diff --git a/covigator/dashboard/tabs/dataset_covid19_portal.py b/covigator/dashboard/tabs/dataset_covid19_portal.py new file mode 100644 index 00000000..40900011 --- /dev/null +++ b/covigator/dashboard/tabs/dataset_covid19_portal.py @@ -0,0 +1,157 @@ +from dash import dcc +import dash_bootstrap_components as dbc +from dash import html +import plotly + +from covigator.dashboard.figures.figures import PLOTLY_CONFIG, MARGIN, TEMPLATE +from covigator.dashboard.tabs import get_mini_container, print_number, print_date +from covigator.database.model import DataSource, SAMPLE_COVID19_PORTAL_TABLE_NAME, JobStatus +from covigator.database.queries import Queries +import pandas as pd +import plotly.express as px + + +def get_tab_dataset_covid19_portal(queries: Queries): + count_samples = queries.count_samples(source=DataSource.COVID19_PORTAL.name) + + return dbc.CardBody( + children=[ + get_covid19_portal_overview_tab_left_bar(queries, count_samples), + html.Div( + className="one column", + children=[html.Br()]), + get_covid19_portal_overview_tab_graphs(queries=queries, count_samples=count_samples) + ]) + + +def get_covid19_portal_overview_tab_graphs(queries, count_samples): + return html.Div( + className="nine columns", + children=get_dataset_covid19_portal_tab_graphs(queries, count_samples=count_samples)) + + +def get_covid19_portal_overview_tab_left_bar(queries: Queries, count_samples): + count_variants = queries.count_variants(source=DataSource.COVID19_PORTAL.name) + count_variant_observations = queries.count_variant_observations(source=DataSource.COVID19_PORTAL.name) + date_of_first_gisaid_sample = queries.get_date_of_first_sample(source=DataSource.COVID19_PORTAL) + date_of_most_recent_gisaid_sample = queries.get_date_of_most_recent_sample(source=DataSource.COVID19_PORTAL) + + return html.Div( + className="two columns", + children=[ + html.Br(), + dcc.Markdown(""" + The Covid19 Data Portal (https://www.covid19dataportal.org/) provides among other things DNA assemblies, + geographical information and other metadata about SARS-CoV-2 samples. + The processing pipeline runs alignment to the reference genome (bioypthon), + variant calling (custom code), normalization (vt and BCFtools), annotation (SnpEff) + and finally lineage determination (pangolin). + """, style={"font-size": 16}), + html.Br(), + html.Div( + html.Span( + children=[ + get_mini_container( + title="Samples", + value=print_number(count_samples)), + html.Br(), + html.Br(), + get_mini_container( + title="First sample", + value=print_date(date_of_first_gisaid_sample)), + html.Br(), + html.Br(), + get_mini_container( + title="Latest sample", + value=print_date(date_of_most_recent_gisaid_sample)), + html.Br(), + html.Br(), + get_mini_container( + title="Unique mutations", + value=print_number(count_variants)), + html.Br(), + html.Br(), + get_mini_container( + title="Mutation calls", + value=print_number(count_variant_observations)), + ]) + ), + html.Br(), + dcc.Markdown(""" + There are two sample exclusion criteria: + * Horizontal coverage below 20 % of the reference genome + * A ratio of ambiguous bases greater than 0.2 + + Furthermore, all mutations involving ambiguous bases are excluded. + """, style={"font-size": 16}), + ]) + + +def get_dataset_covid19_portal_tab_graphs(queries: Queries, count_samples): + return html.Div( + children=[ + html.Br(), + html.Div(children=[ + html.Div(children=get_plot_coverage(queries), + className="six columns", style={"margin-left": 0, "margin-right": "1%", "width": "48%"}), + html.Div(children=get_plot_bad_bases_ratio(queries, count_samples), + className="six columns", style={"margin-left": 0, "margin-right": "1%", "width": "48%"}), + ]), + ]) + + +def get_plot_coverage(queries: Queries): + sql_query = """ + select count(*), (sequence_length::float / 29903 * 100)::int as coverage + from {table} + where status = '{status}' + group by coverage + """.format(table=SAMPLE_COVID19_PORTAL_TABLE_NAME, status=JobStatus.FINISHED.name) + data = pd.read_sql_query(sql_query, queries.session.bind) + fig = px.bar(data_frame=data, x="coverage", y="count", log_y=True, color="count", + color_continuous_scale=plotly.colors.sequential.Brwnyl) + fig.update_layout( + margin=MARGIN, + template=TEMPLATE, + yaxis={'title': "Num. of samples (log)"}, + xaxis={'title': "Horizontal coverage (%)"}, + legend={'title': None} + ) + return [ + dcc.Graph(figure=fig, config=PLOTLY_CONFIG), + dcc.Markdown(""" + **Horizontal coverage (%)** + + Horizontal coverage is estimated as the sequence length divided by the reference genome length. + Some samples have thus an horizontal coverage larger than 100 %. + """) + ] + + +def get_plot_bad_bases_ratio(queries: Queries, count_samples): + sql_query = """ + select count(*), ((count_n_bases + count_ambiguous_bases)::float / sequence_length * 100)::int as bad_bases_ratio + from {table} + where status = '{status}' + group by bad_bases_ratio + """.format(table=SAMPLE_COVID19_PORTAL_TABLE_NAME, status=JobStatus.FINISHED.name) + data = pd.read_sql_query(sql_query, queries.session.bind) + fig = px.bar(data_frame=data, x="bad_bases_ratio", y="count", log_y=True, color="count", + color_continuous_scale=plotly.colors.sequential.Brwnyl) + fig.update_layout( + margin=MARGIN, + template=TEMPLATE, + yaxis={'title': "Num. of samples (log)"}, + xaxis={'title': "Ratio of N and ambiguous bases (%)"}, + legend={'title': None} + ) + + return [ + dcc.Graph(figure=fig, config=PLOTLY_CONFIG), + dcc.Markdown(""" + **Ratio of N and ambiguous bases (%)** + + The ratio of N and ambiguous bases over the whole sequence length measures the quality of the assembly sequence. + {} % of samples have a ratio <= 5 %. + """.format(round(float(data[data["bad_bases_ratio"] <= 5]["count"].sum()) / count_samples), 1)) + ] \ No newline at end of file diff --git a/covigator/dashboard/tabs/download.py b/covigator/dashboard/tabs/download.py index cef584f8..3afeb256 100644 --- a/covigator/dashboard/tabs/download.py +++ b/covigator/dashboard/tabs/download.py @@ -11,14 +11,25 @@ def get_tab_download(content_folder): return dbc.CardBody( children=[ dcc.Markdown(""" - ** Download the raw CoVigator data derived from ENA** + ** Download the raw CoVigator data** + Data releases that may be behind the information shown in the dashboard. + See the README file for more details. + + ENA dataset * `variant_observation` contains the individual variant calls * `subclonal_variant_observation` contains the variant calls with a VAF < 80 % and >= 5 % * `low_frequency_variant_observation` contains the variant calls with a VAF < 5 % * `variant` contains the unique variants without any sample specific information * `variant_cooccurrence` contains the cooccurrence matrix between clonal variants * `sample_ena` contains the samples metadata and some sample level derived data (eg: pangolin, coverage analysis, etc.) + + Covid19 data portal sequences dataset + * `variant_observation_covid19portal` contains the individual variant calls + * `variant_covid19portal` contains the unique variants without any sample specific information + * `sample_covid19_portal` contains the samples metadata and some sample level derived data (eg: pangolin, coverage analysis, etc.) + + Annotations * `conservation` contains the ConsHMM conservation tracks * `gene` contains the gene annotations as provided by Ensembl * `domain` contains the Pfam protein domains diff --git a/covigator/dashboard/tabs/lineages.py b/covigator/dashboard/tabs/lineages.py index ed4a10d9..e80034b7 100644 --- a/covigator/dashboard/tabs/lineages.py +++ b/covigator/dashboard/tabs/lineages.py @@ -94,6 +94,39 @@ def set_callbacks_lineages_tab(app, session: Session): queries = Queries(session=session) figures = LineageFigures(queries) + countries_ena = queries.get_countries(DataSource.ENA.name) + countries_covid19_portal = queries.get_countries(DataSource.COVID19_PORTAL.name) + lineages_ena = queries.get_lineages(DataSource.ENA.name) + lineages_covid19_portal = queries.get_lineages(DataSource.COVID19_PORTAL.name) + + @app.callback( + Output(ID_DROPDOWN_COUNTRY, 'options'), + Input(ID_DROPDOWN_DATA_SOURCE, 'value')) + def set_countries(source): + """ + Updates the country drop down list when the data source is changed + """ + countries = [] + if source == DataSource.ENA.name: + countries = [{'label': c, 'value': c} for c in countries_ena] + elif source == DataSource.COVID19_PORTAL.name: + countries = [{'label': c, 'value': c} for c in countries_covid19_portal] + return countries + + @app.callback( + Output(ID_DROPDOWN_LINEAGE, 'options'), + Input(ID_DROPDOWN_DATA_SOURCE, 'value')) + def set_lineages(source): + """ + Updates the country drop down list when the data source is changed + """ + lineages = [] + if source == DataSource.ENA.name: + lineages = [{'label': c, 'value': c} for c in lineages_ena] + elif source == DataSource.COVID19_PORTAL.name: + lineages = [{'label': c, 'value': c} for c in lineages_covid19_portal] + return lineages + @app.callback( Output(ID_LINEAGES_GRAPH, 'children'), inputs=[Input(ID_APPLY_BUTTOM, 'n_clicks')], diff --git a/covigator/dashboard/tabs/overview.py b/covigator/dashboard/tabs/overview.py index 57a5d258..21494f9b 100644 --- a/covigator/dashboard/tabs/overview.py +++ b/covigator/dashboard/tabs/overview.py @@ -37,10 +37,13 @@ def get_tab_overview(): """), html.Br(), html.P(""" - CoVigator loads publicly available SARS-CoV-2 DNA sequences from - the European Nucleotide Archive (ENA). - ENA provides the raw reads in FASTQ format and thus enables a high resolution analysis into the - SARS-CoV-2 mutations. Intrahost mutations are of particular interest. + CoVigator loads publicly available SARS-CoV-2 raw reads (ie: FASTQs) from + the European Nucleotide Archive (ENA) and sequences from the Covid19 Data Portal. + Some samples are present in both datasets. + ENA enables a high resolution analysis into the SARS-CoV-2 mutations through the raw reads. + Intrahost mutations are of particular interest. + On the other hand, the Covid19 Data Portal sequences have a lower resolution, + but it is a more extensive dataset. """), html.Br(), dbc.CardBody( @@ -63,7 +66,27 @@ def get_tab_overview(): outline=False, style={"width": "40rem", "height": "15rem"}, ) - ) + ), + dbc.Col( + dbc.Card( + [ + dbc.CardImg(src="/assets/CV19DP_logo_oneliner2.svg", top=True, + style={"width": "18rem", "margin-left": "20px", + "margin-top": "10px"}, ), + dbc.CardBody( + [ + dbc.Button( + "Explore data derived from the Covid19 Data Portal sequences", color="warning", + href="/covid19-portal", + style={"margin-left": "20px", "margin-right": "20px", + "font-size": 20}, ), + ] + ), + ], + outline=False, + style={"width": "40rem", "height": "15rem"}, + ) + ), ])), html.Br(), html.Br(), diff --git a/covigator/dashboard/tabs/recurrent_mutations.py b/covigator/dashboard/tabs/recurrent_mutations.py index 8eadaffd..1b8b888c 100644 --- a/covigator/dashboard/tabs/recurrent_mutations.py +++ b/covigator/dashboard/tabs/recurrent_mutations.py @@ -203,6 +203,7 @@ def set_callbacks_variants_tab(app, session: Session): domains_by_gene = {g.name: queries.get_domains_by_gene(g.name) for g in queries.get_genes()} all_domains = queries.get_domains() months_from_db_ena = queries.get_sample_months(MONTH_PATTERN, data_source=DataSource.ENA.name) + months_from_db_covid19_portal = queries.get_sample_months(MONTH_PATTERN, data_source=DataSource.COVID19_PORTAL.name) @app.callback( Output(ID_DROPDOWN_DOMAIN, 'options'), @@ -214,12 +215,17 @@ def set_domains(selected_gene): @app.callback( Output(ID_DROPDOWN_DATE_RANGE_END_DIV, 'children'), - Input(ID_DROPDOWN_DATE_RANGE_START, 'value') + Input(ID_DROPDOWN_DATE_RANGE_START, 'value'), + Input(ID_DROPDOWN_DATA_SOURCE, 'value') ) - def update_dropdown_end_date(start_date): + def update_dropdown_end_date(start_date, data_source): today = datetime.now() today_formatted = today.strftime(MONTH_PATTERN) - months = [m for m in months_from_db_ena if m >= start_date] + months = [] + if data_source == DataSource.ENA.name: + months = [m for m in months_from_db_ena if m >= start_date] + elif data_source == DataSource.COVID19_PORTAL.name: + months = [m for m in months_from_db_covid19_portal if m >= start_date] return dcc.Dropdown( id=ID_DROPDOWN_DATE_RANGE_END, options=[{'label': c, 'value': c} for c in months], diff --git a/covigator/dashboard/tabs/samples.py b/covigator/dashboard/tabs/samples.py index 60a7ab82..80708bec 100644 --- a/covigator/dashboard/tabs/samples.py +++ b/covigator/dashboard/tabs/samples.py @@ -101,6 +101,38 @@ def set_callbacks_samples_tab(app, session: Session): queries = Queries(session=session) figures = SampleFigures(queries) + countries_ena = queries.get_countries(DataSource.ENA.name) + countries_covid19_portal = queries.get_countries(DataSource.COVID19_PORTAL.name) + lineages_ena = queries.get_lineages(DataSource.ENA.name) + lineages_covid19_portal = queries.get_lineages(DataSource.COVID19_PORTAL.name) + + @app.callback( + Output(ID_DROPDOWN_COUNTRY, 'options'), + Input(ID_DROPDOWN_DATA_SOURCE, 'value')) + def set_countries(source): + """ + Updates the country drop down list when the data source is changed + """ + countries = [] + if source == DataSource.ENA.name: + countries = [{'label': c, 'value': c} for c in countries_ena] + elif source == DataSource.COVID19_PORTAL.name: + countries = [{'label': c, 'value': c} for c in countries_covid19_portal] + return countries + + @app.callback( + Output(ID_DROPDOWN_LINEAGE, 'options'), + Input(ID_DROPDOWN_DATA_SOURCE, 'value')) + def set_lineages(source): + """ + Updates the country drop down list when the data source is changed + """ + lineages = [] + if source == DataSource.ENA.name: + lineages = [{'label': c, 'value': c} for c in lineages_ena] + elif source == DataSource.COVID19_PORTAL.name: + lineages = [{'label': c, 'value': c} for c in lineages_covid19_portal] + return lineages @app.callback( Output(ID_SLIDER_MIN_SAMPLES, 'disabled'), diff --git a/covigator/database/model.py b/covigator/database/model.py old mode 100644 new mode 100755 index 48353e72..a148e27f --- a/covigator/database/model.py +++ b/covigator/database/model.py @@ -5,7 +5,6 @@ from sqlalchemy import Column, String, Float, Enum, DateTime, Integer, Boolean, Date, ForeignKey, \ ForeignKeyConstraint, BigInteger, JSON, Index import enum - from covigator.configuration import Configuration @@ -19,17 +18,17 @@ def get_table_versioned_name(basename, config: Configuration): LOG_TABLE_NAME = get_table_versioned_name('log', config=config) LAST_UPDATE_TABLE_NAME = get_table_versioned_name('last_update', config=config) VARIANT_COOCCURRENCE_TABLE_NAME = get_table_versioned_name('variant_cooccurrence', config=config) -GISAID_VARIANT_COOCCURRENCE_TABLE_NAME = get_table_versioned_name('gisaid_variant_cooccurrence', config=config) +COVID19_PORTAL_VARIANT_COOCCURRENCE_TABLE_NAME = get_table_versioned_name('covid19portal_variant_cooccurrence', config=config) VARIANT_OBSERVATION_TABLE_NAME = get_table_versioned_name('variant_observation', config=config) SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME = get_table_versioned_name('subclonal_variant_observation', config=config) LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME = get_table_versioned_name('low_frequency_variant_observation', config=config) -GISAID_VARIANT_OBSERVATION_TABLE_NAME = get_table_versioned_name('gisaid_variant_observation', config=config) +COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME = get_table_versioned_name('variant_observation_covid19portal', config=config) VARIANT_TABLE_NAME = get_table_versioned_name('variant', config=config) SUBCLONAL_VARIANT_TABLE_NAME = get_table_versioned_name('subclonal_variant', config=config) LOW_FREQUENCY_VARIANT_TABLE_NAME = get_table_versioned_name('low_frequency_variant', config=config) -GISAID_VARIANT_TABLE_NAME = get_table_versioned_name('gisaid_variant', config=config) -SAMPLE_GISAID_TABLE_NAME = get_table_versioned_name('sample_gisaid', config=config) +COVID19_PORTAL_VARIANT_TABLE_NAME = get_table_versioned_name('variant_covid19portal', config=config) SAMPLE_ENA_TABLE_NAME = get_table_versioned_name('sample_ena', config=config) +SAMPLE_COVID19_PORTAL_TABLE_NAME = get_table_versioned_name('sample_covid19_portal', config=config) CONSERVATION_TABLE_NAME = get_table_versioned_name('conservation', config=config) PRECOMPUTED_VARIANTS_PER_SAMPLE_TABLE_NAME = get_table_versioned_name('precomputed_variants_per_sample', config=config) PRECOMPUTED_SUBSTITUTIONS_COUNTS_TABLE_NAME = get_table_versioned_name('precomputed_substitutions_counts', config=config) @@ -106,6 +105,7 @@ class DataSource(enum.Enum): ENA = 1 GISAID = 2 + COVID19_PORTAL = 3 class VariantType(enum.Enum): @@ -124,15 +124,16 @@ class SampleEna(Base): __tablename__ = SAMPLE_ENA_TABLE_NAME # data on run - # TODO: add foreign keys to jobs run_accession = Column(String, primary_key=True) # 'ERR4080473', + # DEPRECATED finished = Column(Boolean) + #### + sample_accession = Column(String) # 'SAMEA6798401', scientific_name = Column(String) study_accession = Column(String) experiment_accession = Column(String) - # TODO: store these as dates first_created = Column(Date) collection_date = Column(Date) instrument_platform = Column(String) @@ -207,7 +208,7 @@ class SampleEna(Base): gatk_pangolin_path = Column(String) bcftools_pangolin_path = Column(String) fastp_path = Column(String) - deduplication_metrics_path = Column(String) + deduplication_metrics_path = Column(String) # DEPRECATED horizontal_coverage_path = Column(String) vertical_coverage_path = Column(String) @@ -237,7 +238,7 @@ class SampleEna(Base): pangolin_qc_notes = Column(String) pangolin_note = Column(String) - # Picard deduplicatio output + # Picard deduplicatio output ## DEPRECATED percent_duplication = Column(Float) unpaired_reads_examined = Column(Integer) read_pairs_examined = Column(Integer) @@ -276,6 +277,81 @@ def get_fastq1_and_fastq2(self): return fastq1, fastq2 +class SampleCovid19Portal(Base): + """ + The table that holds all metadata for a ENA sample + """ + __tablename__ = SAMPLE_COVID19_PORTAL_TABLE_NAME + + run_accession = Column(String, primary_key=True) + first_created = Column(Date) + collection_date = Column(Date) + last_modification_date = Column(Date) + center_name = Column(String) + isolate = Column(String) + molecule_type = Column(String) + + # geographical data + country_raw = Column(String) # 'Denmark', + country = Column(String) + country_alpha_2 = Column(String) + country_alpha_3 = Column(String) + continent = Column(String) + continent_alpha_2 = Column(String) + + # FASTA + fasta_url = Column(String, nullable=False) + + # local files storage + sample_folder = Column(String) + fasta_path = Column(String) + vcf_path = Column(String) + pangolin_path = Column(String) + + # sequence information + sequence_length = Column(Integer) + count_n_bases = Column(Integer) + count_ambiguous_bases = Column(Integer) + + # counts of variants + count_snvs = Column(Integer) + count_insertions = Column(Integer) + count_deletions = Column(Integer) + + # pango output + pangolin_lineage = Column(String) + pangolin_conflict = Column(Float) + pangolin_ambiguity_score = Column(Float) + pangolin_scorpio_call = Column(String) + pangolin_scorpio_support = Column(Float) + pangolin_scorpio_conflict = Column(Float) + pangolin_version = Column(String) + pangolin_pangolin_version = Column(String) + pangolin_scorpio_version = Column(String) + pangolin_constellation_version = Column(String) + pangolin_qc_status = Column(String) + pangolin_qc_notes = Column(String) + pangolin_note = Column(String) + + # job status + status = Column(Enum(JobStatus, name=JobStatus.__constraint_name__), default=JobStatus.PENDING) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.now()) + queued_at = Column(DateTime(timezone=True)) + analysed_at = Column(DateTime(timezone=True)) + loaded_at = Column(DateTime(timezone=True)) + failed_at = Column(DateTime(timezone=True)) + error_message = Column(String) + + covigator_accessor_version = Column(String) + covigator_processor_version = Column(String) + + def get_sample_folder(self, base_folder): + return os.path.join( + base_folder, + self.collection_date.strftime("%Y%m%d") if self.collection_date is not None else "nodate", + self.run_accession) + + class Variant(Base): """ A variant with its specific annotations. THis does not contain any sample specific annotations. @@ -340,6 +416,70 @@ def get_variant_id(self): return "{}:{}>{}".format(self.position, self.reference, self.alternate) +class VariantCovid19Portal(Base): + """ + A variant with its specific annotations. THis does not contain any sample specific annotations. + """ + __tablename__ = COVID19_PORTAL_VARIANT_TABLE_NAME + + variant_id = Column(String, primary_key=True) + chromosome = Column(String) + position = Column(Integer, index=True) + reference = Column(String) + alternate = Column(String) + overlaps_multiple_genes = Column(Boolean, default=False) + """ + SnpEff annotations + ##INFO=C + HGVS.p | p.Lys11Asn + cDNA.pos / cDNA.length | 33/21290 + CDS.pos / CDS.length | 33/21290 + AA.pos / AA.length | 11/7095 + Distance | + ERRORS / WARNINGS / INFO' + """ + annotation = Column(String, index=True) + annotation_highest_impact = Column(String, index=True) + annotation_impact = Column(String, index=True) + gene_name = Column(String, index=True) + gene_id = Column(String) + biotype = Column(String) + hgvs_c = Column(String) + hgvs_p = Column(String) + cdna_pos_length = Column(String) + cds_pos_length = Column(String) + aa_pos_length = Column(String) + + # derived annotations + variant_type = Column(Enum(VariantType, name=VariantType.__constraint_name__)) + length = Column(Integer) + reference_amino_acid = Column(String) + alternate_amino_acid = Column(String) + position_amino_acid = Column(Integer) + + # ConsHMM conservation annotations + cons_hmm_sars_cov_2 = Column(Float) + cons_hmm_sarbecovirus = Column(Float) + cons_hmm_vertebrate_cov = Column(Float) + + # Pfam protein domains + pfam_name = Column(String) + pfam_description = Column(String) + + def get_variant_id(self): + return "{}:{}>{}".format(self.position, self.reference, self.alternate) + + class SubclonalVariant(Base): __tablename__ = SUBCLONAL_VARIANT_TABLE_NAME @@ -488,12 +628,84 @@ class VariantObservation(Base): ForeignKeyConstraint([variant_id], [Variant.variant_id]) __table_args__ = ( - Index("{}_index_annotation_position".format(VARIANT_OBSERVATION_TABLE_NAME), + Index("{}_i1".format(VARIANT_OBSERVATION_TABLE_NAME), + "annotation_highest_impact", "position"), + Index("{}_i2".format(VARIANT_OBSERVATION_TABLE_NAME), "sample"), + Index("{}_i3".format(VARIANT_OBSERVATION_TABLE_NAME), "position"), + Index("{}_i4".format(VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact"), + Index("{}_i5".format(VARIANT_OBSERVATION_TABLE_NAME), "variant_id") + ) + + +class VariantObservationCovid19Portal(Base): + """ + A variant observation in a particular sample. This contains all annotations of a specific observation of a variant. + """ + __tablename__ = COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME + + sample = Column(String, primary_key=True) + variant_id = Column(String, primary_key=True) + chromosome = Column(String) + position = Column(Integer) + reference = Column(String) + alternate = Column(String) + quality = Column(Float) + filter = Column(String) + """ + ##INFO= + ##INFO= + ##INFO= + ##INFO= + ##INFO= + ##INFO= + ##INFO= + """ + dp = Column(Integer) + ac = Column(Integer) + dp4_ref_forward = Column(Integer) + dp4_ref_reverse = Column(Integer) + dp4_alt_forward = Column(Integer) + dp4_alt_reverse = Column(Integer) + vaf = Column(Float) + strand_bias = Column(Integer) + # fields replicated from Variant for performance reasons + annotation = Column(String) + annotation_impact = Column(String) + biotype = Column(String) + annotation_highest_impact = Column(String) + gene_name = Column(String) + hgvs_c = Column(String) + hgvs_p = Column(String) + + # fields replicated from sample for performance reasons + date = Column(Date) + + # derived annotations + variant_type = Column(Enum(VariantType, name=VariantType.__constraint_name__)) + length = Column(Integer) + reference_amino_acid = Column(String) + alternate_amino_acid = Column(String) + position_amino_acid = Column(Integer) + + # ConsHMM conservation annotations + cons_hmm_sars_cov_2 = Column(Float) + cons_hmm_sarbecovirus = Column(Float) + cons_hmm_vertebrate_cov = Column(Float) + + # Pfam protein domains + pfam_name = Column(String) + pfam_description = Column(String) + + ForeignKeyConstraint([sample], [SampleCovid19Portal.run_accession]) + ForeignKeyConstraint([variant_id], [VariantCovid19Portal.variant_id]) + + __table_args__ = ( + Index("{}_i1".format(COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact", "position"), - Index("{}_index_sample".format(VARIANT_OBSERVATION_TABLE_NAME), "sample"), - Index("{}_index_position".format(VARIANT_OBSERVATION_TABLE_NAME), "position"), - Index("{}_index_annotation".format(VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact"), - Index("{}_index_variant_id".format(VARIANT_OBSERVATION_TABLE_NAME), "variant_id") + Index("{}_i2".format(COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME), "sample"), + Index("{}_i3".format(COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME), "position"), + Index("{}_i4".format(COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact"), + Index("{}_i5".format(COVID19_PORTAL_VARIANT_OBSERVATION_TABLE_NAME), "variant_id") ) @@ -552,9 +764,9 @@ class SubclonalVariantObservation(Base): ForeignKeyConstraint([variant_id], [SubclonalVariant.variant_id]) __table_args__ = ( - Index("{}_index_position".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "position"), - Index("{}_index_annotation_vaf".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact", "vaf"), - Index("{}_index_vaf".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "vaf"), + Index("{}_i1".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "position"), + Index("{}_i2".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact", "vaf"), + Index("{}_i3".format(SUBCLONAL_VARIANT_OBSERVATION_TABLE_NAME), "vaf"), ) @@ -613,9 +825,9 @@ class LowFrequencyVariantObservation(Base): ForeignKeyConstraint([variant_id], [LowFrequencyVariant.variant_id]) __table_args__ = ( - Index("{}_index_position".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "position"), - Index("{}_index_annotation_vaf".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact", "vaf"), - Index("{}_index_vaf".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "vaf"), + Index("{}_i1".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "position"), + Index("{}_i2".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "annotation_highest_impact", "vaf"), + Index("{}_i3".format(LOW_FREQUENCY_VARIANT_OBSERVATION_TABLE_NAME), "vaf"), ) diff --git a/covigator/database/queries.py b/covigator/database/queries.py index bf69f9af..756fe4e0 100644 --- a/covigator/database/queries.py +++ b/covigator/database/queries.py @@ -13,7 +13,7 @@ SubclonalVariantObservation, PrecomputedVariantsPerSample, PrecomputedSubstitutionsCounts, PrecomputedIndelLength, \ VariantType, PrecomputedAnnotation, PrecomputedOccurrence, PrecomputedTableCounts, \ PrecomputedVariantAbundanceHistogram, PrecomputedSynonymousNonSynonymousCounts, RegionType, Domain, \ - LastUpdate + LastUpdate, SampleCovid19Portal, VariantObservationCovid19Portal, VariantCovid19Portal from covigator.exceptions import CovigatorQueryException, CovigatorDashboardMissingPrecomputedData @@ -26,6 +26,8 @@ def __init__(self, session: Session): def get_variant_observation_klass(source: str): if source == DataSource.ENA.name: klass = VariantObservation + elif source == DataSource.COVID19_PORTAL.name: + klass = VariantObservationCovid19Portal else: raise CovigatorQueryException("Bad data source: {}".format(source)) return klass @@ -34,6 +36,8 @@ def get_variant_observation_klass(source: str): def get_variant_klass(source: str): if source == DataSource.ENA.name: klass = Variant + elif source == DataSource.COVID19_PORTAL.name: + klass = VariantCovid19Portal else: raise CovigatorQueryException("Bad data source: {}".format(source)) return klass @@ -42,6 +46,8 @@ def get_variant_klass(source: str): def get_sample_klass(source: str): if source == DataSource.ENA.name: klass = SampleEna + elif source == DataSource.COVID19_PORTAL.name: + klass = SampleCovid19Portal else: raise CovigatorQueryException("Bad data source: {}".format(source)) return klass @@ -123,7 +129,7 @@ def get_variants_per_sample(self, data_source: str, genes: List[str], variant_ty .groupby(["number_mutations", "variant_type"]).sum().reset_index() def _assert_data_source(self, data_source): - if data_source != DataSource.ENA.name and data_source != DataSource.GISAID.name: + if data_source != DataSource.ENA.name and data_source != DataSource.COVID19_PORTAL.name: raise CovigatorQueryException("Bad data source: {}".format(data_source)) def get_indel_lengths(self, data_source, genes): @@ -336,6 +342,13 @@ def count_samples(self, source: str, cache=True) -> int: PrecomputedTableCounts.table == SampleEna.__name__, PrecomputedTableCounts.factor == PrecomputedTableCounts.FACTOR_SOURCE )) + elif source == DataSource.COVID19_PORTAL.name: + query = query.filter(and_( + PrecomputedTableCounts.table == SampleCovid19Portal.__name__, + PrecomputedTableCounts.factor == PrecomputedTableCounts.FACTOR_SOURCE + )) + else: + raise ValueError("Unknown data source") result = query.first() if result is None: raise CovigatorDashboardMissingPrecomputedData diff --git a/covigator/exceptions.py b/covigator/exceptions.py index fdda5b99..83ec3620 100644 --- a/covigator/exceptions.py +++ b/covigator/exceptions.py @@ -62,3 +62,25 @@ class CovigatorExcludedSampleNarrowCoverage(CovigatorExcludedSampleException): class CovigatorExcludedSampleBadQualityReads(CovigatorExcludedSampleException): pass + + +class CovigatorExcludedFailedDownload(CovigatorExcludedSampleException): + pass + + +class CovigatorExcludedHorizontalCoverage(CovigatorExcludedSampleException): + pass + + +class CovigatorExcludedBadBases(CovigatorExcludedSampleException): + pass + + +class CovigatorExcludedTooManyEntries(CovigatorExcludedSampleException): + pass + +class CovigatorExcludedEmptySequence(CovigatorExcludedSampleException): + pass + +class CovigatorExcludedMissingDateException(CovigatorExcludedSampleException): + pass \ No newline at end of file diff --git a/covigator/pipeline/covid19_portal_pipeline.py b/covigator/pipeline/covid19_portal_pipeline.py new file mode 100644 index 00000000..7cb87b4e --- /dev/null +++ b/covigator/pipeline/covid19_portal_pipeline.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +import os +from dataclasses import dataclass +from covigator.configuration import Configuration +from covigator.database.model import SampleCovid19Portal +from covigator.pipeline.runner import run_command + + +@dataclass +class Covid19PortalPipelineResult: + vcf_path: str + fasta_path: str + pangolin_path: str + + +class Covid19PortalPipeline: + + def __init__(self, config: Configuration): + self.config = config + + def run(self, sample: SampleCovid19Portal) -> Covid19PortalPipelineResult: + # NOTE: sample folder date/run_accession + sample_data_folder = sample.get_sample_folder(self.config.storage_folder) + output_vcf = os.path.join(sample_data_folder, "{name}.assembly.vcf.gz".format(name=sample.run_accession)) + output_pangolin = os.path.join(sample_data_folder, + "{name}.assembly.pangolin.csv".format(name=sample.run_accession)) + input_fasta = sample.fasta_path + + if not os.path.exists(output_vcf) or self.config.force_pipeline: + + command = "{nextflow} run {workflow} " \ + "--fasta {fasta} " \ + "--output {output_folder} " \ + "--name {name} " \ + "--cpus {cpus} " \ + "--memory {memory} " \ + "-profile conda " \ + "-offline " \ + "-work-dir {work_folder} " \ + "-with-trace {trace_file}".format( + nextflow=self.config.nextflow, + fasta=input_fasta, + output_folder=sample_data_folder, + name=sample.run_accession, + work_folder=self.config.temp_folder, + workflow=self.config.workflow, + trace_file=os.path.join(sample_data_folder, "nextflow_traces.txt"), + cpus=self.config.workflow_cpus, + memory=self.config.workflow_memory + ) + run_command(command, sample_data_folder) + + return Covid19PortalPipelineResult( + vcf_path=output_vcf, + fasta_path=input_fasta, + pangolin_path=output_pangolin + ) \ No newline at end of file diff --git a/covigator/pipeline/ena_pipeline.py b/covigator/pipeline/ena_pipeline.py index 344cd836..e59aba20 100644 --- a/covigator/pipeline/ena_pipeline.py +++ b/covigator/pipeline/ena_pipeline.py @@ -1,7 +1,6 @@ import os from dataclasses import dataclass from pathlib import Path -from logzero import logger from covigator.configuration import Configuration from covigator.pipeline.runner import run_command @@ -34,11 +33,8 @@ def __init__(self, config: Configuration): def run(self, run_accession: str, fastq1: str, fastq2: str = None) -> PipelineResult: - logger.info("Processing {} and {}".format(fastq1, fastq2)) sample_data_folder = Path(fastq1).parent - logger.info("Sample data folder: {}".format(sample_data_folder)) - lofreq_vcf = os.path.join(sample_data_folder, "{name}.lofreq.vcf.gz".format(name=run_accession)) ivar_vcf = os.path.join(sample_data_folder, "{name}.ivar.vcf.gz".format(name=run_accession)) gatk_vcf = os.path.join(sample_data_folder, "{name}.gatk.vcf.gz".format(name=run_accession)) diff --git a/covigator/pipeline/vcf_loader.py b/covigator/pipeline/vcf_loader.py index bc5399c6..f80893bd 100644 --- a/covigator/pipeline/vcf_loader.py +++ b/covigator/pipeline/vcf_loader.py @@ -8,7 +8,8 @@ from covigator import MISSENSE_VARIANT from covigator.database.model import Variant as CovigatorVariant, VariantObservation, \ SubclonalVariantObservation, SampleEna, DataSource, VariantType, \ - LowFrequencyVariantObservation, SubclonalVariant, LowFrequencyVariant + LowFrequencyVariantObservation, SubclonalVariant, LowFrequencyVariant, VariantCovid19Portal, \ + VariantObservationCovid19Portal, SampleCovid19Portal from covigator.database.queries import Queries from covigator.exceptions import CovigatorNotSupportedVariant @@ -54,7 +55,13 @@ def load(self, vcf_file: str, run_accession: str, source: DataSource, session: S variant: Variant for variant in variants: if variant.FILTER is None or variant.FILTER in ["LOW_FREQUENCY", "SUBCLONAL"]: - if variant.FILTER is None: # ENA clonal + if source == DataSource.COVID19_PORTAL: + covid19portal_variant = self._parse_variant(variant, VariantCovid19Portal) + observed_variants.append( + self._parse_variant_observation( + variant, specific_sample, covid19portal_variant, VariantObservationCovid19Portal)) + session.add(covid19portal_variant) + elif variant.FILTER is None: # ENA clonal # only stores clonal high quality variants in this table ena_variant = self._parse_variant(variant, CovigatorVariant) observed_variants.append( @@ -138,7 +145,8 @@ def _parse_snpeff_annotations( covigator_variant.position_amino_acid = int(match.group(2)) def _parse_variant_observation( - self, variant: Variant, sample: SampleEna, covigator_variant: CovigatorVariant, klass): + self, variant: Variant, sample: Union[SampleEna, SampleCovid19Portal], covigator_variant: CovigatorVariant, + klass): dp4 = variant.INFO.get("DP4") return klass( diff --git a/covigator/precomputations/load_ns_s_counts.py b/covigator/precomputations/load_ns_s_counts.py index 29007789..dad28700 100644 --- a/covigator/precomputations/load_ns_s_counts.py +++ b/covigator/precomputations/load_ns_s_counts.py @@ -35,12 +35,23 @@ def _load_counts(self, region: RegionType) -> List[PrecomputedSynonymousNonSynon source=DataSource.ENA, annotation=SYNONYMOUS_VARIANT, region=region) data_ns_ena = self._count_variant_observations_by_source_annotation_and_region( source=DataSource.ENA, annotation=MISSENSE_VARIANT, region=region) + data_s_portal = self._count_variant_observations_by_source_annotation_and_region( + source=DataSource.COVID19_PORTAL, annotation=SYNONYMOUS_VARIANT, region=region) + data_ns_portal = self._count_variant_observations_by_source_annotation_and_region( + source=DataSource.COVID19_PORTAL, annotation=MISSENSE_VARIANT, region=region) + data_ena = pd.merge( left=data_s_ena, right=data_ns_ena, on=["month", "region_name", "country"], how='outer').fillna(0) + data_portal = pd.merge( + left=data_s_portal, right=data_ns_portal, on=["month", "region_name", "country"], how='outer').fillna(0) + database_rows = [] if data_ena is not None: database_rows.extend(self._dataframe_to_model( data=data_ena, source=DataSource.ENA, region=region)) + if data_portal is not None: + database_rows.extend(self._dataframe_to_model( + data=data_portal, source=DataSource.GISAID, region=region)) return database_rows diff --git a/covigator/precomputations/load_top_occurrences.py b/covigator/precomputations/load_top_occurrences.py index 7fbe78b0..19ca52d2 100644 --- a/covigator/precomputations/load_top_occurrences.py +++ b/covigator/precomputations/load_top_occurrences.py @@ -38,12 +38,22 @@ def _read_top_occurrent_variants(self): except ValueError as e: logger.exception(e) logger.error("No top occurrences for ENA data") + top_occurring_variants_portal = None + try: + top_occurring_variants_portal = self.get_top_occurring_variants( + top=NUMBER_TOP_OCCURRENCES, source=DataSource.COVID19_PORTAL.name) + except ValueError: + logger.error("No top occurrences for GISAID data") database_rows = [] # stores the precomputed data if top_occurring_variants_ena is not None: for index, row in top_occurring_variants_ena.iterrows(): # add entries per gene database_rows.append(self._row_to_top_occurrence(row, source=DataSource.ENA)) + if top_occurring_variants_portal is not None: + for index, row in top_occurring_variants_portal.iterrows(): + # add entries per gene + database_rows.append(self._row_to_top_occurrence(row, source=DataSource.COVID19_PORTAL)) return database_rows def _row_to_top_occurrence(self, row, source): diff --git a/covigator/precomputations/load_variants_per_lineage.py b/covigator/precomputations/load_variants_per_lineage.py index a5453716..c39ed0a3 100644 --- a/covigator/precomputations/load_variants_per_lineage.py +++ b/covigator/precomputations/load_variants_per_lineage.py @@ -34,14 +34,21 @@ def _read_variants_per_lineage(self): except ValueError as e: logger.exception(e) logger.error("No top occurrences for ENA data") - + variants_per_lineage_portal = None + try: + variants_per_lineage_portal = self.get_variants_per_lineage(source=DataSource.COVID19_PORTAL.name) + except ValueError: + logger.error("No top occurrences for Covid19 Portal data") database_rows = [] # stores the precomputed data if variants_per_lineage_ena is not None: for index, row in variants_per_lineage_ena.iterrows(): # add entries per gene database_rows.append(self._row_to_variants_per_lineage(row, source=DataSource.ENA)) - + if variants_per_lineage_portal is not None: + for index, row in variants_per_lineage_portal.iterrows(): + # add entries per gene + database_rows.append(self._row_to_variants_per_lineage(row, source=DataSource.COVID19_PORTAL)) return database_rows def _row_to_variants_per_lineage(self, row, source): diff --git a/covigator/precomputations/loader.py b/covigator/precomputations/loader.py index b3ae485f..76e6cab0 100644 --- a/covigator/precomputations/loader.py +++ b/covigator/precomputations/loader.py @@ -4,7 +4,7 @@ from covigator.database.model import PrecomputedVariantsPerSample, PrecomputedSubstitutionsCounts, \ PrecomputedIndelLength, PrecomputedAnnotation, VariantObservation, DataSource, \ PrecomputedTableCounts, Variant, SubclonalVariantObservation, PrecomputedVariantAbundanceHistogram, \ - SampleEna + SampleEna, VariantCovid19Portal, VariantObservationCovid19Portal, SampleCovid19Portal from logzero import logger from covigator.database.queries import Queries @@ -47,14 +47,16 @@ def load_counts_variants_per_sample(self): # reads the data from the database database_rows_ena = self._read_count_variants_per_sample(data_source=DataSource.ENA) + database_rows_portal = self._read_count_variants_per_sample(data_source=DataSource.COVID19_PORTAL) # delete all rows before starting self.session.query(PrecomputedVariantsPerSample).delete() self.session.commit() - self.session.add_all(database_rows_ena) + self.session.add_all(database_rows_ena + database_rows_portal) self.session.commit() - logger.info("Added {} entries to {}".format(len(database_rows_ena), PrecomputedVariantsPerSample.__tablename__)) + logger.info("Added {} entries to {}".format(len(database_rows_ena) + len(database_rows_portal), + PrecomputedVariantsPerSample.__tablename__)) def _read_count_variants_per_sample(self, data_source: DataSource): @@ -96,14 +98,15 @@ def _read_count_variants_per_sample(self, data_source: DataSource): def load_count_substitutions(self): database_rows_ena = self._read_count_substitutions(data_source=DataSource.ENA) + database_rows_portal = self._read_count_substitutions(data_source=DataSource.COVID19_PORTAL) # delete all rows before starting self.session.query(PrecomputedSubstitutionsCounts).delete() self.session.commit() - self.session.add_all(database_rows_ena) + self.session.add_all(database_rows_ena + database_rows_portal) self.session.commit() - logger.info("Added {} entries to {}".format(len(database_rows_ena), + logger.info("Added {} entries to {}".format(len(database_rows_ena) + len(database_rows_portal), PrecomputedSubstitutionsCounts.__tablename__)) def _read_count_substitutions(self, data_source: DataSource): @@ -150,14 +153,15 @@ def _read_count_substitutions(self, data_source: DataSource): def load_indel_length(self): database_rows_ena = self._read_indel_length(data_source=DataSource.ENA) + database_rows_portal = self._read_indel_length(data_source=DataSource.COVID19_PORTAL) # delete all rows before starting self.session.query(PrecomputedIndelLength).delete() self.session.commit() - self.session.add_all(database_rows_ena) + self.session.add_all(database_rows_ena + database_rows_portal) self.session.commit() - logger.info("Added {} entries to {}".format(len(database_rows_ena), + logger.info("Added {} entries to {}".format(len(database_rows_ena) + len(database_rows_portal), PrecomputedIndelLength.__tablename__)) def _read_indel_length(self, data_source: DataSource): @@ -202,14 +206,16 @@ def _read_indel_length(self, data_source: DataSource): def load_annotation(self): database_rows_ena = self._read_annotations(data_source=DataSource.ENA) + database_rows_portal = self._read_annotations(data_source=DataSource.COVID19_PORTAL) # delete all rows before starting self.session.query(PrecomputedAnnotation).delete() self.session.commit() - self.session.add_all(database_rows_ena) + self.session.add_all(database_rows_ena + database_rows_portal) self.session.commit() - logger.info("Added {} entries to {}".format(len(database_rows_ena), PrecomputedAnnotation.__tablename__)) + logger.info("Added {} entries to {}".format(len(database_rows_ena) + len(database_rows_portal), + PrecomputedAnnotation.__tablename__)) def _read_annotations(self, data_source: DataSource): @@ -253,13 +259,18 @@ def _read_annotations(self, data_source: DataSource): def load_table_counts(self): count_variants_ena = self.queries.count_variants(cache=False, source=DataSource.ENA.name) + count_variants_portal = self.queries.count_variants(cache=False, source=DataSource.COVID19_PORTAL.name) count_samples_ena = self.queries.count_samples(source=DataSource.ENA.name, cache=False) + count_samples_portal = self.queries.count_samples(source=DataSource.COVID19_PORTAL.name, cache=False) count_variant_observations_ena = self.queries.count_variant_observations( source=DataSource.ENA.name, cache=False) + count_variant_observations_portal = self.queries.count_variant_observations( + source=DataSource.COVID19_PORTAL.name, cache=False) count_subclonal_variant_observations = self.queries.count_subclonal_variant_observations(cache=False) count_subclonal_variant_unique = self.queries.count_unique_subclonal_variant(cache=False) count_subclonal_variant_unique_only_subclonal = self.queries.count_unique_only_subclonal_variant(cache=False) count_countries_ena = self.queries.count_countries(source=DataSource.ENA.name, cache=False) + count_countries_portal = self.queries.count_countries(source=DataSource.COVID19_PORTAL.name, cache=False) # delete all rows before starting self.session.query(PrecomputedTableCounts).delete() @@ -269,9 +280,15 @@ def load_table_counts(self): PrecomputedTableCounts( table=Variant.__name__, count=count_variants_ena, factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.ENA.name), + PrecomputedTableCounts( + table=VariantCovid19Portal.__name__, count=count_variants_portal, + factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.COVID19_PORTAL.name), PrecomputedTableCounts( table=VariantObservation.__name__, count=count_variant_observations_ena, factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.ENA.name), + PrecomputedTableCounts( + table=VariantObservationCovid19Portal.__name__, count=count_variant_observations_portal, + factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.COVID19_PORTAL.name), PrecomputedTableCounts( table=SubclonalVariantObservation.__name__, count=count_subclonal_variant_observations), PrecomputedTableCounts( @@ -282,9 +299,15 @@ def load_table_counts(self): PrecomputedTableCounts( table=SampleEna.__name__, count=count_samples_ena, factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.ENA.name), + PrecomputedTableCounts( + table=SampleCovid19Portal.__name__, count=count_samples_portal, + factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.COVID19_PORTAL.name), PrecomputedTableCounts( table=PrecomputedTableCounts.VIRTUAL_TABLE_COUNTRY, count=count_countries_ena, factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.ENA.name), + PrecomputedTableCounts( + table=PrecomputedTableCounts.VIRTUAL_TABLE_COUNTRY, count=count_countries_portal, + factor=PrecomputedTableCounts.FACTOR_SOURCE, value=DataSource.COVID19_PORTAL.name), ] if len(database_rows) > 0: @@ -315,6 +338,13 @@ def _read_variant_abundance_histogram(self): histogram["bin_size"] = bin_size histogram["source"] = DataSource.ENA histograms.append(histogram) + for bin_size in BIN_SIZE_VALUES: + histogram = self.queries.get_variant_abundance_histogram( + bin_size=bin_size, source=DataSource.COVID19_PORTAL.name, cache=False) + if histogram is not None: + histogram["bin_size"] = bin_size + histogram["source"] = DataSource.COVID19_PORTAL + histograms.append(histogram) database_rows = [] if len(histograms) > 0: for index, row in pd.concat(histograms).iterrows(): diff --git a/covigator/processor/abstract_processor.py b/covigator/processor/abstract_processor.py index 6e02fa0c..adf6f29f 100644 --- a/covigator/processor/abstract_processor.py +++ b/covigator/processor/abstract_processor.py @@ -3,18 +3,16 @@ import traceback import pandas as pd from contextlib import suppress -from datetime import datetime +from datetime import datetime, date from typing import Callable import typing as typing from dask.distributed import Client from distributed import fire_and_forget from logzero import logger -import covigator -import covigator.configuration from covigator.configuration import Configuration from covigator.database.database import Database, session_scope from covigator.database.model import Log, DataSource, CovigatorModule, JobStatus, \ - SampleEna + SampleEna, LastUpdate from covigator.database.queries import Queries from covigator.exceptions import CovigatorExcludedSampleException, CovigatorErrorProcessingPangolinResults from covigator.precomputations.loader import PrecomputationsLoader @@ -23,7 +21,7 @@ class AbstractProcessor: def __init__(self, database: Database, dask_client: Client, data_source: DataSource, config: Configuration, - download : bool = False, wait_time=60): + wait_time=60): self.data_source = data_source self.config = config self.start_time = datetime.now() @@ -36,7 +34,6 @@ def __init__(self, database: Database, dask_client: Client, data_source: DataSou self.session = self.database.get_database_session() self.queries = Queries(self.session) self.wait_time = wait_time - self.download = download def process(self): logger.info("Starting processor") @@ -45,10 +42,7 @@ def process(self): try: while True: # queries 100 jobs every time to make sending to queue faster - jobs = self.queries.find_first_pending_jobs( - self.data_source, n=1000, - # only reads jobs in PENDING status if --download is indicated - status=[JobStatus.PENDING, JobStatus.DOWNLOADED] if self.download else [JobStatus.DOWNLOADED]) + jobs = self.queries.find_first_pending_jobs(self.data_source, n=1000, status=[JobStatus.DOWNLOADED]) if jobs is None or len(jobs) == 0: logger.info("No more jobs to process after sending {} runs to process".format(count)) break @@ -80,6 +74,9 @@ def process(self): # precomputes data right after processor PrecomputationsLoader(session=self.session).load() + # updates the last update entry + self._register_last_update() + except Exception as e: logger.exception(e) self.session.rollback() @@ -96,6 +93,11 @@ def process(self): self.session.close() logger.info("Cluster and database sessions closed") + def _register_last_update(self): + last_update = LastUpdate(source=self.data_source, update_time=date.today()) + self.session.add(last_update) + self.session.commit() + def _wait_for_batch(self): logger.info("Waiting for a batch of jobs...") while (count_pending_jobs := self.queries.count_jobs_in_queue(data_source=self.data_source)) > 0: @@ -112,7 +114,6 @@ def run_job(config: Configuration, run_accession: str, start_status: JobStatus, Runs a function on a job, if anything goes wrong or does not fit in the DB it returns None in order to stop the execution of subsequent jobs. """ - covigator.configuration.initialise_logs(config.logfile_processor, sample_id=run_accession) if run_accession is not None: try: with session_scope(config=config) as session: @@ -124,7 +125,6 @@ def run_job(config: Configuration, run_accession: str, start_status: JobStatus, if end_status is not None: sample.status = end_status else: - logger.warning("Expected ENA job {} in status {}".format(run_accession, start_status)) run_accession = None except CovigatorExcludedSampleException as e: # captures exclusion cases @@ -139,8 +139,6 @@ def run_job(config: Configuration, run_accession: str, start_status: JobStatus, config=config, run_accession=run_accession, exception=e, status=error_status, data_source=data_source) run_accession = None - else: - logger.warning("Error processing a job that does not stop the workflow!") return run_accession @abc.abstractmethod @@ -170,8 +168,6 @@ def _get_traceback_from_exception(e): @staticmethod def _log_error_in_job(config: Configuration, run_accession: str, exception: Exception, status: JobStatus, data_source: DataSource): with session_scope(config=config) as session: - logger.exception(exception) - logger.info("Error on job {} on state {}: {}".format(run_accession, status, str(exception))) sample = Queries(session).find_job_by_accession(run_accession=run_accession, data_source=data_source) sample.status = status sample.failed_at = datetime.now() diff --git a/covigator/processor/covid19portal_processor.py b/covigator/processor/covid19portal_processor.py new file mode 100644 index 00000000..d861e278 --- /dev/null +++ b/covigator/processor/covid19portal_processor.py @@ -0,0 +1,68 @@ +from datetime import datetime + +import covigator +from covigator.configuration import Configuration +from covigator.database.model import JobStatus, DataSource, SampleCovid19Portal +from covigator.database.database import Database +from logzero import logger +from dask.distributed import Client + +from covigator.database.queries import Queries +from covigator.processor.abstract_processor import AbstractProcessor +from covigator.pipeline.covid19_portal_pipeline import Covid19PortalPipeline +from covigator.pipeline.vcf_loader import VcfLoader + + +class Covid19PortalProcessor(AbstractProcessor): + + def __init__(self, database: Database, dask_client: Client, config: Configuration, wait_time=60): + logger.info("Initialising Covid19 Portal processor") + super().__init__(database, dask_client, DataSource.COVID19_PORTAL, config=config, wait_time=wait_time) + + def _process_run(self, run_accession: str): + # NOTE: here we set the priority of each step to ensure a depth first processing + future = self.dask_client.submit(Covid19PortalProcessor.job, self.config, run_accession, priority=1) + return future + + @staticmethod + def job(config: Configuration, run_accession): + return Covid19PortalProcessor.run_job( + config, run_accession, start_status=JobStatus.QUEUED, end_status=JobStatus.FINISHED, + error_status=JobStatus.FAILED_PROCESSING, data_source=DataSource.COVID19_PORTAL, + function=Covid19PortalProcessor.run_all) + + @staticmethod + def run_all(sample: SampleCovid19Portal, queries: Queries, config: Configuration): + sample = Covid19PortalProcessor.run_pipeline(sample=sample, queries=queries, config=config) + sample = Covid19PortalProcessor.load(sample=sample, queries=queries, config=config) + return sample + + @staticmethod + def run_pipeline(sample: SampleCovid19Portal, queries: Queries, config: Configuration) -> SampleCovid19Portal: + pipeline_results = Covid19PortalPipeline(config=config).run(sample=sample) + sample.analysed_at = datetime.now() + sample.sample_folder = sample.get_sample_folder(config.storage_folder) + sample.vcf_path = pipeline_results.vcf_path + sample.pangolin_path = pipeline_results.pangolin_path + sample.fasta_path = pipeline_results.fasta_path + + # stores the covigator version + sample.covigator_processor_version = covigator.VERSION + + # load pangolin results + sample = Covid19PortalProcessor.load_pangolin(sample=sample, path=sample.pangolin_path) + + # NOTE: this is a counterintuititve commit. The VCF loading happening after this may do a legitimate rollback + # but we don't want to rollback changes in the sample, hence this commit + queries.session.commit() + + return sample + + @staticmethod + def load(sample: SampleCovid19Portal, queries: Queries, config: Configuration) -> SampleCovid19Portal: + if not config.skip_vcf_loading: + VcfLoader().load( + vcf_file=sample.vcf_path, run_accession=sample.run_accession, source=DataSource.COVID19_PORTAL, + session=queries.session) + sample.loaded_at = datetime.now() + return sample diff --git a/covigator/processor/ena_processor.py b/covigator/processor/ena_processor.py index ffde9e31..6384910b 100644 --- a/covigator/processor/ena_processor.py +++ b/covigator/processor/ena_processor.py @@ -1,31 +1,26 @@ import json from datetime import datetime - import pandas as pd - import covigator from covigator.configuration import Configuration from covigator.database.queries import Queries from covigator.exceptions import CovigatorErrorProcessingCoverageResults, CovigatorExcludedSampleBadQualityReads, \ CovigatorExcludedSampleNarrowCoverage, \ CovigatorErrorProcessingDeduplicationResults -from covigator.misc import backoff_retrier from covigator.database.model import JobStatus, DataSource, SampleEna from covigator.database.database import Database from logzero import logger from dask.distributed import Client from covigator.processor.abstract_processor import AbstractProcessor -from covigator.pipeline.downloader import Downloader from covigator.pipeline.ena_pipeline import Pipeline from covigator.pipeline.vcf_loader import VcfLoader class EnaProcessor(AbstractProcessor): - def __init__(self, database: Database, dask_client: Client, config: Configuration, download : bool = False, - wait_time=60): + def __init__(self, database: Database, dask_client: Client, config: Configuration, wait_time=60): logger.info("Initialising ENA processor") - super().__init__(database, dask_client, DataSource.ENA, config, download=download, wait_time=wait_time) + super().__init__(database, dask_client, DataSource.ENA, config, wait_time=wait_time) def _process_run(self, run_accession: str): """ @@ -44,22 +39,10 @@ def job(config: Configuration, run_accession): @staticmethod def run_all(sample: SampleEna, queries: Queries, config: Configuration) -> SampleEna: - sample = EnaProcessor.download(sample=sample, queries=queries, config=config) sample = EnaProcessor.run_pipeline(sample=sample, queries=queries, config=config) sample = EnaProcessor.load(sample=sample, queries=queries, config=config) return sample - @staticmethod - def download(sample: SampleEna, queries: Queries, config: Configuration) -> SampleEna: - # ensures that the download is done with retries, even after MD5 check sum failure - downloader = Downloader(config=config) - download_with_retries = backoff_retrier.wrapper(downloader.download, config.retries_download) - paths = download_with_retries(sample_ena=sample) - sample.sample_folder = sample.get_sample_folder(config.storage_folder) - sample.fastq_path = paths - sample.downloaded_at = datetime.now() - return sample - @staticmethod def run_pipeline(sample: SampleEna, queries: Queries, config: Configuration) -> SampleEna: fastq1, fastq2 = sample.get_fastq1_and_fastq2() @@ -79,7 +62,6 @@ def run_pipeline(sample: SampleEna, queries: Queries, config: Configuration) -> sample.fastp_path = pipeline_result.fastp_qc sample.horizontal_coverage_path = pipeline_result.horizontal_coverage sample.vertical_coverage_path = pipeline_result.vertical_coverage - sample.deduplication_metrics_path = pipeline_result.deduplication_metrics # stores the covigator version sample.covigator_processor_version = covigator.VERSION @@ -88,52 +70,12 @@ def run_pipeline(sample: SampleEna, queries: Queries, config: Configuration) -> sample.qc = json.load(open(pipeline_result.fastp_qc)) # load horizontal coverage values in the database sample = EnaProcessor.load_coverage_results(sample) - # load deduplication metrics - sample = EnaProcessor.load_deduplication_metrics(sample) # load pangolin results sample = EnaProcessor.load_pangolin(sample=sample, path=sample.lofreq_pangolin_path) - return sample - - @staticmethod - def load_deduplication_metrics(sample: SampleEna) -> SampleEna: - try: - data = pd.read_csv(sample.deduplication_metrics_path, - sep="\t", - skiprows=6, - nrows=1, - dtype={ - 'PERCENT_DUPLICATION': float, - 'UNPAIRED_READS_EXAMINED': int, - 'READ_PAIRS_EXAMINED': int, - 'SECONDARY_OR_SUPPLEMENTARY_RDS': int, - 'UNMAPPED_READS': int, - 'UNPAIRED_READ_DUPLICATES': int, - 'READ_PAIR_DUPLICATES': int, - 'READ_PAIR_OPTICAL_DUPLICATES': int - }) - - # fill NA values on a per column basis... - data.PERCENT_DUPLICATION.fillna(value=0.0, inplace=True) - data.UNPAIRED_READS_EXAMINED.fillna(value=0, inplace=True) - data.READ_PAIRS_EXAMINED.fillna(value=0, inplace=True) - data.SECONDARY_OR_SUPPLEMENTARY_RDS.fillna(value=0, inplace=True) - data.UNMAPPED_READS.fillna(value=0, inplace=True) - data.UNPAIRED_READ_DUPLICATES.fillna(value=0, inplace=True) - data.READ_PAIR_DUPLICATES.fillna(value=0, inplace=True) - data.READ_PAIR_OPTICAL_DUPLICATES.fillna(value=0, inplace=True) - - sample.percent_duplication = data.PERCENT_DUPLICATION.loc[0] - # NOTE: SQLalchemy does not play well with int64 - sample.unpaired_reads_examined = int(data.UNPAIRED_READS_EXAMINED.loc[0]) - sample.read_pairs_examined = int(data.READ_PAIRS_EXAMINED.loc[0]) - sample.secondary_or_supplementary_reads = int(data.SECONDARY_OR_SUPPLEMENTARY_RDS.loc[0]) - sample.unmapped_reads = int(data.UNMAPPED_READS.loc[0]) - sample.unpaired_read_duplicates = int(data.UNPAIRED_READ_DUPLICATES.loc[0]) - sample.read_pair_duplicates = int(data.READ_PAIR_DUPLICATES.loc[0]) - sample.read_pair_optical_duplicates = int(data.READ_PAIR_OPTICAL_DUPLICATES.loc[0]) - except Exception as e: - raise CovigatorErrorProcessingDeduplicationResults(e) + # NOTE: this is a counterintuititve commit. The VCF loading happening after this may do a legitimate rollback + # but we don't want to rollback changes in the sample, hence this commit + queries.session.commit() return sample diff --git a/covigator/tests/resources/test.another_pangolin.csv b/covigator/tests/resources/test.another_pangolin.csv new file mode 100644 index 00000000..40310e6d --- /dev/null +++ b/covigator/tests/resources/test.another_pangolin.csv @@ -0,0 +1,2 @@ +taxon,lineage,conflict,ambiguity_score,scorpio_call,scorpio_support,scorpio_conflict,scorpio_notes,version,pangolin_version,scorpio_version,constellation_version,is_designated,qc_status,qc_notes,note +"ENA|MZ844877|MZ844877.1 Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/USA/TX-CDC-FG-066244/2021 ORF1ab polyprotein (ORF1ab) and ORF1a polyprotein (ORF1ab) genes, partial cds; surface glycoprotein (S), ORF3a protein (ORF3a), envelope protein (E), membrane glycoprotein (M), ORF6 protein (ORF6), ORF7a protein (ORF7a), and ORF7b (ORF7b) genes, complete cds; ORF8 gene, complete sequence; and nucleocapsid phosphoprotein (N) and ORF10 protein (ORF10) genes, complete cds.",AY.44,0.0,,Delta (B.1.617.2-like),1.0,0.0,scorpio call: Alt alleles 13; Ref alleles 0; Amb alleles 0; Oth alleles 0,PUSHER-v1.14,4.1.2,0.3.17,v0.1.10,False,pass,Ambiguous_content:0.03,Usher placements: AY.44(2/2) \ No newline at end of file diff --git a/covigator/tests/unit_tests/test_downloader.py b/covigator/tests/unit_tests/test_downloader.py index fe023584..757afeec 100644 --- a/covigator/tests/unit_tests/test_downloader.py +++ b/covigator/tests/unit_tests/test_downloader.py @@ -11,7 +11,7 @@ class DownloaderTest(AbstractTest): def setUp(self) -> None: self.downloader = Downloader(config=self.config) - @unittest.skip + #@unittest.skip def test_download_ena_run_with_bad_md5(self): run_accession = "TEST12345" ena_run = SampleEna( diff --git a/covigator/tests/unit_tests/test_processor.py b/covigator/tests/unit_tests/test_processor.py index 985bd93e..bfcfaa92 100644 --- a/covigator/tests/unit_tests/test_processor.py +++ b/covigator/tests/unit_tests/test_processor.py @@ -110,21 +110,19 @@ def test_load_pangolin(self): self.session.add(sample) self.session.commit() - def test_load_dedup_metrics(self): - deduplication_metrics_path = pkg_resources.resource_filename( - covigator.tests.__name__, "resources/test.deduplication_metrics.txt") - sample = SampleEna(run_accession="TEST", deduplication_metrics_path=deduplication_metrics_path, fastq_ftp="", fastq_md5="", - num_fastqs=1) - EnaProcessor.load_deduplication_metrics(sample) - self.assertEqual(sample.percent_duplication, 0.919339) - self.assertEqual(sample.unpaired_reads_examined, 0) - self.assertEqual(sample.read_pairs_examined, 352625) - self.assertEqual(sample.secondary_or_supplementary_reads, 1972) - self.assertEqual(sample.unmapped_reads, 0) - self.assertEqual(sample.unpaired_read_duplicates, 0) - self.assertEqual(sample.read_pair_duplicates, 324182) - self.assertEqual(sample.read_pair_optical_duplicates, 0) - + def test_load_another_pangolin(self): + pangolin_path = pkg_resources.resource_filename(covigator.tests.__name__, "resources/test.another_pangolin.csv") + sample = SampleEna(run_accession="TEST", fastq_ftp="blabla", fastq_md5="blabla", num_fastqs=1) + sample = EnaProcessor.load_pangolin(sample, path=pangolin_path) + self.assertEqual(sample.pangolin_pangolin_version, "4.1.2") + self.assertEqual(sample.pangolin_version, "PUSHER-v1.14") + self.assertEqual(sample.pangolin_scorpio_version, "0.3.17") + self.assertEqual(sample.pangolin_constellation_version, "v0.1.10") + self.assertEqual(sample.pangolin_qc_status, "pass") + self.assertEqual(sample.pangolin_note, "Usher placements: AY.44(2/2)") + self.assertEqual(sample.pangolin_lineage, "AY.44") + self.assertEqual(sample.pangolin_conflict, 0.0) + self.assertEqual(sample.pangolin_ambiguity_score, 0.0) self.session.add(sample) self.session.commit() diff --git a/docs/source/_static/figures/CV19DP_logo_oneliner2.svg b/docs/source/_static/figures/CV19DP_logo_oneliner2.svg new file mode 100755 index 00000000..0d11ca51 --- /dev/null +++ b/docs/source/_static/figures/CV19DP_logo_oneliner2.svg @@ -0,0 +1,425 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/requirements.txt b/requirements.txt index 8a9b776c..f4fe8d43 100755 --- a/requirements.txt +++ b/requirements.txt @@ -2,9 +2,9 @@ numpy==1.21.0 pandas==1.3.3 requests==2.25.0 sqlalchemy==1.3.20 -dask[complete]==2022.1.1 -dask_jobqueue==0.7.3 -distributed==2022.1.1 +dask[complete]==2022.9.2 +dask_jobqueue==0.8.1 +distributed==2022.9.2 Werkzeug==2.0.1 cytoolz==0.11.0 logzero==1.6.3 diff --git a/scripts/bash/export_db.sh b/scripts/bash/export_db.sh old mode 100644 new mode 100755 index 57fe8476..69d6fdd2 --- a/scripts/bash/export_db.sh +++ b/scripts/bash/export_db.sh @@ -50,7 +50,7 @@ gatk_pangolin_path, bcftools_pangolin_path, fastp_path, deduplication_metrics_pa vertical_coverage_path, num_reads, covered_bases, coverage, mean_depth, mean_base_quality, \ mean_mapping_quality, pangolin_lineage, pangolin_conflict, pangolin_ambiguity_score, pangolin_scorpio_call, \ pangolin_scorpio_support, pangolin_scorpio_conflict, pangolin_version, pangolin_pangolin_version, \ -pangolin_pango_version, pangolin_status, pangolin_note, percent_duplication, \ +pangolin_scorpio_version, pangolin_constellation_version, pangolin_qc_status, pangolin_qc_notes, pangolin_note, percent_duplication, \ unpaired_reads_examined, read_pairs_examined, secondary_or_supplementary_reads, unmapped_reads, \ unpaired_read_duplicates, read_pair_duplicates, read_pair_optical_duplicates, covigator_accessor_version, \ covigator_processor_version" @@ -58,5 +58,12 @@ psql $pg_uri -c "\\copy sample_ena$version($sample_ena_fields) to program 'gzip psql $pg_uri -c "`get_export_command "variant"`" psql $pg_uri -c "`get_export_command "variant_cooccurrence"`" psql $pg_uri -c "`get_export_command "variant_observation"`" +psql $pg_uri -c "`get_export_command "subclonal_variant"`" psql $pg_uri -c "`get_export_command "subclonal_variant_observation"`" +psql $pg_uri -c "`get_export_command "low_frequency_variant"`" psql $pg_uri -c "`get_export_command "low_frequency_variant_observation"`" + +# Covid19portal +psql $pg_uri -c "`get_export_command "sample_covid19_portal"`" +psql $pg_uri -c "`get_export_command "variant_covid19portal"`" +psql $pg_uri -c "`get_export_command "variant_observation_covid19portal"`" diff --git a/scripts/bash/import_db.minimal.sh b/scripts/bash/import_db.minimal.sh deleted file mode 100755 index 6bd63ff6..00000000 --- a/scripts/bash/import_db.minimal.sh +++ /dev/null @@ -1,72 +0,0 @@ -#!/bin/bash - - -# USAGE: import_db.sh covigator_config.txt /your/data/folder - -source $1 -input_folder=$2 - -version=$COVIGATOR_TABLE_VERSION -export PGPASSWORD=$COVIGATOR_DB_PASSWORD -pg_uri=postgresql://$COVIGATOR_DB_USER@$COVIGATOR_DB_HOST:$COVIGATOR_DB_PORT/$COVIGATOR_DB_NAME - - -get_import_command() { - echo "\\copy $1$version from program 'gzip -dc $input_folder/$1.minimal.csv.gz' csv header;" -} - -get_delete_command() { - echo "delete from $1$version;" -} - -load_table() { - psql $pg_uri -c "`get_delete_command $1`" - psql $pg_uri -c "`get_import_command $1`" -} - -# references -#load_table "conservation" -#load_table "gene" -#load_table "domain" - -# logs -#load_table "log" -#load_table "last_update" - -# precomputations -#load_table "precomputed_annotation" -#load_table "precomputed_indel_length" -#load_table "precomputed_ns_s_counts" -#load_table "precomputed_substitutions_counts" -#load_table "precomputed_table_counts" -#load_table "precomputed_top_occurrence" -#load_table "precomputed_variant_abundance_histogram" -#load_table "precomputed_variants_per_lineage" -#load_table "precomputed_variants_per_sample" - -# ENA -sample_ena_fields="run_accession, finished, sample_accession, scientific_name, study_accession, \ -experiment_accession, first_created, collection_date, instrument_platform, instrument_model, library_name, \ -nominal_length, library_layout, library_strategy, library_source, library_selection, read_count, base_count, \ -sample_collection, sequencing_method, center_name, fastq_ftp, fastq_md5, num_fastqs, host_tax_id, host_sex, \ -host_body_site, host_gravidity, host_phenotype, host_genotype, lat, lon, country_raw, country, country_alpha_2,\ -country_alpha_3, continent, continent_alpha_2, count_snvs, count_insertions, count_deletions, \ -count_subclonal_snvs, count_subclonal_insertions, count_subclonal_deletions, count_low_frequency_snvs, \ -count_low_frequency_insertions, count_low_frequency_deletions, status, created_at, queued_at, downloaded_at, \ -analysed_at, loaded_at, failed_at, error_message, sample_folder, fastq_path, \ -lofreq_vcf_path, ivar_vcf_path, gatk_vcf_path, bcftools_vcf_path, lofreq_pangolin_path, ivar_pangolin_path, \ -gatk_pangolin_path, bcftools_pangolin_path, fastp_path, deduplication_metrics_path, horizontal_coverage_path, \ -vertical_coverage_path, num_reads, covered_bases, coverage, mean_depth, mean_base_quality, mean_mapping_quality, \ - pangolin_lineage, pangolin_conflict, pangolin_ambiguity_score, pangolin_scorpio_call, pangolin_scorpio_support, \ - pangolin_scorpio_conflict, pangolin_version, pangolin_pangolin_version, pangolin_pango_version, pangolin_status, \ - pangolin_note, percent_duplication, unpaired_reads_examined, read_pairs_examined, \ - secondary_or_supplementary_reads, unmapped_reads, unpaired_read_duplicates, read_pair_duplicates, \ - read_pair_optical_duplicates, covigator_accessor_version, covigator_processor_version" -psql $pg_uri -c "\\copy sample_ena$version($sample_ena_fields) from program 'gzip -dc $input_folder/sample_ena.minimal.csv.gz' csv header;" -load_table "variant" -load_table "variant_cooccurrence" -load_table "variant_observation" -load_table "subclonal_variant" -load_table "subclonal_variant_observation" -#load_table "low_frequency_variant_observation" - diff --git a/scripts/bash/import_db.sh b/scripts/bash/import_db.sh index 7d92777c..a3983871 100755 --- a/scripts/bash/import_db.sh +++ b/scripts/bash/import_db.sh @@ -59,7 +59,7 @@ gatk_pangolin_path, bcftools_pangolin_path, fastp_path, deduplication_metrics_pa vertical_coverage_path, num_reads, covered_bases, coverage, mean_depth, mean_base_quality, \ mean_mapping_quality, pangolin_lineage, pangolin_conflict, pangolin_ambiguity_score, pangolin_scorpio_call, \ pangolin_scorpio_support, pangolin_scorpio_conflict, pangolin_version, pangolin_pangolin_version, \ -pangolin_pango_version, pangolin_status, pangolin_note, percent_duplication, \ +pangolin_scorpio_version, pangolin_constellation_version, pangolin_qc_status, pangolin_qc_notes, pangolin_note, percent_duplication, \ unpaired_reads_examined, read_pairs_examined, secondary_or_supplementary_reads, unmapped_reads, \ unpaired_read_duplicates, read_pair_duplicates, read_pair_optical_duplicates, covigator_accessor_version, \ covigator_processor_version" @@ -69,4 +69,10 @@ load_table "variant_cooccurrence" load_table "variant_observation" load_table "subclonal_variant" load_table "subclonal_variant_observation" +load_table "low_frequency_variant" load_table "low_frequency_variant_observation" + +# Covid19 portal +load_table "sample_covid19_portal" +load_table "variant_covid19portal" +load_table "variant_observation_covid19portal" diff --git a/scripts/python/fasta_metadata.py b/scripts/python/fasta_metadata.py new file mode 100644 index 00000000..a403b267 --- /dev/null +++ b/scripts/python/fasta_metadata.py @@ -0,0 +1,34 @@ +from Bio import SeqIO +from Bio.SeqRecord import SeqRecord +from covigator.database.database import Database +from covigator.configuration import Configuration +from covigator.database.model import SampleCovid19Portal + + + +"""" +This is a temporary patch that has been amended in the accessor already +""" +config = Configuration() +session = Database(config=config, initialize=True).get_database_session() + +count = 0 + +sample: SampleCovid19Portal +for sample in session.query(SampleCovid19Portal).all(): + + record: SeqRecord + for record in SeqIO.parse(sample.fasta_path, "fasta"): + sample.sequence_length = len(str(record.seq)) + sample.count_n_bases = record.seq.count("N") + sample.count_ambiguous_bases = sum([record.seq.count(b) for b in "RYWSMKHBVD"]) + break + + count += 1 + + if count % 1000 == 0: + session.commit() + count = 0 + +session.commit() # last batch commit +session.close() diff --git a/setup.py b/setup.py index b62c574b..9e35b42d 100755 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ entry_points={ "console_scripts": [ "covigator-ena-accessor=covigator.command_line:ena_accessor", + "covigator-covid19portal-accessor=covigator.command_line:covid19_portal_accessor", "covigator-ena-downloader=covigator.command_line:ena_downloader", "covigator-processor=covigator.command_line:processor", "covigator-dashboard=covigator.dashboard.dashboard:main",