diff --git a/assembly.py b/assembly.py index 06023e7a3..3445bdc19 100755 --- a/assembly.py +++ b/assembly.py @@ -1080,8 +1080,8 @@ def main_filter_short_seqs(args): '''Check sequences in inFile, retaining only those that are at least minLength''' # orig by rsealfon, edited by dpark # TO DO: make this more generalized to accept multiple minLengths (for multiple chromosomes/segments) - with util.file.open_or_gzopen(args.inFile) as inf: - with util.file.open_or_gzopen(args.outFile, 'w') as outf: + with util.file.compressed_open(args.inFile, 'rt') as inf: + with util.file.compressed_open(args.outFile, 'wt') as outf: Bio.SeqIO.write( [ s for s in Bio.SeqIO.parse(inf, args.format) @@ -1619,8 +1619,8 @@ def deambig_fasta(inFasta, outFasta): random unambiguous base from among the possibilities described by the ambiguity code. Write output to fasta file. ''' - with util.file.open_or_gzopen(outFasta, 'wt') as outf: - with util.file.open_or_gzopen(inFasta, 'rt') as inf: + with util.file.compressed_open(outFasta, 'wt') as outf: + with util.file.compressed_open(inFasta, 'rt') as inf: for record in Bio.SeqIO.parse(inf, 'fasta'): for line in util.file.fastaMaker([(record.id, ''.join(map(deambig_base, str(record.seq))))]): outf.write(line) diff --git a/file_utils.py b/file_utils.py index 3b6fbf8bb..ab57de21a 100755 --- a/file_utils.py +++ b/file_utils.py @@ -27,8 +27,8 @@ def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path= return 0 def parser_merge_tarballs(parser=argparse.ArgumentParser()): parser.add_argument( - 'out_tarball', - help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-); + 'out_tarball', + help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst|-); compression is inferred by the file extension. Note: if "-" is used, output will be written to stdout and --pipeOutHint must be provided to indicate compression type @@ -36,7 +36,7 @@ def parser_merge_tarballs(parser=argparse.ArgumentParser()): ''') parser.add_argument( 'in_tarballs', nargs='+', - help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)') + help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst)') ) parser.add_argument('--extractToDiskPath', dest="extract_to_disk_path", @@ -61,4 +61,4 @@ def full_parser(): if __name__ == '__main__': - util.cmd.main_argparse(__commands__, __doc__) \ No newline at end of file + util.cmd.main_argparse(__commands__, __doc__) diff --git a/illumina.py b/illumina.py index 153d75d7b..fc868244c 100755 --- a/illumina.py +++ b/illumina.py @@ -44,7 +44,7 @@ def parser_illumina_demux(parser=argparse.ArgumentParser()): help='Output ExtractIlluminaBarcodes metrics file. Default is to dump to a temp file.', default=None) parser.add_argument('--commonBarcodes', - help='''Write a TSV report of all barcode counts, in descending order. + help='''Write a TSV report of all barcode counts, in descending order. Only applicable for read structures containing "B"''', default=None) parser.add_argument('--sampleSheet', @@ -89,7 +89,7 @@ def parser_illumina_demux(parser=argparse.ArgumentParser()): def main_illumina_demux(args): - ''' Read Illumina runs & produce BAM files, demultiplexing to one bam per sample, or + ''' Read Illumina runs & produce BAM files, demultiplexing to one bam per sample, or for simplex runs, a single bam will be produced bearing the flowcell ID. Wraps together Picard's ExtractBarcodes (for multiplexed samples) and IlluminaBasecallsToSam while handling the various required input formats. 
Also can @@ -126,13 +126,13 @@ def main_illumina_demux(args): link_locs=False # For HiSeq-4000/X runs, If Picard's CheckIlluminaDirectory is # called with LINK_LOCS=true, symlinks with absolute paths - # may be created, pointing from tile-specific *.locs to the + # may be created, pointing from tile-specific *.locs to the # single s.locs file in the Intensities directory. # These links may break if the run directory is moved. # We should begin by removing broken links, if present, # and call CheckIlluminaDirectory ourselves if a 's.locs' # file is present, but only if the directory check fails - # since link_locs=true tries to create symlinks even if they + # since link_locs=true tries to create symlinks even if they # (or the files) already exist try: tools.picard.CheckIlluminaDirectoryTool().execute( @@ -163,8 +163,8 @@ def main_illumina_demux(args): link_locs=link_locs ) - multiplexed_samples = True if 'B' in read_structure else False - + multiplexed_samples = True if 'B' in read_structure else False + if multiplexed_samples: assert samples is not None, "This looks like a multiplexed run since 'B' is in the read_structure: a SampleSheet must be given." else: @@ -308,19 +308,19 @@ def main_lane_metrics(args): def parser_common_barcodes(parser=argparse.ArgumentParser()): parser.add_argument('inDir', help='Illumina BCL directory (or tar.gz of BCL directory). This is the top-level run directory.') parser.add_argument('lane', help='Lane number.', type=int) - parser.add_argument('outSummary', help='''Path to the summary file (.tsv format). It includes several columns: - (barcode1, likely_index_name1, barcode2, likely_index_name2, count), - where likely index names are either the exact match index name for the barcode + parser.add_argument('outSummary', help='''Path to the summary file (.tsv format). It includes several columns: + (barcode1, likely_index_name1, barcode2, likely_index_name2, count), + where likely index names are either the exact match index name for the barcode sequence, or those Hamming distance of 1 away.''') parser.add_argument('--truncateToLength', help='If specified, only this number of barcodes will be returned. 
Useful if you only want the top N barcodes.', type=int, default=None) - parser.add_argument('--omitHeader', + parser.add_argument('--omitHeader', help='If specified, a header will not be added to the outSummary tsv file.', action='store_true') - parser.add_argument('--includeNoise', + parser.add_argument('--includeNoise', help='If specified, barcodes with periods (".") will be included.', action='store_true') parser.add_argument('--outMetrics', @@ -350,8 +350,8 @@ def parser_common_barcodes(parser=argparse.ArgumentParser()): return parser def main_common_barcodes(args): - ''' - Extract Illumina barcodes for a run and write a TSV report + ''' + Extract Illumina barcodes for a run and write a TSV report of the barcode counts in descending order ''' @@ -422,9 +422,9 @@ def sum_reducer(accumulator, element): count_to_write = truncateToLength if truncateToLength else len(barcode_counts) barcode_pairs_sorted_by_count = sorted(barcode_counts, key=barcode_counts.get, reverse=True)[:count_to_write] - mapped_counts = ( (k[:8], ",".join([x for x in illumina_reference.guess_index(k[:8], distance=1)] or ["Unknown"]), - k[8:], ",".join([x for x in illumina_reference.guess_index(k[8:], distance=1)] or ["Unknown"]), - barcode_counts[k]) + mapped_counts = ( (k[:8], ",".join([x for x in illumina_reference.guess_index(k[:8], distance=1)] or ["Unknown"]), + k[8:], ",".join([x for x in illumina_reference.guess_index(k[8:], distance=1)] or ["Unknown"]), + barcode_counts[k]) for k in barcode_pairs_sorted_by_count) # write the barcodes and their corresponding counts @@ -445,13 +445,13 @@ def sum_reducer(accumulator, element): def parser_guess_barcodes(parser=argparse.ArgumentParser()): parser.add_argument('in_barcodes', help='The barcode counts file produced by common_barcodes.') parser.add_argument('in_picard_metrics', help='The demultiplexing read metrics produced by Picard.') - parser.add_argument('out_summary_tsv', help='''Path to the summary file (.tsv format). It includes several columns: - (sample_name, expected_barcode_1, expected_barcode_2, - expected_barcode_1_name, expected_barcode_2_name, - expected_barcodes_read_count, guessed_barcode_1, - guessed_barcode_2, guessed_barcode_1_name, - guessed_barcode_2_name, guessed_barcodes_read_count, - match_type), + parser.add_argument('out_summary_tsv', help='''Path to the summary file (.tsv format). It includes several columns: + (sample_name, expected_barcode_1, expected_barcode_2, + expected_barcode_1_name, expected_barcode_2_name, + expected_barcodes_read_count, guessed_barcode_1, + guessed_barcode_2, guessed_barcode_1_name, + guessed_barcode_2_name, guessed_barcodes_read_count, + match_type), where the expected values are those used by Picard during demultiplexing and the guessed values are based on the barcodes seen among the data.''') @@ -465,11 +465,11 @@ def parser_guess_barcodes(parser=argparse.ArgumentParser()): help='If specified, only guess barcodes for these sample names.', type=str, default=None) - parser.add_argument('--outlier_threshold', + parser.add_argument('--outlier_threshold', help='threshold of how far from unbalanced a sample must be to be considered an outlier.', type=float, default=0.675) - parser.add_argument('--expected_assigned_fraction', + parser.add_argument('--expected_assigned_fraction', help='The fraction of reads expected to be assigned. 
An exception is raised if fewer than this fraction are assigned.', type=float, default=0.7) @@ -483,27 +483,27 @@ def parser_guess_barcodes(parser=argparse.ArgumentParser()): type=int, help='''The number of rows to use from the in_barcodes.''') - + util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None))) util.cmd.attach_main(parser, main_guess_barcodes, split_args=True) return parser -def main_guess_barcodes(in_barcodes, - in_picard_metrics, - out_summary_tsv, - sample_names, - outlier_threshold, - expected_assigned_fraction, - number_of_negative_controls, - readcount_threshold, +def main_guess_barcodes(in_barcodes, + in_picard_metrics, + out_summary_tsv, + sample_names, + outlier_threshold, + expected_assigned_fraction, + number_of_negative_controls, + readcount_threshold, rows_limit): """ Guess the barcode value for a sample name, based on the following: - a list is made of novel barcode pairs seen in the data, but not in the picard metrics - - for the sample in question, get the most abundant novel barcode pair where one of the + - for the sample in question, get the most abundant novel barcode pair where one of the barcodes seen in the data matches one of the barcodes in the picard metrics (partial match) - - if there are no partial matches, get the most abundant novel barcode pair + - if there are no partial matches, get the most abundant novel barcode pair Limitations: - If multiple samples share a barcode with multiple novel barcodes, disentangling them @@ -518,10 +518,10 @@ def main_guess_barcodes(in_barcodes, """ bh = util.illumina_indices.IlluminaBarcodeHelper(in_barcodes, in_picard_metrics, rows_limit) - guessed_barcodes = bh.find_uncertain_barcodes(sample_names=sample_names, - outlier_threshold=outlier_threshold, - expected_assigned_fraction=expected_assigned_fraction, - number_of_negative_controls=number_of_negative_controls, + guessed_barcodes = bh.find_uncertain_barcodes(sample_names=sample_names, + outlier_threshold=outlier_threshold, + expected_assigned_fraction=expected_assigned_fraction, + number_of_negative_controls=number_of_negative_controls, readcount_threshold=readcount_threshold) bh.write_guessed_barcodes(out_summary_tsv, guessed_barcodes) @@ -718,7 +718,7 @@ def __init__(self, infile, use_sample_name=True, only_lane=None, allow_non_uniqu def _detect_and_load_sheet(self, infile): if infile.endswith(('.csv','.csv.gz')): # one of a few possible CSV formats (watch out for line endings from other OSes) - with util.file.open_or_gzopen(infile, 'rU') as inf: + with util.file.compressed_open(infile, 'rU') as inf: header = None miseq_skip = False row_num = 0 diff --git a/interhost.py b/interhost.py index 7292e280c..4d10852d2 100755 --- a/interhost.py +++ b/interhost.py @@ -561,7 +561,7 @@ def transposeChromosomeFiles(inputFilenamesList, sampleRelationFile=None, sample outputFilenames = [] # open all files - inputFilesList = [util.file.open_or_gzopen(x, 'r') for x in inputFilenamesList] + inputFilesList = [util.file.compressed_open(x, 'rt') for x in inputFilenamesList] # get BioPython iterators for each of the FASTA files specified in the input fastaFiles = [SeqIO.parse(x, 'fasta') for x in inputFilesList] diff --git a/intrahost.py b/intrahost.py index 1d7c6d89e..1a7150514 100755 --- a/intrahost.py +++ b/intrahost.py @@ -146,7 +146,7 @@ def vphaser_one_sample(inBam, inConsFasta, outTab, vphaserNumThreads=None, filteredIter = filter_strand_bias(variantIter, minReadsEach, maxBias) libraryFilteredIter = compute_library_bias(filteredIter, 
bam_to_process, inConsFasta) - with util.file.open_or_gzopen(outTab, 'wt') as outf: + with util.file.compressed_open(outTab, 'wt') as outf: for row in libraryFilteredIter: outf.write('\t'.join(row) + '\n') @@ -471,12 +471,12 @@ def merge_to_vcf( samplenames_from_alignments = set() for alignmentFile in alignments: - with util.file.open_or_gzopen(alignmentFile, 'r') as inf: + with util.file.compressed_open(alignmentFile, 'rt') as inf: for seq in Bio.SeqIO.parse(inf, 'fasta'): samplenames_from_alignments.add(sampleIDMatch(seq.id)) refnames = set() - with util.file.open_or_gzopen(refFasta, 'r') as inf: + with util.file.compressed_open(refFasta, 'rt') as inf: for seq in Bio.SeqIO.parse(inf, 'fasta'): refnames.add(sampleIDMatch(seq.id)) @@ -512,7 +512,7 @@ def merge_to_vcf( log.info(samp_to_isnv) # get IDs and sequence lengths for reference sequence - with util.file.open_or_gzopen(refFasta, 'r') as inf: + with util.file.compressed_open(refFasta, 'rt') as inf: ref_chrlens = list((seq.id, len(seq)) for seq in Bio.SeqIO.parse(inf, 'fasta')) # use the output filepath specified if it is a .vcf, otherwise if it is gzipped we need @@ -566,10 +566,10 @@ def merge_to_vcf( # copy the list of alignment files alignmentFiles = list(alignments) - with util.file.open_or_gzopen(refFasta, 'r') as inf: + with util.file.compressed_open(refFasta, 'rt') as inf: for refSeq in Bio.SeqIO.parse(inf, 'fasta'): for alignmentFile in alignmentFiles: - with util.file.open_or_gzopen(alignmentFile, 'r') as inf2: + with util.file.compressed_open(alignmentFile, 'r') as inf2: for seq in Bio.SeqIO.parse(inf2, 'fasta'): if refSeq.id == seq.id: ref_seq_id_to_alignment_file[seq.id] = alignmentFile @@ -583,7 +583,7 @@ def merge_to_vcf( "There must be an isnv file for each sample. %s samples, %s isnv files" % (len(samples), len(isnvs))) for fileName in alignments: - with util.file.open_or_gzopen(fileName, 'r') as inf: + with util.file.compressed_open(fileName, 'rt') as inf: # get two independent iterators into the alignment file number_of_aligned_sequences = count_iter_items(Bio.SeqIO.parse(inf, 'fasta')) num_isnv_files = len(isnvs) @@ -622,7 +622,7 @@ def merge_to_vcf( samplesToUse = [x for x in cm.chrMaps.keys() if sampleIDMatch(x) in samples] alignmentFile = ref_seq_id_to_alignment_file[ref_sequence.id] - with util.file.open_or_gzopen(alignmentFile, 'r') as alignFileIn: + with util.file.compressed_open(alignmentFile, 'rt') as alignFileIn: for seq in Bio.SeqIO.parse(alignFileIn, 'fasta'): for sampleName in samplesToUse: if seq.id == sampleName: @@ -964,7 +964,7 @@ def compute_Fws(vcfrow): def add_Fws_vcf(inVcf, outVcf): '''Compute the Fws statistic on iSNV data. 
See Manske, 2012 (Nature)''' with open(outVcf, 'wt') as outf: - with util.file.open_or_gzopen(inVcf, 'rt') as inf: + with util.file.compressed_open(inVcf, 'rt') as inf: for line in inf: if line.startswith('##'): outf.write(line) @@ -1123,7 +1123,7 @@ def main_iSNV_table(args): '''Convert VCF iSNV data to tabular text''' header = ['chr', 'pos', 'sample', 'patient', 'time', 'alleles', 'iSNV_freq', 'Hw', 'Hs', 'eff_type', 'eff_codon_dna', 'eff_aa', 'eff_aa_pos', 'eff_prot_len', 'eff_gene', 'eff_protein'] - with util.file.open_or_gzopen(args.outFile, 'wt') as outf: + with util.file.compressed_open(args.outFile, 'wt') as outf: outf.write('\t'.join(header) + '\n') for row in iSNV_table(util.file.read_tabfile_dict(args.inVcf)): sample_parts = row['sample'].split('.') diff --git a/metagenomics.py b/metagenomics.py index 081a5a9fd..73b30a4bb 100755 --- a/metagenomics.py +++ b/metagenomics.py @@ -36,7 +36,7 @@ import tools.krona import tools.picard import tools.samtools -from util.file import open_or_gzopen +from util.file import compressed_open __commands__ = [] @@ -109,7 +109,7 @@ def __init__( def load_gi_single_dmp(self, dmp_path): '''Load a gi->taxid dmp file from NCBI taxonomy.''' gi_array = {} - with open_or_gzopen(dmp_path) as f: + with compressed_open(dmp_path) as f: for i, line in enumerate(f): gi, taxid = line.strip().split('\t') gi = int(gi) @@ -125,7 +125,7 @@ def load_names(self, names_db, scientific_only=True): names = {} else: names = collections.defaultdict(list) - for line in open_or_gzopen(names_db): + for line in compressed_open(names_db): parts = line.strip().split('|') taxid = int(parts[0]) name = parts[1].strip() @@ -142,7 +142,7 @@ def load_nodes(self, nodes_db): '''Load ranks and parents arrays from NCBI taxonomy.''' ranks = {} parents = {} - with open_or_gzopen(nodes_db) as f: + with compressed_open(nodes_db) as f: for line in f: parts = line.strip().split('|') taxid = int(parts[0]) @@ -589,8 +589,8 @@ def filter_file(path, sep='\t', taxid_column=0, gi_column=None, a2t=False, heade output_path = os.path.join(outputDb, path) input_path = maybe_compressed(input_path) - with open_or_gzopen(input_path, 'rt') as f, \ - open_or_gzopen(output_path, 'wt') as out_f: + with compressed_open(input_path, 'rt') as f, \ + compressed_open(output_path, 'wt') as out_f: if header: out_f.write(f.readline()) # Cannot use next(f) for python2 for line in f: @@ -868,7 +868,7 @@ def sam_lca_report(tax_db, bam_aligned, outReport, outReads=None, unique_only=No else: lca_tsv = util.file.mkstempfname('.tsv') - with util.file.open_or_gzopen(lca_tsv, 'wt') as lca: + with compressed_open(lca_tsv, 'wt') as lca: hits = sam_lca(tax_db, bam_aligned, lca, top_percent=10, unique_only=unique_only) with open(outReport, 'w') as f: @@ -923,7 +923,7 @@ def metagenomic_report_merge(metagenomic_reports, out_kraken_summary, kraken_db, # if we're creating a Krona input file if out_krona_input: # open the output file (as gz if necessary) - with util.file.open_or_gzopen(out_krona_input, "wt") as outf: + with util.file.compressed_open(out_krona_input, "wt") as outf: # create a TSV writer for the output file output_writer = csv.writer(outf, delimiter='\t', lineterminator='\n') @@ -931,7 +931,7 @@ def metagenomic_report_merge(metagenomic_reports, out_kraken_summary, kraken_db, # for each Kraken-format metag file specified, pull out the appropriate columns # and write them to the TSV output for metag_file in metagenomic_reports: - with util.file.open_or_gzopen(metag_file.name, "rt") as inf: + with 
compressed_open(metag_file.name, "rt") as inf: file_reader = csv.reader(inf, delimiter='\t') for row in file_reader: # for only the two relevant columns @@ -1047,7 +1047,7 @@ def filter_bam_to_taxa(in_bam, read_IDs_to_tax_IDs, out_bam, # perform the actual filtering to return a list of read IDs, writeen to a temp file with util.file.tempfname(".txt.gz") as temp_read_list: - with open_or_gzopen(temp_read_list, "wt") as read_IDs_file: + with compressed_open(temp_read_list, "wt") as read_IDs_file: read_ids_written = 0 for row in util.file.read_tabfile(read_IDs_to_tax_IDs): assert tax_id_col'): diff --git a/reports.py b/reports.py index 885671f97..2be2bf651 100755 --- a/reports.py +++ b/reports.py @@ -364,7 +364,7 @@ def parser_alignment_summary(parser=argparse.ArgumentParser()): def consolidate_fastqc(inDirs, outFile): '''Consolidate multiple FASTQC reports into one.''' - with util.file.open_or_gzopen(outFile, 'wt') as outf: + with util.file.compressed_open(outFile, 'wt') as outf: header = ['Sample'] out_n = 0 for sdir in inDirs: @@ -398,7 +398,7 @@ def parser_consolidate_fastqc(parser=argparse.ArgumentParser()): def get_bam_info(bamstats_dir): libs = {} for fn in glob.glob(os.path.join(bamstats_dir, "*.txt")): - with util.file.open_or_gzopen(fn, 'rt') as inf: + with util.file.compressed_open(fn, 'rt') as inf: bam = {} for line in inf: k, v = line.rstrip('\n').split('\t') diff --git a/requirements-conda.txt b/requirements-conda.txt index 7f2a5df76..664b488ba 100644 --- a/requirements-conda.txt +++ b/requirements-conda.txt @@ -1,5 +1,5 @@ blast=2.7.1 -bbmap=38.56 +bbmap=38.71 bmtagger=3.101 bwa=0.7.17 cd-hit=4.6.8 @@ -24,6 +24,7 @@ parallel=20160622 picard-slim=2.21.1 pigz=2.4 prinseq=0.20.4 +r-base=3.5.1 samtools=1.9 snpeff=4.3.1t spades=3.12.0 @@ -32,7 +33,7 @@ trimmomatic=0.38 trinity=date.2011_11_26 unzip=6.0 vphaser2=2.0 -r-base=3.5.1 +zstd=1.3.8 # Python packages below arrow=0.12.1 bedtools=2.28.0 @@ -42,3 +43,4 @@ matplotlib=2.2.4 pysam=0.15.0 pybedtools=0.7.10 PyYAML=5.1 +zstandard=0.11.0 diff --git a/taxon_filter.py b/taxon_filter.py index eec2f9e23..90d34131c 100755 --- a/taxon_filter.py +++ b/taxon_filter.py @@ -413,7 +413,7 @@ def _run_blastn_chunk(db, input_fasta, out_hits, blast_threads): """ run blastn on the input fasta file. 
this is intended to be run in parallel by blastn_chunked_fasta """ - with util.file.open_or_gzopen(out_hits, 'wt') as outf: + with util.file.compressed_open(out_hits, 'wt') as outf: for read_id in tools.blast.BlastnTool().get_hits_fasta(input_fasta, db, threads=blast_threads): outf.write(read_id + '\n') diff --git a/test/integration/test_krakenuniq.py b/test/integration/test_krakenuniq.py index b4b57cb36..c22048c7e 100644 --- a/test/integration/test_krakenuniq.py +++ b/test/integration/test_krakenuniq.py @@ -72,9 +72,9 @@ def test_krakenuniq(krakenuniq_db, input_bam): args = parser.parse_args(cmd) args.func_main(args) - with util.file.open_or_gzopen(out_reads, 'r') as inf: + with util.file.compressed_open(out_reads, 'r') as inf: assert len(inf.read()) > 0 - with util.file.open_or_gzopen(out_report) as inf: + with util.file.compressed_open(out_report) as inf: report_lines = [x.strip().split('\t') for x in inf.readlines()] report_lines = [x for x in report_lines if x] @@ -130,7 +130,7 @@ def test_krakenuniq_on_empty(krakenuniq_db, input_bam): args = parser.parse_args(cmd) args.func_main(args) - with util.file.open_or_gzopen(out_reads, 'r') as inf: + with util.file.compressed_open(out_reads, 'r') as inf: assert len(inf.read()) == 0 assert is_gz_file(out_reads) diff --git a/test/unit/test_assembly.py b/test/unit/test_assembly.py index ead135c69..ab27d61de 100644 --- a/test/unit/test_assembly.py +++ b/test/unit/test_assembly.py @@ -388,7 +388,7 @@ def test_ambig_align(self): inDir = util.file.get_test_input_path(self) contigs_gz = os.path.join(inDir, 'contigs.lasv.ambig.fasta.gz') contigs = util.file.mkstempfname('.fasta') - with util.file.open_or_gzopen(contigs_gz, 'rb') as f_in: + with util.file.compressed_open(contigs_gz, 'rb') as f_in: with open(contigs, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) expected = os.path.join(inDir, 'expected.lasv.ambig.fasta') @@ -405,7 +405,7 @@ def test_ambig_align_ebov(self): inDir = util.file.get_test_input_path(self) contigs_gz = os.path.join(inDir, 'contigs.ebov.ambig.fasta.gz') contigs = util.file.mkstempfname('.fasta') - with util.file.open_or_gzopen(contigs_gz, 'rb') as f_in: + with util.file.compressed_open(contigs_gz, 'rb') as f_in: with open(contigs, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) expected = os.path.join(inDir, 'expected.ebov.ambig.fasta') diff --git a/test/unit/test_intrahost.py b/test/unit/test_intrahost.py index b078ad45f..f660298c6 100644 --- a/test/unit/test_intrahost.py +++ b/test/unit/test_intrahost.py @@ -257,7 +257,7 @@ def multi_align_samples(self, retree=1): genomeKVIterator = self.genomeFastas.items() for sampleName, fastaFile in genomeKVIterator: - with util.file.open_or_gzopen(fastaFile, 'r') as inf: + with util.file.compressed_open(fastaFile, 'rt') as inf: for seq in Bio.SeqIO.parse(inf, 'fasta'): self.sequence_order.setdefault(sampleName, default=[]) self.sequence_order[sampleName].append(seq.id) @@ -305,14 +305,14 @@ def test_empty_output(self): #intrahost.merge_to_vcf(ref, outVcf, ['s1'], [emptyfile], [s1]) self.assertRaises(LookupError, intrahost.merge_to_vcf, ref, outVcf, ['s1'], [emptyfile], [s1]) self.assertGreater(os.path.getsize(outVcf), 0) - with util.file.open_or_gzopen(outVcf, 'rt') as inf: + with util.file.compressed_open(outVcf, 'rt') as inf: for line in inf: self.assertTrue(line.startswith('#')) outVcf = util.file.mkstempfname('.vcf.gz') #intrahost.merge_to_vcf(ref, outVcf, ['s1'], [emptyfile], [s1]) self.assertRaises(LookupError, intrahost.merge_to_vcf, ref, outVcf, ['s1'], [emptyfile], [s1]) 
self.assertGreater(os.path.getsize(outVcf), 0) - with util.file.open_or_gzopen(outVcf, 'rt') as inf: + with util.file.compressed_open(outVcf, 'rt') as inf: for line in inf: self.assertTrue(line.startswith('#')) diff --git a/test/unit/test_kmer_utils.py b/test/unit/test_kmer_utils.py index bf0bb7ddc..8e843db5c 100644 --- a/test/unit/test_kmer_utils.py +++ b/test/unit/test_kmer_utils.py @@ -53,7 +53,7 @@ def _yield_seq_recs(seq_file): t_fa = os.path.join(t_dir, 'bam2fa.fasta') tools.samtools.SamtoolsTool().bam2fa(seq_file, t_fa, append_mate_num=True) seq_file = t_fa - with util.file.open_or_gzopen(seq_file, 'rt') as seq_f: + with util.file.compressed_open(seq_file, 'rt') as seq_f: for rec in Bio.SeqIO.parse(seq_f, util.file.uncompressed_file_type(seq_file)[1:]): yield rec diff --git a/tools/kmc.py b/tools/kmc.py index 2d8b1d4c6..fe4a62fb6 100644 --- a/tools/kmc.py +++ b/tools/kmc.py @@ -154,7 +154,7 @@ def dump_kmer_counts(self, kmer_db, out_kmers, min_occs=1, max_occs=util.misc.MA def read_kmer_counts(self, kmer_counts_txt): """Read kmer counts from a file written by dump_kmer_counts, as collections.Counter""" counts = collections.Counter() - with util.file.open_or_gzopen(kmer_counts_txt) as kmers_f: + with util.file.compressed_open(kmer_counts_txt, 'rt') as kmers_f: for line in kmers_f: kmer, count = line.strip().split() counts[kmer] = int(count) diff --git a/tools/kraken.py b/tools/kraken.py index c3eef1acc..53425e56b 100644 --- a/tools/kraken.py +++ b/tools/kraken.py @@ -270,7 +270,7 @@ def execute(self, command, db, output, args=None, options=None, elif command == self.BINS['build']: subprocess.check_call(cmd) else: - with util.file.open_or_gzopen(output, 'w') as of: + with util.file.compressed_open(output, 'wt') as of: util.misc.run(cmd, stdout=of, stderr=subprocess.PIPE, check=True) diff --git a/tools/mummer.py b/tools/mummer.py index 948b2a491..9f70c7275 100644 --- a/tools/mummer.py +++ b/tools/mummer.py @@ -238,7 +238,7 @@ def scaffold_contigs_custom(self, refFasta, contigsFasta, outFasta, # load intervals into a FeatureSorter fs = util.misc.FeatureSorter() - with util.file.open_or_gzopen(tiling, 'rU') as inf: + with util.file.compressed_open(tiling, 'rt') as inf: for line in inf: row = line.rstrip('\n\r').split('\t') c = row[11] diff --git a/travis/install-conda.sh b/travis/install-conda.sh index 501dd2943..51d0853a2 100755 --- a/travis/install-conda.sh +++ b/travis/install-conda.sh @@ -12,6 +12,12 @@ if [ -d "$MINICONDA_DIR" ] && [ -e "$MINICONDA_DIR/bin/conda" ]; then export PATH="$MINICONDA_DIR/bin:$PATH" fi hash -r + conda config --set always_yes yes --set changeps1 no --set remote_max_retries 6 #--set channel_priority strict + conda config --add channels defaults + conda config --add channels bioconda + conda config --add channels conda-forge + conda config --add channels broad-viral + conda config --show-sources # print channels else # if it does not exist, we need to install miniconda rm -rf "$MINICONDA_DIR" # remove the directory in case we have an empty cached directory diff --git a/util/annot.py b/util/annot.py index 1744ba6af..f830d303a 100644 --- a/util/annot.py +++ b/util/annot.py @@ -44,7 +44,7 @@ def __init__(self, snpEffVcf=None, snpIterator=None): def loadVcf(self, snpEffVcf): #log.info("reading in snpEff VCF file: %s" % snpEffVcf) - with util.file.open_or_gzopen(snpEffVcf, 'rt') as inf: + with util.file.compressed_open(snpEffVcf, 'rt') as inf: ffp = util.file.FlatFileParser(inf) try: imap = hasattr(itertools, 'imap') and itertools.imap or map # py2 & py3 
compatibility
diff --git a/util/file.py b/util/file.py
index 6c7305a52..6795d7fa6 100644
--- a/util/file.py
+++ b/util/file.py
@@ -5,6 +5,7 @@
 __author__ = "dpark@broadinstitute.org"

+import bz2
 import codecs
 import contextlib
 import os
@@ -21,6 +22,7 @@
 import csv
 import inspect
 import tarfile
+import zstandard as zstd
 import util.cmd
 import util.misc
@@ -329,6 +331,49 @@ def touch_p(path, times=None):
     touch(path, times=times)


+@contextlib.contextmanager
+def zstd_open(fname, mode='r', **kwargs):
+    '''Handle both text and byte (de)compression of the file.'''
+    if 'r' in mode:
+        with open(fname, 'rb') as fh:
+            dctx = zstd.ZstdDecompressor()
+            stream_reader = dctx.stream_reader(fh)
+            if 'b' not in mode:
+                text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
+                yield text_stream
+                return
+            yield stream_reader
+    else:
+        with open(fname, 'wb') as fh:
+            cctx = zstd.ZstdCompressor(level=kwargs.get('level', 10),
+                                       threads=kwargs.get('threads', 1))
+            stream_writer = cctx.stream_writer(fh)
+            if 'b' not in mode:
+                text_stream = io.TextIOWrapper(stream_writer, encoding='utf-8')
+                yield text_stream
+                return
+            yield stream_writer
+
+
+def compressed_open(fname, mode='r', **kwargs):
+    '''Create file-like context manager from an optionally compressed filename.
+
+    Supports regular files, gzip, bz2, and zstd.
+    '''
+
+    if fname.endswith('.gz'):
+        # Allow using 'level' kwarg as an alias for gzip files.
+        if 'level' in kwargs:
+            kwargs['compresslevel'] = kwargs.pop('level')
+        return gzip.open(fname, mode=mode, **kwargs)
+    elif fname.endswith('.bz2'):
+        return bz2.open(fname, mode=mode, **kwargs)
+    elif fname.endswith('.zst'):
+        return zstd_open(fname, mode=mode, **kwargs)
+    else:
+        return open(fname, mode=mode, **kwargs)
+
+
 def open_or_gzopen(fname, *opts, **kwargs):
     mode = 'r'
     open_opts = list(opts)
@@ -403,8 +448,8 @@ def read_tabfile(inFile):
     ''' Read a tab text file (possibly gzipped) and return contents as an
         iterator of arrays.
     '''
-    with open_or_gzopen(inFile, 'rU') as inf:
-        for line_no,line in enumerate(inf):
+    with compressed_open(inFile, 'rt') as inf:
+        for line_no, line in enumerate(inf):
             if line_no==0:
                 # remove BOM, if present
                 line = line.replace('\ufeff','')
@@ -413,7 +458,7 @@ def read_tabfile(inFile):


 def readFlatFileHeader(filename, headerPrefix='#', delim='\t'):
-    with open_or_gzopen(filename, 'rt') as inf:
+    with compressed_open(filename, 'rt') as inf:
         header = inf.readline().rstrip('\n').split(delim)
         if header and header[0].startswith(headerPrefix):
             header[0] = header[0][len(headerPrefix):]
@@ -591,9 +636,9 @@ def replace_in_file(filename, original, new):


 def cat(output_file, input_files):
     '''Cat list of input filenames to output filename.'''
-    with open_or_gzopen(output_file, 'wb') as wfd:
+    with compressed_open(output_file, 'wb') as wfd:
         for f in input_files:
-            with open_or_gzopen(f, 'rb') as fd:
+            with compressed_open(f, 'rb') as fd:
                 shutil.copyfileobj(fd, wfd, 1024*1024*10)

@@ -832,7 +877,7 @@ def count_fastq_reads(inFastq):

 def line_count(infname):
     n = 0
-    with open_or_gzopen(infname, 'rt') as inf:
+    with compressed_open(infname, 'rt') as inf:
         for line in inf:
             n += 1
     return n
@@ -859,7 +904,7 @@ def slurp_file(fname, maxSizeMb=50):
     if maxSizeMb and fileSize > maxSizeMb*1024*1024:
         raise RuntimeError('Tried to slurp large file {} (size={}); are you sure? Increase `maxSizeMb` param if yes'.
format(fname, fileSize)) - with open_or_gzopen(fname) as f: + with compressed_open(fname) as f: return f.read() def is_broken_link(filename): @@ -930,6 +975,10 @@ def choose_compressor(filepath, threads=8): compressor = ['lz4'] return_obj["decompress_cmd"] = compressor + ["-dc"] return_obj["compress_cmd"] = compressor + ["-c"] + elif re.search(r'\.?zst$', filepath): + compressor = ['zstd'] + return_obj["decompress_cmd"] = compressor + ["-dc"] + return_obj["compress_cmd"] = compressor + ["-c"] elif re.search(r'\.?tar$', filepath): compressor = ['cat'] return_obj["decompress_cmd"] = compressor @@ -973,7 +1022,7 @@ def read(self, size): compressor = choose_compressor(pipe_hint_out)["compress_cmd"] outfile = None else: - compressor =choose_compressor(out_compressed_tarball)["compress_cmd"] + compressor = choose_compressor(out_compressed_tarball)["compress_cmd"] outfile = open(out_compressed_tarball, "w") out_compress_ps = subprocess.Popen(compressor, stdout=sys.stdout if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE) @@ -982,12 +1031,12 @@ def read(self, size): for in_compressed_tarball in input_compressed_tarballs: if in_compressed_tarball != "-": - pigz_ps = subprocess.Popen(choose_compressor(in_compressed_tarball)["decompress_cmd"] + [in_compressed_tarball], stdout=subprocess.PIPE) + unz_ps = subprocess.Popen(choose_compressor(in_compressed_tarball)["decompress_cmd"] + [in_compressed_tarball], stdout=subprocess.PIPE) else: if not pipe_hint_in: raise IOError("cannot autodetect compression for stdin unless pipeInHint provided") - pigz_ps = subprocess.Popen(choose_compressor(pipe_hint_in)["decompress_cmd"] + [in_compressed_tarball], stdout=subprocess.PIPE, stdin=sys.stdin) - tar_in = tarfile.open(fileobj=pigz_ps.stdout, mode="r|", ignore_zeros=True) + unz_ps = subprocess.Popen(choose_compressor(pipe_hint_in)["decompress_cmd"] + [in_compressed_tarball], stdout=subprocess.PIPE, stdin=sys.stdin) + tar_in = tarfile.open(fileobj=unz_ps.stdout, mode="r|", ignore_zeros=True) fileinfo = tar_in.next() while fileinfo is not None: @@ -1011,10 +1060,10 @@ def read(self, size): tar_out.addfile(fileinfo, fileobj=fileobj) fileinfo = tar_in.next() - pigz_ps.wait() + unz_ps.wait() tar_in.close() - if pigz_ps.returncode != 0: - raise subprocess.CalledProcessError(pigz_ps.returncode, "Call error %s" % pigz_ps.returncode) + if unz_ps.returncode != 0: + raise subprocess.CalledProcessError(unz_ps.returncode, "Call error %s" % unz_ps.returncode) tar_out.close() out_compress_ps.stdin.close() diff --git a/util/vcf.py b/util/vcf.py index b5a867ada..06ab261ba 100644 --- a/util/vcf.py +++ b/util/vcf.py @@ -127,7 +127,7 @@ def get_chrlens(inFile): c_len = int(row[2][3:]) chrlens.append((c, c_len)) elif inFile.endswith('.vcf') or inFile.endswith('.vcf.gz'): - with util.file.open_or_gzopen(inFile, 'rt') as inf: + with util.file.compressed_open(inFile, 'rt') as inf: for line in inf: line = line.rstrip('\n') if line.startswith('##contig='):
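
Usage sketch (illustrative only, not part of the patch; the file names below are made up): the new util.file.compressed_open picks a backend from the file extension (.gz via gzip, .bz2 via bz2, .zst via the python-zstandard module added to requirements-conda.txt, anything else via plain open), so callers use one call shape regardless of compression, and a 'level' keyword is mapped onto gzip's compresslevel.

    import util.file

    # Read a zstd-compressed FASTA in text mode; the same call works for .gz, .bz2, or uncompressed files.
    with util.file.compressed_open('contigs.fasta.zst', 'rt') as inf:
        first_line = inf.readline()

    # Write a gzip-compressed table; 'level' is accepted as an alias for gzip's compresslevel.
    with util.file.compressed_open('report.tsv.gz', 'wt', level=6) as outf:
        outf.write('sample\treads\n')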