Skip to content

Commit

Permalink
Support Compressor for each variant
Browse files Browse the repository at this point in the history
  • Loading branch information
milesgranger committed Sep 6, 2021
1 parent b9cfa13 commit 8ecb0fa
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cramjam"
version = "2.4.0"
version = "2.4.0-rc1"
authors = ["Miles Granger <miles59923@gmail.com>"]
edition = "2018"
license = "MIT License"
Expand Down
34 changes: 33 additions & 1 deletion src/brotli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction;
use pyo3::PyResult;
use std::io::Cursor;

const DEFAULT_COMPRESSION_LEVEL: u32 = 11;

/// Register the brotli functions and the streaming `Compressor` class on module `m`.
///
/// Any registration failure is propagated to the caller.
pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> {
    // Free functions.
    m.add_function(wrap_pyfunction!(compress, m)?)?;
    m.add_function(wrap_pyfunction!(decompress, m)?)?;
    m.add_function(wrap_pyfunction!(compress_into, m)?)?;
    m.add_function(wrap_pyfunction!(decompress_into, m)?)?;
    // Streaming class; its result is the result of the whole registration.
    m.add_class::<Compressor>()
}

Expand Down Expand Up @@ -53,8 +56,37 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult<usiz
Ok(r)
}

/// Brotli Compressor object for streaming compression
#[pyclass]
pub struct Compressor {
    // `None` once `finish()` has taken the encoder out.
    inner: Option<brotli2::write::BrotliEncoder<Cursor<Vec<u8>>>>,
}

#[pymethods]
impl Compressor {
    /// Initialize a new `Compressor` instance.
    ///
    /// `level` is the compression level handed to the Brotli encoder; when it is
    /// omitted the module default (11) is used.
    #[new]
    pub fn __init__(level: Option<u32>) -> PyResult<Self> {
        // The fallback is a plain constant, so the eager `unwrap_or` is sufficient.
        let level = level.unwrap_or(DEFAULT_COMPRESSION_LEVEL);
        let inner = brotli2::write::BrotliEncoder::new(Cursor::new(vec![]), level);
        Ok(Self { inner: Some(inner) })
    }

    /// Compress input into the current compressor's stream.
    ///
    /// Returns the number of bytes copied from `input` into the encoder.
    /// Raises a compression error if `finish()` has already been called.
    pub fn compress(&mut self, input: &[u8]) -> PyResult<usize> {
        crate::io::stream_compress(&mut self.inner, input)
    }

    /// Consume the current compressor state and return the compressed stream
    /// **NB** The compressor will not be usable after this method is called.
    ///
    /// Calling `finish()` again on an already finished compressor returns an
    /// empty buffer.
    pub fn finish(&mut self) -> PyResult<RustyBuffer> {
        // Finishing the encoder yields the backing `Cursor`; unwrap it to the raw bytes.
        crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|c| c.into_inner()))
    }
}

pub(crate) mod internal {

use crate::brotli::DEFAULT_COMPRESSION_LEVEL;
use brotli2::read::{BrotliDecoder, BrotliEncoder};
use std::io::prelude::*;
use std::io::Error;
Expand All @@ -68,7 +100,7 @@ pub(crate) mod internal {

/// Compress via Brotli
pub fn compress<W: Write + ?Sized, R: Read>(input: R, output: &mut W, level: Option<u32>) -> Result<usize, Error> {
let level = level.unwrap_or_else(|| 11);
let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL);
let mut encoder = BrotliEncoder::new(input, level);
let n_bytes = std::io::copy(&mut encoder, output)?;
Ok(n_bytes as usize)
Expand Down
35 changes: 34 additions & 1 deletion src/deflate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction;
use pyo3::PyResult;
use std::io::Cursor;

const DEFAULT_COMPRESSION_LEVEL: u32 = 6;

/// Register the deflate functions and the streaming `Compressor` class on module `m`.
///
/// Any registration failure is propagated to the caller.
pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> {
    // Free functions.
    m.add_function(wrap_pyfunction!(compress, m)?)?;
    m.add_function(wrap_pyfunction!(decompress, m)?)?;
    m.add_function(wrap_pyfunction!(compress_into, m)?)?;
    m.add_function(wrap_pyfunction!(decompress_into, m)?)?;
    // Streaming class; its result is the result of the whole registration.
    m.add_class::<Compressor>()
}

Expand Down Expand Up @@ -53,8 +56,38 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult<usiz
Ok(r)
}

/// Deflate Compressor object for streaming compression
#[pyclass]
pub struct Compressor {
    // `None` once `finish()` has taken the encoder out.
    inner: Option<flate2::write::DeflateEncoder<Cursor<Vec<u8>>>>,
}

#[pymethods]
impl Compressor {
    /// Initialize a new `Compressor` instance.
    ///
    /// `level` is the compression level handed to `flate2::Compression::new`;
    /// when it is omitted the module default (6) is used.
    #[new]
    pub fn __init__(level: Option<u32>) -> PyResult<Self> {
        // The fallback is a plain constant, so the eager `unwrap_or` is sufficient.
        let level = level.unwrap_or(DEFAULT_COMPRESSION_LEVEL);
        let compression = flate2::Compression::new(level);
        let inner = flate2::write::DeflateEncoder::new(Cursor::new(vec![]), compression);
        Ok(Self { inner: Some(inner) })
    }

