diff --git a/Cargo.toml b/Cargo.toml index 315f6f1..920169e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ raptorq = "2.0" raptor-code = "1.0.6" opentelemetry = { version = "0.27", optional = true } opentelemetry-semantic-conventions = { version = "0.27" , optional = true } -rand = "0.8" +rand = "0.9" utoipa = { version = "5", optional = true } [dev-dependencies] diff --git a/src/sender/compress.rs b/src/sender/compress.rs index 6623bf8..9c3d3cb 100644 --- a/src/sender/compress.rs +++ b/src/sender/compress.rs @@ -1,10 +1,74 @@ -use std::io::Write; +//! This module provides functions for compressing data using various compression algorithms. +//! +//! The supported compression algorithms are: +//! - Zlib +//! - Deflate +//! - Gzip +//! +//! The module offers two main functions for compression: +//! - `compress_buffer`: Compresses a byte slice and returns the compressed data as a vector of bytes. +//! - `compress_stream`: Compresses data from an input stream and writes the compressed data to an output stream. +//! +//! # Examples +//! +//! ## Compressing a Byte Slice +//! +//! ```rust +//! use flute::sender::compress::compress_buffer; +//! use flute::core::lct::Cenc; +//! +//! let data = b"example data"; +//! let compressed_data = compress_buffer(data, Cenc::Gzip).expect("Compression failed"); +//! ``` +//! +//! ## Compressing Data from a Stream +//! +//! ```rust +//! use flute::sender::compress::compress_stream; +//! use flute::core::lct::Cenc; +//! use std::io::Cursor; +//! +//! let data = b"example data"; +//! let mut input = Cursor::new(data); +//! let mut output = Vec::new(); +//! compress_stream(&mut input, Cenc::Gzip, &mut output).expect("Compression failed"); +//! ``` use crate::common::lct; use crate::tools::error::{FluteError, Result}; use flate2::write::{DeflateEncoder, GzEncoder, ZlibEncoder}; +use std::io::Write; -pub fn compress(data: &[u8], cenc: lct::Cenc) -> Result> { +/// Compresses the given data using the specified compression encoding. +/// +/// # Arguments +/// +/// * `data` - A byte slice containing the data to be compressed. +/// * `cenc` - The compression encoding to use. This can be one of the following: +/// - `lct::Cenc::Null`: No compression (returns an error). +/// - `lct::Cenc::Zlib`: Compress using the Zlib algorithm. +/// - `lct::Cenc::Deflate`: Compress using the Deflate algorithm. +/// - `lct::Cenc::Gzip`: Compress using the Gzip algorithm. +/// +/// # Returns +/// +/// A `Result` containing a vector of compressed bytes on success, or a `FluteError` on failure. +/// +/// # Errors +/// +/// This function will return an error if the specified compression encoding is `lct::Cenc::Null` +/// or if there is an issue during the compression process. +/// +/// # Examples +/// +/// ``` +/// use flute::sender::compress::compress_buffer; +/// use flute::core::lct::Cenc; +/// +/// let data = b"example data"; +/// let compressed_data = compress_buffer(data, Cenc::Gzip).expect("Compression failed"); +/// ``` +pub fn compress_buffer(data: &[u8], cenc: lct::Cenc) -> Result> { match cenc { lct::Cenc::Null => Err(FluteError::new("Null compression ?")), lct::Cenc::Zlib => compress_zlib(data), @@ -13,6 +77,41 @@ pub fn compress(data: &[u8], cenc: lct::Cenc) -> Result> { } } +/// Compresses data from an input stream and writes the compressed data to an output stream +/// using the specified compression encoding. +/// +/// # Arguments +/// +/// * `input` - A mutable reference to a type that implements the `std::io::Read` trait, representing the input stream. +/// * `cenc` - The compression encoding to use. This can be one of the following: +/// - `lct::Cenc::Null`: No compression (returns an error). +/// - `lct::Cenc::Zlib`: Compress using the Zlib algorithm. +/// - `lct::Cenc::Deflate`: Compress using the Deflate algorithm. +/// - `lct::Cenc::Gzip`: Compress using the Gzip algorithm. +/// * `output` - A mutable reference to a type that implements the `std::io::Write` trait, representing the output stream. +/// +/// # Returns +/// +/// A `Result` containing `()` on success, or a `FluteError` on failure. +/// +/// # Errors +/// +/// This function will return an error if the specified compression encoding is `lct::Cenc::Null` +/// or if there is an issue during the compression process. +/// ``` +pub fn compress_stream( + input: &mut dyn std::io::Read, + cenc: lct::Cenc, + output: &mut dyn std::io::Write, +) -> Result<()> { + match cenc { + lct::Cenc::Null => Err(FluteError::new("Null compression ?")), + lct::Cenc::Zlib => stream_compress_zlib(input, output), + lct::Cenc::Deflate => stream_compress_deflate(input, output), + lct::Cenc::Gzip => stream_compress_gzip(input, output), + } +} + fn compress_gzip(data: &[u8]) -> Result> { log::debug!("Create GZIP encoder"); let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default()); @@ -34,3 +133,60 @@ fn compress_zlib(data: &[u8]) -> Result> { let output = encoder.finish()?; Ok(output) } + +fn stream_compress_gzip( + input: &mut dyn std::io::Read, + output: &mut dyn std::io::Write, +) -> Result<()> { + log::debug!("Create GZIP encoder"); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + let mut buffer = vec![0; 1024 * 1024]; + loop { + let read = input.read(&mut buffer)?; + if read == 0 { + break; + } + encoder.write_all(&buffer[..read])?; + } + + encoder.finish()?; + Ok(()) +} + +fn stream_compress_zlib( + input: &mut dyn std::io::Read, + output: &mut dyn std::io::Write, +) -> Result<()> { + log::debug!("Create GZIP encoder"); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + let mut buffer = vec![0; 1024 * 1024]; + loop { + let read = input.read(&mut buffer)?; + if read == 0 { + break; + } + encoder.write_all(&buffer[..read])?; + } + + encoder.finish()?; + Ok(()) +} + +fn stream_compress_deflate( + input: &mut dyn std::io::Read, + output: &mut dyn std::io::Write, +) -> Result<()> { + log::debug!("Create GZIP encoder"); + let mut encoder = DeflateEncoder::new(output, flate2::Compression::default()); + let mut buffer = vec![0; 1024 * 1024]; + loop { + let read = input.read(&mut buffer)?; + if read == 0 { + break; + } + encoder.write_all(&buffer[..read])?; + } + + encoder.finish()?; + Ok(()) +} diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 4799498..b78af27 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -4,7 +4,6 @@ mod block; mod blockencoder; -mod compress; mod fdt; mod filedesc; mod objectdesc; @@ -16,6 +15,7 @@ mod toiallocator; #[cfg(feature = "opentelemetry")] mod objectsenderlogger; +pub mod compress; pub use crate::common::Profile; pub use objectdesc::CacheControl; pub use objectdesc::ObjectDesc; diff --git a/src/sender/objectdesc.rs b/src/sender/objectdesc.rs index f12d260..9901e57 100644 --- a/src/sender/objectdesc.rs +++ b/src/sender/objectdesc.rs @@ -88,7 +88,7 @@ impl ObjectDataSource { pub fn from_buffer(buffer: &[u8], cenc: lct::Cenc) -> Result { let data = match cenc { lct::Cenc::Null => Ok(buffer.to_vec()), - _ => compress::compress(buffer, cenc), + _ => compress::compress_buffer(buffer, cenc), }?; Ok(ObjectDataSource::Buffer(data)) @@ -98,7 +98,7 @@ impl ObjectDataSource { pub fn from_vec(buffer: Vec, cenc: lct::Cenc) -> Result { let data = match cenc { lct::Cenc::Null => Ok(buffer.to_vec()), - _ => compress::compress(&buffer, cenc), + _ => compress::compress_buffer(&buffer, cenc), }?; Ok(ObjectDataSource::Buffer(data)) diff --git a/src/sender/toiallocator.rs b/src/sender/toiallocator.rs index bfbae93..85aaa5a 100644 --- a/src/sender/toiallocator.rs +++ b/src/sender/toiallocator.rs @@ -44,8 +44,8 @@ impl ToiAllocatorInternal { Some(0) => 1, Some(n) => n, None => { - let mut rng = rand::thread_rng(); - rng.gen() + let mut rng = rand::rng(); + rng.random() } }; diff --git a/tests/flute.rs b/tests/flute.rs index 7144464..d613a32 100644 --- a/tests/flute.rs +++ b/tests/flute.rs @@ -237,7 +237,7 @@ mod tests { input_file_buffer.extend(vec![0; file_size]); // Random buffer - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); rng.fill_bytes(input_file_buffer.as_mut()); (input_file_buffer, input_content_location)