From 8ecb0fafd5cf2d725e3633984c009508471f0680 Mon Sep 17 00:00:00 2001 From: milesgranger Date: Wed, 1 Sep 2021 20:06:20 +0200 Subject: [PATCH] Support Compressor for each variant --- Cargo.toml | 2 +- src/brotli.rs | 34 +++++++++++++++++++++++++++++++++- src/deflate.rs | 35 ++++++++++++++++++++++++++++++++++- src/gzip.rs | 34 +++++++++++++++++++++++++++++++++- src/io.rs | 40 ++++++++++++++++++++++++++++++++++++++++ src/lz4.rs | 39 ++++++++++++++++++++++++++++++++++++++- src/snappy.rs | 28 ++++++++++++++++++++++++++++ src/zstd.rs | 33 ++++++++++++++++++++++++++++++++- tests/test_variants.py | 28 ++++++++++++++++++++++++++++ 9 files changed, 267 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index af5eefd5..098c2c0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cramjam" -version = "2.4.0" +version = "2.4.0-rc1" authors = ["Miles Granger "] edition = "2018" license = "MIT License" diff --git a/src/brotli.rs b/src/brotli.rs index e6e06ee0..b81c0f68 100644 --- a/src/brotli.rs +++ b/src/brotli.rs @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction; use pyo3::PyResult; use std::io::Cursor; +const DEFAULT_COMPRESSION_LEVEL: u32 = 11; + pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(compress, m)?)?; m.add_function(wrap_pyfunction!(decompress, m)?)?; m.add_function(wrap_pyfunction!(compress_into, m)?)?; m.add_function(wrap_pyfunction!(decompress_into, m)?)?; + m.add_class::()?; Ok(()) } @@ -53,8 +56,37 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult>>>, +} + +#[pymethods] +impl Compressor { + /// Initialize a new `Compressor` instance. + #[new] + pub fn __init__(level: Option) -> PyResult { + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); + let inner = brotli2::write::BrotliEncoder::new(Cursor::new(vec![]), level); + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. 
+ pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|c| c.into_inner())) + } +} + pub(crate) mod internal { + use crate::brotli::DEFAULT_COMPRESSION_LEVEL; use brotli2::read::{BrotliDecoder, BrotliEncoder}; use std::io::prelude::*; use std::io::Error; @@ -68,7 +100,7 @@ pub(crate) mod internal { /// Compress via Brotli pub fn compress(input: R, output: &mut W, level: Option) -> Result { - let level = level.unwrap_or_else(|| 11); + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); let mut encoder = BrotliEncoder::new(input, level); let n_bytes = std::io::copy(&mut encoder, output)?; Ok(n_bytes as usize) diff --git a/src/deflate.rs b/src/deflate.rs index c8e7d4e0..10fc37a6 100644 --- a/src/deflate.rs +++ b/src/deflate.rs @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction; use pyo3::PyResult; use std::io::Cursor; +const DEFAULT_COMPRESSION_LEVEL: u32 = 6; + pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(compress, m)?)?; m.add_function(wrap_pyfunction!(decompress, m)?)?; m.add_function(wrap_pyfunction!(compress_into, m)?)?; m.add_function(wrap_pyfunction!(decompress_into, m)?)?; + m.add_class::()?; Ok(()) } @@ -53,8 +56,38 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult>>>, +} + +#[pymethods] +impl Compressor { + /// Initialize a new `Compressor` instance. 
+ #[new] + pub fn __init__(level: Option) -> PyResult { + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); + let compression = flate2::Compression::new(level); + let inner = flate2::write::DeflateEncoder::new(Cursor::new(vec![]), compression); + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. + pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|c| c.into_inner())) + } +} + pub(crate) mod internal { + use crate::deflate::DEFAULT_COMPRESSION_LEVEL; use flate2::read::{DeflateDecoder, DeflateEncoder}; use flate2::Compression; use std::io::prelude::*; @@ -69,7 +102,7 @@ pub(crate) mod internal { /// Compress gzip data pub fn compress(input: R, output: &mut W, level: Option) -> Result { - let level = level.unwrap_or_else(|| 6); + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); let mut encoder = DeflateEncoder::new(input, Compression::new(level)); let n_bytes = std::io::copy(&mut encoder, output)?; diff --git a/src/gzip.rs b/src/gzip.rs index ded656a6..12a9fc47 100644 --- a/src/gzip.rs +++ b/src/gzip.rs @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction; use pyo3::PyResult; use std::io::Cursor; +const DEFAULT_COMPRESSION_LEVEL: u32 = 6; + pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(compress, m)?)?; m.add_function(wrap_pyfunction!(decompress, m)?)?; m.add_function(wrap_pyfunction!(compress_into, m)?)?; m.add_function(wrap_pyfunction!(decompress_into, m)?)?; + m.add_class::()?; Ok(()) } @@ -53,7 +56,36 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult>>>, +} + +#[pymethods] +impl Compressor { + /// 
Initialize a new `Compressor` instance. + #[new] + pub fn __init__(level: Option) -> PyResult { + let level = level.unwrap_or(DEFAULT_COMPRESSION_LEVEL); + let inner = flate2::write::GzEncoder::new(Cursor::new(vec![]), flate2::Compression::new(level)); + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. + pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|c| c.into_inner())) + } +} + pub(crate) mod internal { + use crate::gzip::DEFAULT_COMPRESSION_LEVEL; use flate2::read::{GzEncoder, MultiGzDecoder}; use flate2::Compression; use std::io::prelude::*; @@ -70,7 +102,7 @@ pub(crate) mod internal { /// Compress gzip data pub fn compress(input: R, output: &mut W, level: Option) -> Result { - let level = level.unwrap_or_else(|| 6); + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); let mut encoder = GzEncoder::new(input, Compression::new(level)); let n_bytes = std::io::copy(&mut encoder, output)?; Ok(n_bytes as usize) diff --git a/src/io.rs b/src/io.rs index b7b79396..9dab764b 100644 --- a/src/io.rs +++ b/src/io.rs @@ -5,6 +5,7 @@ use std::fs::{File, OpenOptions}; use std::io::{copy, Cursor, Read, Seek, SeekFrom, Write}; +use crate::exceptions::CompressionError; use crate::BytesType; use numpy::PyArray1; use pyo3::class::buffer::PyBufferProtocol; @@ -595,3 +596,42 @@ impl Read for RustyFile { self.inner.read(buf) } } + +// general stream compression interface. Can't use associated types due to pyo3::pyclass +// not supporting generic structs. 
+#[inline(always)] +pub(crate) fn stream_compress(encoder: &mut Option, input: &[u8]) -> PyResult { + match encoder { + Some(encoder) => { + let result = std::io::copy(&mut Cursor::new(input), encoder).map(|v| v as usize); + crate::to_py_err!(CompressionError -> result) + } + None => Err(CompressionError::new_err( + "Compressor looks to have been consumed via `finish()`. \ + please create a new compressor instance.", + )), + } +} + +// general stream finish interface. Can't use associated types due to pyo3::pyclass +// not supporting generic structs. +#[inline(always)] +pub(crate) fn stream_finish(encoder: &mut Option, into_vec: F) -> PyResult +where + W: Write, + E: ToString, + F: Fn(W) -> Result, E>, +{ + // &mut encoder is part of a Compressor, often the .finish portion consumes + // the struct; which cannot be done with pyclass. So we'll swap it out for None + let mut detached_encoder = None; + std::mem::swap(&mut detached_encoder, encoder); + + match detached_encoder { + Some(encoder) => { + let result = crate::to_py_err!(CompressionError -> into_vec(encoder))?; + Ok(RustyBuffer::from(result)) + } + None => Ok(RustyBuffer::from(vec![])), + } +} diff --git a/src/lz4.rs b/src/lz4.rs index 2bd6d5d4..07881ae9 100644 --- a/src/lz4.rs +++ b/src/lz4.rs @@ -7,6 +7,8 @@ use pyo3::wrap_pyfunction; use pyo3::PyResult; use std::io::Cursor; +const DEFAULT_COMPRESSION_LEVEL: u32 = 4; + pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(compress, m)?)?; m.add_function(wrap_pyfunction!(decompress, m)?)?; @@ -14,6 +16,7 @@ pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(decompress_block, m)?)?; m.add_function(wrap_pyfunction!(compress_into, m)?)?; m.add_function(wrap_pyfunction!(decompress_into, m)?)?; + m.add_class::()?; Ok(()) } @@ -117,7 +120,41 @@ pub fn compress_block( Ok(RustyBuffer::from(out)) } +/// LZ4 Compressor object for streaming compression +#[pyclass] +pub struct Compressor 
{ + inner: Option>>>, +} + +#[pymethods] +impl Compressor { + /// Initialize a new `Compressor` instance. + #[new] + pub fn __init__(level: Option) -> PyResult { + let inner = lz4::EncoderBuilder::new() + .auto_flush(true) + .level(level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL)) + .build(Cursor::new(vec![]))?; + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. + pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| { + let (cursor, result) = inner.finish(); + result.map(|_| cursor.into_inner()) + }) + } +} + pub(crate) mod internal { + use crate::lz4::DEFAULT_COMPRESSION_LEVEL; use lz4::{Decoder, EncoderBuilder}; use std::io::{Error, Read, Seek, SeekFrom, Write}; @@ -138,7 +175,7 @@ pub(crate) mod internal { let start_pos = output.seek(SeekFrom::Current(0))?; let mut encoder = EncoderBuilder::new() .auto_flush(true) - .level(level.unwrap_or_else(|| 4)) + .level(level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL)) .build(output)?; // this returns, bytes read from uncompressed, input; we want bytes written diff --git a/src/snappy.rs b/src/snappy.rs index e83181ba..b5ac7474 100644 --- a/src/snappy.rs +++ b/src/snappy.rs @@ -18,6 +18,7 @@ pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(decompress_raw_into, m)?)?; m.add_function(wrap_pyfunction!(compress_raw_max_len, m)?)?; m.add_function(wrap_pyfunction!(decompress_raw_len, m)?)?; + m.add_class::()?; Ok(()) } @@ -123,6 +124,33 @@ pub fn decompress_raw_len(data: BytesType) -> PyResult { to_py_err!(DecompressionError -> snap::raw::decompress_len(data.as_bytes())) } +/// Snappy Compressor object for streaming compression 
+#[pyclass] +pub struct Compressor { + inner: Option>>>, +} + +#[pymethods] +impl Compressor { + /// Initialize a new `Compressor` instance. + #[new] + pub fn __init__() -> PyResult { + let inner = snap::write::FrameEncoder::new(Cursor::new(vec![])); + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. + pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| inner.into_inner().map(|c| c.into_inner())) + } +} + pub(crate) mod internal { use snap::read::{FrameDecoder, FrameEncoder}; use std::io::{Error, Read, Write}; diff --git a/src/zstd.rs b/src/zstd.rs index bceee059..9c0eb3ec 100644 --- a/src/zstd.rs +++ b/src/zstd.rs @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction; use pyo3::PyResult; use std::io::Cursor; +const DEFAULT_COMPRESSION_LEVEL: i32 = 0; + pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(compress, m)?)?; m.add_function(wrap_pyfunction!(decompress, m)?)?; m.add_function(wrap_pyfunction!(compress_into, m)?)?; m.add_function(wrap_pyfunction!(decompress_into, m)?)?; + m.add_class::()?; Ok(()) } @@ -53,8 +56,36 @@ pub fn decompress_into<'a>(_py: Python<'a>, input: BytesType<'a>, mut output: By Ok(r) } +/// ZSTD Compressor object for streaming compression +#[pyclass] +pub struct Compressor { + inner: Option>>>, +} + +#[pymethods] +impl Compressor { + /// Initialize a new `Compressor` instance. + #[new] + pub fn __init__(level: Option) -> PyResult { + let inner = zstd::stream::write::Encoder::new(Cursor::new(vec![]), level.unwrap_or(DEFAULT_COMPRESSION_LEVEL))?; + Ok(Self { inner: Some(inner) }) + } + + /// Compress input into the current compressor's stream. 
+ pub fn compress(&mut self, input: &[u8]) -> PyResult { + crate::io::stream_compress(&mut self.inner, input) + } + + /// Consume the current compressor state and return the compressed stream + /// **NB** The compressor will not be usable after this method is called. + pub fn finish(&mut self) -> PyResult { + crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|v| v.into_inner())) + } +} + pub(crate) mod internal { + use crate::zstd::DEFAULT_COMPRESSION_LEVEL; use std::io::{Error, Read, Write}; /// Decompress gzip data @@ -66,7 +97,7 @@ pub(crate) mod internal { /// Compress gzip data pub fn compress(input: R, output: &mut W, level: Option) -> Result { - let level = level.unwrap_or_else(|| 0); // 0 will use zstd's default, currently 3 + let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL); // 0 will use zstd's default, currently 3 let mut encoder = zstd::stream::read::Encoder::new(input, level)?; let n_bytes = std::io::copy(&mut encoder, output)?; Ok(n_bytes as usize) diff --git a/tests/test_variants.py b/tests/test_variants.py index 87bedada..ab0d2fd0 100644 --- a/tests/test_variants.py +++ b/tests/test_variants.py @@ -239,3 +239,31 @@ def test_gzip_multiple_streams(): o2 = bytes(cramjam.gzip.compress(b"bar")) out = bytes(cramjam.gzip.decompress(o1 + o2)) assert out == b"foobar" + + +@pytest.mark.parametrize( + "mod", + ( + cramjam.brotli, + cramjam.deflate, + cramjam.gzip, + cramjam.lz4, + cramjam.snappy, + cramjam.zstd, + ), +) +def test_streams_compressor(mod): + compressor = mod.Compressor() + compressor.compress(b"foo") + compressor.compress(b"bar") + out = compressor.finish() + decompressed = mod.decompress(out) + assert bytes(decompressed) == b"foobar" + + # just empty bytes after the first .finish() + # same behavior as brotli.Compressor() + assert bytes(compressor.finish()) == b"" + + # compress will raise an error as the stream is completed + with pytest.raises(cramjam.CompressionError): + compressor.compress(b'data')