    /// Compress input into the current compressor's stream.
    ///
    /// Returns the number of bytes copied from `input` into the encoder.
    /// Raises a compression error if `finish()` has already been called.
    pub fn compress(&mut self, input: &[u8]) -> PyResult<usize> {
        crate::io::stream_compress(&mut self.inner, input)
    }

    /// Consume the current compressor state and return the compressed stream
    /// **NB** The compressor will not be usable after this method is called.
    ///
    /// Calling `finish()` again on an already finished compressor returns an
    /// empty buffer.
    pub fn finish(&mut self) -> PyResult<RustyBuffer> {
        // Finishing the encoder yields the backing `Cursor`; unwrap it to the raw bytes.
        crate::io::stream_finish(&mut self.inner, |inner| inner.finish().map(|c| c.into_inner()))
    }
}

pub(crate) mod internal {

use crate::deflate::DEFAULT_COMPRESSION_LEVEL;
use flate2::read::{DeflateDecoder, DeflateEncoder};
use flate2::Compression;
use std::io::prelude::*;
Expand All @@ -69,7 +102,7 @@ pub(crate) mod internal {

/// Compress deflate data
pub fn compress<W: Write + ?Sized, R: Read>(input: R, output: &mut W, level: Option<u32>) -> Result<usize, Error> {
let level = level.unwrap_or_else(|| 6);
let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL);

let mut encoder = DeflateEncoder::new(input, Compression::new(level));
let n_bytes = std::io::copy(&mut encoder, output)?;
Expand Down
34 changes: 33 additions & 1 deletion src/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ use pyo3::wrap_pyfunction;
use pyo3::PyResult;
use std::io::Cursor;

const DEFAULT_COMPRESSION_LEVEL: u32 = 6;

/// Register the gzip functions and the streaming `Compressor` class on module `m`.
///
/// Any registration failure is propagated to the caller.
pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> {
    // Free functions.
    m.add_function(wrap_pyfunction!(compress, m)?)?;
    m.add_function(wrap_pyfunction!(decompress, m)?)?;
    m.add_function(wrap_pyfunction!(compress_into, m)?)?;
    m.add_function(wrap_pyfunction!(decompress_into, m)?)?;
    // Streaming class; its result is the result of the whole registration.
    m.add_class::<Compressor>()
}

Expand Down Expand Up @@ -53,7 +56,36 @@ pub fn decompress_into(input: BytesType, mut output: BytesType) -> PyResult<usiz
Ok(r)
}

/// Streaming GZIP compressor.
#[pyclass]
pub struct Compressor {
    // `None` once `finish()` has taken the encoder out.
    inner: Option<flate2::write::GzEncoder<Cursor<Vec<u8>>>>,
}

#[pymethods]
impl Compressor {
    /// Create a new `Compressor`.
    ///
    /// `level` falls back to the module default (6) when omitted.
    #[new]
    pub fn __init__(level: Option<u32>) -> PyResult<Self> {
        let compression = flate2::Compression::new(level.unwrap_or(DEFAULT_COMPRESSION_LEVEL));
        let sink = Cursor::new(Vec::new());
        let encoder = flate2::write::GzEncoder::new(sink, compression);
        Ok(Self { inner: Some(encoder) })
    }

    /// Feed `input` into this compressor's stream.
    pub fn compress(&mut self, input: &[u8]) -> PyResult<usize> {
        crate::io::stream_compress(&mut self.inner, input)
    }

