Skip to content

Commit

Permalink
bring in more zstd updates from @yesimon PR broadinstitute/viral-ngs#937
Browse files Browse the repository at this point in the history
 and re-add all the compressors as conda dependencies (instead of relying on apt packages in the docker image) so as to facilitate the re-enabling of the conda package builds
  • Loading branch information
dpark01 committed Nov 7, 2019
1 parent e3859eb commit 8096cf4
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 34 deletions.
4 changes: 2 additions & 2 deletions file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def merge_tarballs(out_tarball, in_tarballs, threads=None, extract_to_disk_path=
def parser_merge_tarballs(parser=argparse.ArgumentParser()):
parser.add_argument(
'out_tarball',
help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-);
help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst|-);
compression is inferred by the file extension.
Note: if "-" is used, output will be written to stdout and
--pipeOutHint must be provided to indicate compression type
when compression type is not gzip (gzip is used by default).
''')
parser.add_argument(
'in_tarballs', nargs='+',
help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2)')
help=('input tarballs (*.tar.gz|*.tar.lz4|*.tar.bz2|*.tar.zst)')
)
parser.add_argument('--extractToDiskPath',
dest="extract_to_disk_path",
Expand Down
8 changes: 7 additions & 1 deletion requirements-conda.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ cd-hit=4.6.8
cd-hit-auxtools=4.6.8
fastqc=0.11.7
gatk=3.8
lbzip2=2.5
lz4-c=1.9.1
mvicuna=1.0
novoalign=3.07.00
parallel=20190922
picard-slim=2.21.1
pigz=2.4
prinseq=0.20.4
#r-base=3.5.1
samtools=1.9
trimmomatic=0.38
unzip=6.0
zstd=1.3.8
# Python packages below
arrow=0.12.1
bedtools=2.28.0
biopython=1.72
matplotlib=2.2.4
pysam=0.15.0
pybedtools=0.7.10
zstandard=0.11.0
74 changes: 43 additions & 31 deletions util/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import contextlib
import os
import gzip
import bz2
import zstd
import io
import tempfile
import subprocess
Expand Down Expand Up @@ -328,40 +330,50 @@ def touch_p(path, times=None):
touch(path, times=times)


def open_or_gzopen(fname, *opts, **kwargs):
mode = 'r'
open_opts = list(opts)
@contextlib.contextmanager
def zstd_open(fname, mode='r'):
'''Handle both text and byte decompression of the file.'''
if 'r' in mode:
with open(fname, 'rb') as fh:
dctx = zstd.ZstdDecompressor()
stream_reader = dctx.stream_reader(fh)
if 'b' not in mode:
text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
yield text_stream
return
yield stream_reader
else:
with open(fname, 'wb') as fh:
cctx = zstd.ZstdCompressor(level=kwargs.get('level', 10),
threads=kwargs.get('threads', 1))
stream_writer = cctx.stream_writer(fh)
if 'b' not in mode:
text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
yield text_stream
return
yield stream_writer

def open_or_gzopen(fname, mode='r', **kwargs):
assert type(mode) == str, "open mode must be of type str"

# 'U' mode is deprecated in py3 and may be unsupported in future versions,
# so use newline=None when 'U' is specified
if len(open_opts) > 0:
mode = open_opts[0]
if sys.version_info[0] == 3:
if 'U' in mode:
if 'newline' not in kwargs:
kwargs['newline'] = None
open_opts[0] = mode.replace("U","")

# if this is a gzip file
if 'U' in mode:
if 'newline' not in kwargs:
kwargs['newline'] = None
mode = mode.replace("U","")

if fname.endswith('.gz'):
# if text read mode is desired (by spec or default)
if ('b' not in mode) and (len(open_opts)==0 or 'r' in mode):
# if python 2
if sys.version_info[0] == 2:
# gzip.open() under py2 does not support universal newlines
# so we need to wrap it with something that does
# By ignoring errors in BufferedReader, errors should be handled by TextIoWrapper
return io.TextIOWrapper(io.BufferedReader(gzip.open(fname)))

# if 't' for text mode is not explicitly included,
# replace "U" with "t" since under gzip "rb" is the
# default and "U" depends on "rt"
gz_mode = str(mode).replace("U","" if "t" in mode else "t")
gz_opts = [gz_mode]+list(opts)[1:]
return gzip.open(fname, *gz_opts, **kwargs)
# Allow using 'level' kwarg as an alias for gzip files.
if 'level' in kwargs:
kwargs['compresslevel'] = kwargs.pop('level')
return gzip.open(fname, mode=mode, **kwargs)
elif fname.endswith('.bz2'):
return bz2.open(fname, mode=mode, **kwargs)
elif fname.endswith('.zst'):
return zstd_open(fname, mode=mode, **kwargs)
else:
return open(fname, *open_opts, **kwargs)
return open(fname, mode=mode, **kwargs)


def read_tabfile_dict(inFile, header_prefix="#", skip_prefix=None, rowcount_limit=None):
Expand Down Expand Up @@ -986,8 +998,8 @@ def choose_compressor(filepath, threads=8):
return_obj["compress_cmd"] = compressor + ["-c"]
elif re.search(r'\.?zst$', filepath):
compressor = ['zstd']
return_obj["decompress_cmd"] = compressor + ["-d"]
return_obj["compress_cmd"] = compressor + ["-19"]
return_obj["decompress_cmd"] = compressor + ["-dc"]
return_obj["compress_cmd"] = compressor + ["-c19"]
elif re.search(r'\.?tar$', filepath):
compressor = ['cat']
return_obj["decompress_cmd"] = compressor
Expand Down Expand Up @@ -1031,7 +1043,7 @@ def read(self, size):
compressor = choose_compressor(pipe_hint_out)["compress_cmd"]
outfile = None
else:
compressor =choose_compressor(out_compressed_tarball)["compress_cmd"]
compressor = choose_compressor(out_compressed_tarball)["compress_cmd"]
outfile = open(out_compressed_tarball, "w")

out_compress_ps = subprocess.Popen(compressor, stdout=sys.stdout if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
Expand Down

0 comments on commit 8096cf4

Please sign in to comment.