open_or_gzopen -> compressed_open. Add support for zstd.
Zstd has a nonstandard Python API which requires customization. It also allows for multithreaded compression.
yesimon committed Mar 8, 2019
1 parent 157cd7b commit 6698dfa
Showing 22 changed files with 123 additions and 71 deletions.
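Every call site below swaps util.file.open_or_gzopen for util.file.compressed_open and makes the text/binary mode explicit ('rt'/'wt' in place of 'r', 'w', or the Python-2-era 'rU'). The helper's new body is not part of this excerpt, so the following is only a sketch of what extension-dispatched opening with zstd support could look like, assuming the zstandard package's context-object API (the nonstandard API the commit message mentions); the signature and names here are illustrative, not the repository's actual code.

import gzip
import io

import zstandard  # third-party bindings, pinned in requirements-conda.txt below

def compressed_open(fname, mode='rt', threads=None):
    # Sketch only: dispatch on filename extension.
    if fname.endswith('.zst'):
        raw = open(fname, 'rb' if 'r' in mode else 'wb')
        if 'r' in mode:
            # zstandard has no stdlib-style one-call open(); reading goes
            # through a decompressor context wrapping the raw file handle.
            stream = zstandard.ZstdDecompressor().stream_reader(raw)
        else:
            # threads=-1 requests one compression worker per CPU core.
            cctx = zstandard.ZstdCompressor(
                threads=threads if threads is not None else -1)
            stream = cctx.stream_writer(raw)
        return io.TextIOWrapper(stream) if 't' in mode else stream
    elif fname.endswith('.gz'):
        return gzip.open(fname, mode)
    return open(fname, mode)

Only the compression side is parallel: ZstdCompressor takes a threads argument, while ZstdDecompressor exposes no equivalent knob.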
8 changes: 4 additions & 4 deletions assembly.py
@@ -1080,8 +1080,8 @@ def main_filter_short_seqs(args):
'''Check sequences in inFile, retaining only those that are at least minLength'''
# orig by rsealfon, edited by dpark
# TO DO: make this more generalized to accept multiple minLengths (for multiple chromosomes/segments)
-with util.file.open_or_gzopen(args.inFile) as inf:
-with util.file.open_or_gzopen(args.outFile, 'w') as outf:
+with util.file.compressed_open(args.inFile, 'rt') as inf:
+with util.file.compressed_open(args.outFile, 'wt') as outf:
Bio.SeqIO.write(
[
s for s in Bio.SeqIO.parse(inf, args.format)
@@ -1619,8 +1619,8 @@ def deambig_fasta(inFasta, outFasta):
random unambiguous base from among the possibilities described by the ambiguity
code. Write output to fasta file.
'''
-with util.file.open_or_gzopen(outFasta, 'wt') as outf:
-with util.file.open_or_gzopen(inFasta, 'rt') as inf:
+with util.file.compressed_open(outFasta, 'wt') as outf:
+with util.file.compressed_open(inFasta, 'rt') as inf:
for record in Bio.SeqIO.parse(inf, 'fasta'):
for line in util.file.fastaMaker([(record.id, ''.join(map(deambig_base, str(record.seq))))]):
outf.write(line)
8 changes: 4 additions & 4 deletions file_utils.py
@@ -27,16 +27,16 @@ def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path=
return 0
def parser_merge_tarballs(parser=argparse.ArgumentParser()):
parser.add_argument(
-'out_tarball',
-help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-);
+'out_tarball',
+help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst|-);
compression is inferred by the file extension.
Note: if "-" is used, output will be written to stdout and
--pipeOutHint must be provided to indicate compression type
when compression type is not gzip (gzip is used by default).
''')
parser.add_argument(
'in_tarballs', nargs='+',
-help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)')
+help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst)')
)
parser.add_argument('--extractToDiskPath',
dest="extract_to_disk_path",
@@ -61,4 +61,4 @@ def full_parser():


if __name__ == '__main__':
-util.cmd.main_argparse(__commands__, __doc__)
\ No newline at end of file
+util.cmd.main_argparse(__commands__, __doc__)
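With *.tar.zst added to both help strings, compression for a zstd tarball should be inferred straight from the filename; a hypothetical invocation would be file_utils.py merge_tarballs out.tar.zst in1.tar.gz in2.tar.lz4 (subcommand name taken from the parser function above; the exact CLI wiring lies outside this excerpt).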
2 changes: 1 addition & 1 deletion illumina.py
@@ -718,7 +718,7 @@ def __init__(self, infile, use_sample_name=True, only_lane=None, allow_non_uniqu
def _detect_and_load_sheet(self, infile):
if infile.endswith(('.csv','.csv.gz')):
# one of a few possible CSV formats (watch out for line endings from other OSes)
-with util.file.open_or_gzopen(infile, 'rU') as inf:
+with util.file.compressed_open(infile, 'rt') as inf:
header = None
miseq_skip = False
row_num = 0
2 changes: 1 addition & 1 deletion interhost.py
@@ -561,7 +561,7 @@ def transposeChromosomeFiles(inputFilenamesList, sampleRelationFile=None, sample
outputFilenames = []

# open all files
-inputFilesList = [util.file.open_or_gzopen(x, 'r') for x in inputFilenamesList]
+inputFilesList = [util.file.compressed_open(x, 'rt') for x in inputFilenamesList]
# get BioPython iterators for each of the FASTA files specified in the input
fastaFiles = [SeqIO.parse(x, 'fasta') for x in inputFilesList]

20 changes: 10 additions & 10 deletions intrahost.py
@@ -146,7 +146,7 @@ def vphaser_one_sample(inBam, inConsFasta, outTab, vphaserNumThreads=None,
filteredIter = filter_strand_bias(variantIter, minReadsEach, maxBias)

libraryFilteredIter = compute_library_bias(filteredIter, bam_to_process, inConsFasta)
-with util.file.open_or_gzopen(outTab, 'wt') as outf:
+with util.file.compressed_open(outTab, 'wt') as outf:
for row in libraryFilteredIter:
outf.write('\t'.join(row) + '\n')

@@ -471,12 +471,12 @@ def merge_to_vcf(

samplenames_from_alignments = set()
for alignmentFile in alignments:
-with util.file.open_or_gzopen(alignmentFile, 'r') as inf:
+with util.file.compressed_open(alignmentFile, 'rt') as inf:
for seq in Bio.SeqIO.parse(inf, 'fasta'):
samplenames_from_alignments.add(sampleIDMatch(seq.id))

refnames = set()
-with util.file.open_or_gzopen(refFasta, 'r') as inf:
+with util.file.compressed_open(refFasta, 'rt') as inf:
for seq in Bio.SeqIO.parse(inf, 'fasta'):
refnames.add(sampleIDMatch(seq.id))

@@ -512,7 +512,7 @@ def merge_to_vcf(
log.info(samp_to_isnv)

# get IDs and sequence lengths for reference sequence
-with util.file.open_or_gzopen(refFasta, 'r') as inf:
+with util.file.compressed_open(refFasta, 'rt') as inf:
ref_chrlens = list((seq.id, len(seq)) for seq in Bio.SeqIO.parse(inf, 'fasta'))

# use the output filepath specified if it is a .vcf, otherwise if it is gzipped we need
@@ -566,10 +566,10 @@ def merge_to_vcf(
# copy the list of alignment files
alignmentFiles = list(alignments)

-with util.file.open_or_gzopen(refFasta, 'r') as inf:
+with util.file.compressed_open(refFasta, 'rt') as inf:
for refSeq in Bio.SeqIO.parse(inf, 'fasta'):
for alignmentFile in alignmentFiles:
-with util.file.open_or_gzopen(alignmentFile, 'r') as inf2:
+with util.file.compressed_open(alignmentFile, 'r') as inf2:
for seq in Bio.SeqIO.parse(inf2, 'fasta'):
if refSeq.id == seq.id:
ref_seq_id_to_alignment_file[seq.id] = alignmentFile
@@ -583,7 +583,7 @@ def merge_to_vcf(
"There must be an isnv file for each sample. %s samples, %s isnv files" % (len(samples), len(isnvs)))

for fileName in alignments:
-with util.file.open_or_gzopen(fileName, 'r') as inf:
+with util.file.compressed_open(fileName, 'rt') as inf:
# get two independent iterators into the alignment file
number_of_aligned_sequences = count_iter_items(Bio.SeqIO.parse(inf, 'fasta'))
num_isnv_files = len(isnvs)
@@ -622,7 +622,7 @@ def merge_to_vcf(
samplesToUse = [x for x in cm.chrMaps.keys() if sampleIDMatch(x) in samples]

alignmentFile = ref_seq_id_to_alignment_file[ref_sequence.id]
-with util.file.open_or_gzopen(alignmentFile, 'r') as alignFileIn:
+with util.file.compressed_open(alignmentFile, 'rt') as alignFileIn:
for seq in Bio.SeqIO.parse(alignFileIn, 'fasta'):
for sampleName in samplesToUse:
if seq.id == sampleName:
@@ -964,7 +964,7 @@ def compute_Fws(vcfrow):
def add_Fws_vcf(inVcf, outVcf):
'''Compute the Fws statistic on iSNV data. See Manske, 2012 (Nature)'''
with open(outVcf, 'wt') as outf:
-with util.file.open_or_gzopen(inVcf, 'rt') as inf:
+with util.file.compressed_open(inVcf, 'rt') as inf:
for line in inf:
if line.startswith('##'):
outf.write(line)
@@ -1123,7 +1123,7 @@ def main_iSNV_table(args):
'''Convert VCF iSNV data to tabular text'''
header = ['chr', 'pos', 'sample', 'patient', 'time', 'alleles', 'iSNV_freq', 'Hw', 'Hs', 'eff_type',
'eff_codon_dna', 'eff_aa', 'eff_aa_pos', 'eff_prot_len', 'eff_gene', 'eff_protein']
-with util.file.open_or_gzopen(args.outFile, 'wt') as outf:
+with util.file.compressed_open(args.outFile, 'wt') as outf:
outf.write('\t'.join(header) + '\n')
for row in iSNV_table(util.file.read_tabfile_dict(args.inVcf)):
sample_parts = row['sample'].split('.')
12 changes: 6 additions & 6 deletions metagenomics.py
@@ -39,7 +39,7 @@
import tools.krona
import tools.picard
import tools.samtools
-from util.file import open_or_gzopen
+from util.file import compressed_open

__commands__ = []

@@ -1030,7 +1030,7 @@ def sam_lca_report(tax_db, bam_aligned, outReport, outReads=None, unique_only=No
else:
lca_tsv = util.file.mkstempfname('.tsv')

-with util.file.open_or_gzopen(lca_tsv, 'wt') as lca:
+with compressed_open(lca_tsv, 'wt') as lca:
hits = sam_lca(tax_db, bam_aligned, lca, top_percent=10, unique_only=unique_only)

with open(outReport, 'w') as f:
@@ -1087,15 +1087,15 @@ def metagenomic_report_merge(metagenomic_reports, out_kraken_summary, kraken_db,
# if we're creating a Krona input file
if out_krona_input:
# open the output file (as gz if necessary)
-with util.file.open_or_gzopen(out_krona_input, "wt") as outf:
+with util.file.compressed_open(out_krona_input, "wt") as outf:
# create a TSV writer for the output file
output_writer = csv.writer(outf, delimiter='\t', lineterminator='\n')

if metagenomic_reports:
# for each Kraken-format metag file specified, pull out the appropriate columns
# and write them to the TSV output
for metag_file in metagenomic_reports:
-with util.file.open_or_gzopen(metag_file.name, "rt") as inf:
+with compressed_open(metag_file.name, "rt") as inf:
file_reader = csv.reader(inf, delimiter='\t')
for row in file_reader:
# for only the two relevant columns
@@ -1230,7 +1230,7 @@ def filter_bam_to_taxa( in_bam,

# perform the actual filtering to return a list of read IDs, writeen to a temp file
with util.file.tempfname(".txt.gz") as temp_read_list:
-with open_or_gzopen(temp_read_list, "wt") as read_IDs_file:
+with compressed_open(temp_read_list, "wt") as read_IDs_file:
read_ids_written = 0
for row in util.file.read_tabfile(read_IDs_to_tax_IDs):
assert tax_id_col<len(row), "tax_id_col does not appear to be in range for number of columns present in mapping file"
@@ -1305,7 +1305,7 @@ def indent_len(in_string):
sample_root_summary = {}
tax_headings_copy = [s.lower() for s in tax_headings]

-with util.file.open_or_gzopen(f, 'rU') as inf:
+with compressed_open(f, 'rt') as inf:
should_process = False
indent_of_selection = -1
currently_being_processed = ""
10 changes: 5 additions & 5 deletions ncbi.py
@@ -193,9 +193,9 @@ def tbl_transfer_prealigned(inputFasta, refFasta, refAnnotTblFiles, outputDir, o

# identify which of the sequences in the multialignment is the reference,
# matching by ID to one of the sequences in the refFasta
-with util.file.open_or_gzopen(inputFasta, 'r') as inf:
+with util.file.compressed_open(inputFasta, 'rt') as inf:
for seq in Bio.SeqIO.parse(inf, 'fasta'):
-with util.file.open_or_gzopen(refFasta, 'r') as reff:
+with util.file.compressed_open(refFasta, 'rt') as reff:
for refSeq in Bio.SeqIO.parse(reff, 'fasta'):
if seq.id == refSeq.id:
ref_fasta_filename = util.file.mkstempfname('.fasta')
@@ -220,7 +220,7 @@ def tbl_transfer_prealigned(inputFasta, refFasta, refAnnotTblFiles, outputDir, o
raise KeyError("No reference table was found for the reference %s" % (matchingRefSeq.id))

# write out the desired sequences to separate fasta files
-with util.file.open_or_gzopen(inputFasta, 'r') as inf:
+with util.file.compressed_open(inputFasta, 'rt') as inf:
for seq in Bio.SeqIO.parse(inf, 'fasta'):
# if we are looking at the reference sequence in the multialignment,
# continue to the next sequence
@@ -559,14 +559,14 @@ def prep_sra_table(lib_fname, biosampleFile, md5_fname, outFile):
'''
metadata = {}

-with util.file.open_or_gzopen(biosampleFile, 'rU') as inf:
+with util.file.compressed_open(biosampleFile, 'rt') as inf:
header = inf.readline()
for line in inf:
row = line.rstrip('\n\r').split('\t')
metadata.setdefault(row[0], {})
metadata[row[0]]['biosample_accession'] = row[1]

-with util.file.open_or_gzopen(md5_fname, 'rU') as inf:
+with util.file.compressed_open(md5_fname, 'rt') as inf:
for line in inf:
row = line.rstrip('\n\r').split()
s = os.path.basename(row[1])
6 changes: 3 additions & 3 deletions pipes/rules/common.rules
@@ -28,7 +28,7 @@ def set_env_vars():
os.environ[k] = v

def read_tab_file(fname):
-with util.file.open_or_gzopen(fname, 'rU') as inf:
+with util.file.compressed_open(fname, 'rt') as inf:
header = [item.strip() for item in inf.readline().strip().rstrip('\n').split('\t')]
for line in inf:
if len(line.strip())==0:
@@ -44,7 +44,7 @@ def read_tab_file(fname):
def read_samples_file(fname, number_of_chromosomes=1, append_chrom_num=False):
if fname==None:
return []
-with util.file.open_or_gzopen(fname, 'rU') as inf:
+with util.file.compressed_open(fname, 'rt') as inf:
for line in inf:
if len(line.strip())==0:
continue
@@ -58,7 +58,7 @@ def read_samples_file(fname, number_of_chromosomes=1, append_chrom_num=False):
def read_accessions_file(fname):
if fname==None:
return []
-with util.file.open_or_gzopen(fname, 'rU') as inf:
+with util.file.compressed_open(fname, 'rt') as inf:
for line in inf:
if len(line.strip())==0:
continue
6 changes: 3 additions & 3 deletions read_utils.py
@@ -701,7 +701,7 @@ def split_bam(inBam, outBams):
samtools.view(['-@', '3'], inBam, bigsam)

# split bigsam into little ones
-with util.file.open_or_gzopen(bigsam, 'rt') as inf:
+with util.file.compressed_open(bigsam, 'rt') as inf:
for outBam in outBams:
log.info("preparing file " + outBam)
tmp_sam_reads = mkstempfname('.sam')
@@ -834,7 +834,7 @@ def mvicuna_fastqs_to_readlist(inFastq1, inFastq2, readList):
# Make a list of reads to keep
with open(readList, 'at') as outf:
for fq in (outFastq1, outFastq2):
-with util.file.open_or_gzopen(fq, 'rt') as inf:
+with util.file.compressed_open(fq, 'rt') as inf:
line_num = 0
for line in inf:
if (line_num % 4) == 0:
@@ -1398,7 +1398,7 @@ def main_extract_tarball(*args, **kwargs):

def fasta_read_names(in_fasta, out_read_names):
"""Save the read names of reads in a .fasta file to a text file"""
-with util.file.open_or_gzopen(in_fasta) as in_fasta_f, open(out_read_names, 'wt') as out_read_names_f:
+with util.file.compressed_open(in_fasta, 'rt') as in_fasta_f, open(out_read_names, 'wt') as out_read_names_f:
last_read_name = None
for line in in_fasta_f:
if line.startswith('>'):
4 changes: 2 additions & 2 deletions reports.py
@@ -362,7 +362,7 @@ def parser_alignment_summary(parser=argparse.ArgumentParser()):

def consolidate_fastqc(inDirs, outFile):
'''Consolidate multiple FASTQC reports into one.'''
-with util.file.open_or_gzopen(outFile, 'wt') as outf:
+with util.file.compressed_open(outFile, 'wt') as outf:
header = ['Sample']
out_n = 0
for sdir in inDirs:
@@ -395,7 +395,7 @@ def parser_consolidate_fastqc(parser=argparse.ArgumentParser()):
def get_bam_info(bamstats_dir):
libs = {}
for fn in glob.glob(os.path.join(bamstats_dir, "*.txt")):
-with util.file.open_or_gzopen(fn, 'rt') as inf:
+with util.file.compressed_open(fn, 'rt') as inf:
bam = {}
for line in inf:
k, v = line.rstrip('\n').split('\t')
2 changes: 2 additions & 0 deletions requirements-conda.txt
@@ -30,6 +30,7 @@ trimmomatic=0.38
trinity=date.2011_11_26
unzip=6.0
vphaser2=2.0
+zstd=1.3.8
# Python packages below
arrow=0.12.1
bedtools=2.27.1
@@ -39,3 +39,4 @@ matplotlib=1.5.3
pysam=0.15.0
pybedtools=0.7.10
PyYAML=3.12
+zstandard=0.11.0
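The pins add both the zstd CLI tool and the zstandard Python bindings. A 0.11-era zstandard exposes no stdlib-style one-call open() helper, which is why call sites need the customized wrapper; as a hedged illustration of the multithreaded compression the commit message cites (filenames are placeholders):

import zstandard

# level 3 mirrors the zstd default; threads=-1 asks for one worker per core
cctx = zstandard.ZstdCompressor(level=3, threads=-1)
with open('reads.tsv', 'rb') as src, open('reads.tsv.zst', 'wb') as dst:
    cctx.copy_stream(src, dst)  # streaming compression from src into dst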
2 changes: 1 addition & 1 deletion taxon_filter.py
@@ -378,7 +378,7 @@ def _run_blastn_chunk(db, input_fasta, out_hits, blast_threads):
""" run blastn on the input fasta file. this is intended to be run in parallel
by blastn_chunked_fasta
"""
-with util.file.open_or_gzopen(out_hits, 'wt') as outf:
+with util.file.compressed_open(out_hits, 'wt') as outf:
for read_id in tools.blast.BlastnTool().get_hits_fasta(input_fasta, db, threads=blast_threads):
outf.write(read_id + '\n')

14 changes: 7 additions & 7 deletions test/integration/test_kraken.py
@@ -90,9 +90,9 @@ def test_kraken(kraken_db, input_bam):
args = parser.parse_args(cmd)
args.func_main(args)

-with util.file.open_or_gzopen(out_reads, 'r') as inf:
+with util.file.compressed_open(out_reads, 'rt') as inf:
assert len(inf.read()) > 0
-with util.file.open_or_gzopen(out_report) as inf:
+with util.file.compressed_open(out_report, 'rt') as inf:
report_lines = [x.strip().split() for x in inf.readlines()]

assert os.path.getsize(out_report) > 0
@@ -124,10 +124,10 @@ def test_kraken_multi(kraken_db):

# just check for non-empty outputs
for outfile in out_reads:
-with util.file.open_or_gzopen(outfile, 'r') as inf:
+with util.file.compressed_open(outfile, 'rt') as inf:
assert len(inf.read()) > 0
for outfile in out_reports:
-with util.file.open_or_gzopen(outfile) as inf:
+with util.file.compressed_open(outfile, 'rt') as inf:
assert len(inf.read()) > 0

@unittest.skip("this deadlocks currently...")
@@ -152,10 +152,10 @@ def test_kraken_fifo(kraken_db):

# just check for non-empty outputs
for outfile in out_reads:
-with util.file.open_or_gzopen(outfile, 'r') as inf:
+with util.file.compressed_open(outfile, 'rt') as inf:
assert len(inf.read()) > 0
for outfile in out_reports:
-with util.file.open_or_gzopen(outfile) as inf:
+with util.file.compressed_open(outfile, 'rt') as inf:
assert len(inf.read()) > 0

def test_kraken_krona(kraken_db, krona_db, input_bam):
@@ -183,7 +183,7 @@ def test_kraken_on_empty(kraken_db, input_bam):
args = parser.parse_args(cmd)
args.func_main(args)

-with util.file.open_or_gzopen(out_reads, 'r') as inf:
+with util.file.compressed_open(out_reads, 'rt') as inf:
assert len(inf.read()) == 0
with open(out_report, 'rt') as inf:
out_report_contents = inf.readlines()
4 changes: 2 additions & 2 deletions test/unit/test_assembly.py
@@ -388,7 +388,7 @@ def test_ambig_align(self):
inDir = util.file.get_test_input_path(self)
contigs_gz = os.path.join(inDir, 'contigs.lasv.ambig.fasta.gz')
contigs = util.file.mkstempfname('.fasta')
-with util.file.open_or_gzopen(contigs_gz, 'rb') as f_in:
+with util.file.compressed_open(contigs_gz, 'rb') as f_in:
with open(contigs, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
expected = os.path.join(inDir, 'expected.lasv.ambig.fasta')
@@ -405,7 +405,7 @@ def test_ambig_align_ebov(self):
inDir = util.file.get_test_input_path(self)
contigs_gz = os.path.join(inDir, 'contigs.ebov.ambig.fasta.gz')
contigs = util.file.mkstempfname('.fasta')
-with util.file.open_or_gzopen(contigs_gz, 'rb') as f_in:
+with util.file.compressed_open(contigs_gz, 'rb') as f_in:
with open(contigs, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
expected = os.path.join(inDir, 'expected.ebov.ambig.fasta')