    /// Finalize the stream and hand back everything compressed so far.
    /// **NB** The compressor cannot be used for further compression afterwards.
    pub fn finish(&mut self) -> PyResult<RustyBuffer> {
        crate::io::stream_finish(&mut self.inner, |encoder| {
            // The finished encoder gives back its `Cursor`; unwrap that to the byte vector.
            encoder.finish().map(Cursor::into_inner)
        })
    }
}

pub(crate) mod internal {
use crate::gzip::DEFAULT_COMPRESSION_LEVEL;
use flate2::read::{GzEncoder, MultiGzDecoder};
use flate2::Compression;
use std::io::prelude::*;
Expand All @@ -70,7 +102,7 @@ pub(crate) mod internal {

/// Compress gzip data
pub fn compress<W: Write + ?Sized, R: Read>(input: R, output: &mut W, level: Option<u32>) -> Result<usize, Error> {
let level = level.unwrap_or_else(|| 6);
let level = level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL);
let mut encoder = GzEncoder::new(input, Compression::new(level));
let n_bytes = std::io::copy(&mut encoder, output)?;
Ok(n_bytes as usize)
Expand Down
40 changes: 40 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::fs::{File, OpenOptions};
use std::io::{copy, Cursor, Read, Seek, SeekFrom, Write};

use crate::exceptions::CompressionError;
use crate::BytesType;
use numpy::PyArray1;
use pyo3::class::buffer::PyBufferProtocol;
Expand Down Expand Up @@ -595,3 +596,42 @@ impl Read for RustyFile {
self.inner.read(buf)
}
}

// Shared implementation behind every variant's `Compressor.compress`. Written as a
// free generic function because pyo3::pyclass does not support generic structs,
// so associated types can't be used.
//
// Copies all of `input` into the encoder and reports the number of bytes copied;
// fails when the encoder slot is already empty.
#[inline(always)]
pub(crate) fn stream_compress<W: Write>(encoder: &mut Option<W>, input: &[u8]) -> PyResult<usize> {
    if let Some(writer) = encoder.as_mut() {
        let mut reader = Cursor::new(input);
        let copied = std::io::copy(&mut reader, writer).map(|n| n as usize);
        return crate::to_py_err!(CompressionError -> copied);
    }

    // No encoder left: it was taken out by a previous `finish()`.
    Err(CompressionError::new_err(
        "Compressor looks to have been consumed via `finish()`. \
         please create a new compressor instance.",
    ))
}

// general stream finish interface. Can't use associated types due to pyo3::pyclass
// not supporting generic structs.
//
// Moves the encoder out of `encoder` (leaving `None` behind) and passes it to
// `into_vec`, which must finalize it and return the compressed bytes. An error from
// `into_vec` is converted into a `CompressionError`. If the encoder was already
// taken by an earlier call, an empty buffer is returned.
//
// `into_vec` is invoked at most once and receives the encoder by value, so `FnOnce`
// is the least restrictive bound; every closure that satisfied `Fn` still does.
#[inline(always)]
pub(crate) fn stream_finish<W, F, E>(encoder: &mut Option<W>, into_vec: F) -> PyResult<RustyBuffer>
where
    W: Write,
    E: ToString,
    F: FnOnce(W) -> Result<Vec<u8>, E>,
{
    // &mut encoder is part of a Compressor, often the .finish portion consumes
    // the struct; which cannot be done with pyclass. `take()` moves the encoder
    // out and leaves `None` in its place.
    match encoder.take() {
        Some(encoder) => {
            let result = crate::to_py_err!(CompressionError -> into_vec(encoder))?;
            Ok(RustyBuffer::from(result))
        }
        // Already finished earlier: nothing left to emit.
        None => Ok(RustyBuffer::from(vec![])),
    }
}
39 changes: 38 additions & 1 deletion src/lz4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use pyo3::wrap_pyfunction;
use pyo3::PyResult;
use std::io::Cursor;

const DEFAULT_COMPRESSION_LEVEL: u32 = 4;

/// Register the lz4 functions and the streaming `Compressor` class on module `m`.
///
/// Any registration failure is propagated to the caller.
pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> {
    // Frame-format and block-format free functions.
    m.add_function(wrap_pyfunction!(compress, m)?)?;
    m.add_function(wrap_pyfunction!(decompress, m)?)?;
    m.add_function(wrap_pyfunction!(compress_block, m)?)?;
    m.add_function(wrap_pyfunction!(decompress_block, m)?)?;
    m.add_function(wrap_pyfunction!(compress_into, m)?)?;
    m.add_function(wrap_pyfunction!(decompress_into, m)?)?;
    // Streaming class; its result is the result of the whole registration.
    m.add_class::<Compressor>()
}

Expand Down Expand Up @@ -117,7 +120,41 @@ pub fn compress_block(
Ok(RustyBuffer::from(out))
}

/// LZ4 Compressor object for streaming compression
#[pyclass]
pub struct Compressor {
    // `None` once `finish()` has taken the encoder out.
    inner: Option<lz4::Encoder<Cursor<Vec<u8>>>>,
}

#[pymethods]
impl Compressor {
    /// Initialize a new `Compressor` instance.
    ///
    /// `level` is the compression level handed to the LZ4 encoder builder; when
    /// it is omitted the module default (4) is used. An error from building the
    /// encoder is propagated.
    #[new]
    pub fn __init__(level: Option<u32>) -> PyResult<Self> {
        let inner = lz4::EncoderBuilder::new()
            .auto_flush(true)
            // The fallback is a plain constant, so the eager `unwrap_or` is sufficient.
            .level(level.unwrap_or(DEFAULT_COMPRESSION_LEVEL))
            .build(Cursor::new(vec![]))?;
        Ok(Self { inner: Some(inner) })
    }

    /// Compress input into the current compressor's stream.
    ///
    /// Returns the number of bytes copied from `input` into the encoder.
    /// Raises a compression error if `finish()` has already been called.
    pub fn compress(&mut self, input: &[u8]) -> PyResult<usize> {
        crate::io::stream_compress(&mut self.inner, input)
    }

    /// Consume the current compressor state and return the compressed stream
    /// **NB** The compressor will not be usable after this method is called.
    ///
    /// Calling `finish()` again on an already finished compressor returns an
    /// empty buffer.
    pub fn finish(&mut self) -> PyResult<RustyBuffer> {
        crate::io::stream_finish(&mut self.inner, |inner| {
            // `finish` returns the writer together with the outcome; only hand the
            // bytes back when the outcome is a success.
            let (cursor, result) = inner.finish();
            result.map(|_| cursor.into_inner())
        })
    }
}

pub(crate) mod internal {
use crate::lz4::DEFAULT_COMPRESSION_LEVEL;
use lz4::{Decoder, EncoderBuilder};
use std::io::{Error, Read, Seek, SeekFrom, Write};

Expand All @@ -138,7 +175,7 @@ pub(crate) mod internal {
let start_pos = output.seek(SeekFrom::Current(0))?;
let mut encoder = EncoderBuilder::new()
.auto_flush(true)
.level(level.unwrap_or_else(|| 4))
.level(level.unwrap_or_else(|| DEFAULT_COMPRESSION_LEVEL))
.build(output)?;

// this returns, bytes read from uncompressed, input; we want bytes written
Expand Down
28 changes: 28 additions & 0 deletions src/snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub(crate) fn init_py_module(m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(decompress_raw_into, m)?)?;
m.add_function(wrap_pyfunction!(compress_raw_max_len, m)?)?;
m.add_function(wrap_pyfunction!(decompress_raw_len, m)?)?;
m.add_class::<Compressor>()?;
Ok(())
}

Expand Down Expand Up @@ -123,6 +124,33 @@ pub fn decompress_raw_len(data: BytesType) -> PyResult<usize> {
to_py_err!(DecompressionError -> snap::raw::decompress_len(data.as_bytes()))
}

/// Streaming Snappy compressor.
#[pyclass]
pub struct Compressor {
    // `None` once `finish()` has taken the encoder out.
    inner: Option<snap::write::FrameEncoder<Cursor<Vec<u8>>>>,
}

#[pymethods]
impl Compressor {
    /// Create a new `Compressor` writing into a fresh in-memory buffer.
    #[new]
    pub fn __init__() -> PyResult<Self> {
        let sink = Cursor::new(Vec::new());
        let encoder = snap::write::FrameEncoder::new(sink);
        Ok(Self { inner: Some(encoder) })
    }

    /// Feed `input` into this compressor's stream.
    pub fn compress(&mut self, input: &[u8]) -> PyResult<usize> {
        crate::io::stream_compress(&mut self.inner, input)
    }

    /// Finalize the stream and hand back everything compressed so far.
    /// **NB** The compressor cannot be used for further compression afterwards.
    pub fn finish(&mut self) -> PyResult<RustyBuffer> {
        crate::io::stream_finish(&mut self.inner, |encoder| {
            // First `into_inner` recovers the `Cursor` from the encoder, the second
            // unwraps the `Cursor` to the byte vector.
            encoder.into_inner().map(Cursor::into_inner)
        })
    }
}

pub(crate) mod internal {
use snap::read::{FrameDecoder, FrameEncoder};
use std::io::{Error, Read, Write};
Expand Down
Loading

0 comments on commit 8ecb0fa

Please sign in to comment.