diff --git a/Cargo.toml b/Cargo.toml index 7f68b61..eac728f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "flute" -version = "1.6.9" +version = "1.7.0" authors = ["Yannick Poirier "] edition = "2021" license = "MIT" diff --git a/src/lib.rs b/src/lib.rs index 50caa09..6f10493 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! let mut sender = Sender::new(endpoint, tsi, &oti, &config); //! //! // Add object(s) (files) to the FLUTE sender (priority queue 0) -//! let obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain", +//! let obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain", //! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap(); //! sender.add_object(0, obj); //! @@ -191,11 +191,11 @@ //! let mut sender = Sender::new(endpoint, 1, &Default::default(), &config); //! //! // Create an ObjectDesc for a low priority file -//! let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority", "text/plain", +//! let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority".to_vec(), "text/plain", //! &url::Url::parse("file:///low_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap(); //! //! // Create an ObjectDesc for a high priority file -//! let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority", "text/plain", +//! let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority".to_vec(), "text/plain", //! &url::Url::parse("file:///high_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap(); //! //! // Put Object to the low priority queue @@ -238,7 +238,7 @@ //! let mut sender = Sender::new(endpoint, tsi, &oti, &config); //! //! // Create an Object -//! let mut obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain", +//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain", //! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap(); //! //! // Set the Target Transfer Duration of this object to 2 seconds @@ -286,7 +286,7 @@ //! let mut sender = Sender::new(endpoint, tsi, &oti, &config); //! //! // Create an Object -//! let mut obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain", +//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain", //! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap(); //! //! // Set the Target Transfer End Time of this object to 10 seconds from now diff --git a/src/py/sender/senderpy.rs b/src/py/sender/senderpy.rs index 2fc2d57..800192d 100644 --- a/src/py/sender/senderpy.rs +++ b/src/py/sender/senderpy.rs @@ -36,7 +36,7 @@ impl Sender { let oti = oti.map(|o| o.0.clone()); let object = crate::sender::ObjectDesc::create_from_buffer( - content, + content.to_vec(), content_type, &content_location, 1, diff --git a/src/sender/blockencoder.rs b/src/sender/blockencoder.rs index f013fc9..7023036 100644 --- a/src/sender/blockencoder.rs +++ b/src/sender/blockencoder.rs @@ -1,8 +1,9 @@ -use std::io::Read; use std::sync::Arc; use super::filedesc; +use super::objectdesc::ObjectDataSource; use crate::common::{partition, pkt}; +use crate::error::FluteError; use crate::tools::error::Result; #[derive(Debug)] @@ -20,7 +21,6 @@ pub struct BlockEncoder { read_end: bool, source_size_transferred: usize, nb_pkt_sent: usize, - fd: Option, stopped: bool, closabled_object: bool, } @@ -33,10 +33,11 @@ impl BlockEncoder { block_multiplex_windows: usize, closabled_object: bool, ) -> Result { - let mut fd = None; - if let (None, Some(path)) = (file.object.content.as_ref(), file.object.path.as_ref()) { - log::info!("Open file {:?}", path); - fd = Some(std::fs::File::open(path)?); + match &file.object.source { + ObjectDataSource::Buffer(_) => {} + ObjectDataSource::Stream(stream) => { + stream.lock().unwrap().seek(std::io::SeekFrom::Start(0))?; + } } let mut block = BlockEncoder { @@ -53,7 +54,6 @@ impl BlockEncoder { read_end: false, source_size_transferred: 0, nb_pkt_sent: 0, - fd, stopped: false, closabled_object, }; @@ -149,19 +149,22 @@ impl BlockEncoder { fn read_block(&mut self) -> Result<()> { debug_assert!(!self.read_end); - - if self.fd.is_some() { - return self.read_fd_block(); - } - - if self.file.object.content.is_none() { - self.read_end = true; - return Ok(()); + let source = &self.file.object.source; + match source { + ObjectDataSource::Buffer(_) => self.read_block_buffer(), + ObjectDataSource::Stream(_) => self.read_block_stream(), } + } + fn read_block_buffer(&mut self) -> Result<()> { log::debug!("Read block nb {}", self.curr_sbn); + + let content = match &self.file.object.source { + ObjectDataSource::Buffer(buffer) => Ok(buffer), + _ => Err(FluteError::new("Not a data source buffer")), + }?; + let oti = &self.file.oti; - let content = self.file.object.content.as_ref().unwrap(); let block_length = match self.curr_sbn as u64 { value if value < self.nb_a_large => self.a_large, _ => self.a_small, @@ -186,14 +189,17 @@ impl BlockEncoder { content.len(), self.read_end ); - Ok(()) } - fn read_fd_block(&mut self) -> Result<()> { - let fd = self.fd.as_mut().unwrap(); - + fn read_block_stream(&mut self) -> Result<()> { log::info!("Read block nb {}", self.curr_sbn); + + let mut stream = match &self.file.object.source { + ObjectDataSource::Stream(stream) => Ok(stream.lock().unwrap()), + _ => Err(FluteError::new("Not a data source stream")), + }?; + let oti = &self.file.oti; let block_length = match self.curr_sbn as u64 { value if value < self.nb_a_large => self.a_large, @@ -201,7 +207,7 @@ impl BlockEncoder { }; let mut buffer: Vec = vec![0; block_length as usize * oti.encoding_symbol_length as usize]; - let result = match fd.read(&mut buffer) { + let result = match stream.read(&mut buffer) { Ok(s) => s, Err(e) => { log::error!("Fail to read file {:?}", e.to_string()); diff --git a/src/sender/fdt.rs b/src/sender/fdt.rs index 3f64bb7..c64eae6 100644 --- a/src/sender/fdt.rs +++ b/src/sender/fdt.rs @@ -199,7 +199,7 @@ impl Fdt { log::info!("TSI={} Publish new FDT", self._tsi); let content = self.to_xml(now)?; let mut obj = objectdesc::ObjectDesc::create_from_buffer( - &content, + content, "text/xml", &url::Url::parse("file:///").unwrap(), 1, @@ -407,7 +407,7 @@ mod tests { Some(vec!["Group1".to_owned()]), ); let obj1 = objectdesc::ObjectDesc::create_from_buffer( - &Vec::new(), + Vec::new(), "plain/txt", &url::Url::parse("file:///object1").unwrap(), 2, @@ -423,7 +423,7 @@ mod tests { .unwrap(); let obj2 = objectdesc::ObjectDesc::create_from_buffer( - &Vec::new(), + Vec::new(), "plain/txt", &url::Url::parse("file:///object2").unwrap(), 2, diff --git a/src/sender/mod.rs b/src/sender/mod.rs index e87d0e3..4799498 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -19,6 +19,9 @@ mod objectsenderlogger; pub use crate::common::Profile; pub use objectdesc::CacheControl; pub use objectdesc::ObjectDesc; +pub use objectdesc::ObjectDataSource; +pub use objectdesc::ObjectDataStream; +pub use objectdesc::ObjectDataStreamTrait; pub use objectdesc::TargetAcquisition; pub use observer::Event; pub use observer::FileInfo; diff --git a/src/sender/objectdesc.rs b/src/sender/objectdesc.rs index b729c45..f12d260 100644 --- a/src/sender/objectdesc.rs +++ b/src/sender/objectdesc.rs @@ -8,7 +8,9 @@ use crate::tools; use crate::tools::error::Result; use std::collections::HashMap; use std::ffi::OsStr; -use std::io::{BufReader, Read}; +use std::io::BufReader; +use std::io::{Read, Seek}; +use std::sync::Mutex; use std::time::SystemTime; /// Cache Control @@ -50,7 +52,7 @@ pub fn create_fdt_cache_control(cc: &CacheControl, now: SystemTime) -> fdtinstan /// /// Target Acquisition for Object -/// +/// #[derive(Debug, Clone)] pub enum TargetAcquisition { /// Transfer the object as fast as possible @@ -61,6 +63,66 @@ pub enum TargetAcquisition { WithinTime(std::time::SystemTime), } +/// +/// Object Data Stream Trait +/// +pub trait ObjectDataStreamTrait: + std::io::Read + std::io::Seek + Send + Sync + std::fmt::Debug +{ +} +impl ObjectDataStreamTrait for T {} +/// Boxed Object Data Stream +pub type ObjectDataStream = Box; + +/// Object Data Source +#[derive(Debug)] +pub enum ObjectDataSource { + /// Source from a stream + Stream(Mutex), + /// Source from a buffer + Buffer(Vec), +} + +impl ObjectDataSource { + /// Create an Object Data Source from a buffer + pub fn from_buffer(buffer: &[u8], cenc: lct::Cenc) -> Result { + let data = match cenc { + lct::Cenc::Null => Ok(buffer.to_vec()), + _ => compress::compress(buffer, cenc), + }?; + + Ok(ObjectDataSource::Buffer(data)) + } + + /// Create an Object Data Source from a vector + pub fn from_vec(buffer: Vec, cenc: lct::Cenc) -> Result { + let data = match cenc { + lct::Cenc::Null => Ok(buffer.to_vec()), + _ => compress::compress(&buffer, cenc), + }?; + + Ok(ObjectDataSource::Buffer(data)) + } + + /// Create an Object Data Source from a stream + pub fn from_stream(stream: ObjectDataStream) -> Self { + ObjectDataSource::Stream(Mutex::new(stream)) + } + + fn len(&mut self) -> Result { + match self { + ObjectDataSource::Buffer(buffer) => Ok(buffer.len() as u64), + ObjectDataSource::Stream(stream) => { + let mut stream = stream.lock().unwrap(); + let current_pos = stream.stream_position()?; + let end_pos = stream.seek(std::io::SeekFrom::End(0))?; + stream.seek(std::io::SeekFrom::Start(current_pos))?; + Ok(end_pos) + } + } + } +} + /// /// Object (file) that can be send over FLUTE /// @@ -69,10 +131,8 @@ pub struct ObjectDesc { /// supply the resource location for this object /// as defined in [rfc2616 14.14](https://www.rfc-editor.org/rfc/rfc2616#section-14.14) pub content_location: url::Url, - /// Optional path to the file - pub path: Option, - /// Optional buffer contening the content of this object - pub content: Option>, + /// Data Source of the object + pub source: ObjectDataSource, /// Media type of the object /// as defined in [rfc2616 14.17](https://www.rfc-editor.org/rfc/rfc2616#section-14.17) pub content_type: String, @@ -153,7 +213,6 @@ impl ObjectDesc { let content = std::fs::read(path)?; Self::create_with_content( content, - Some(path.to_path_buf()), content_type.to_string(), content_location, max_transfer_count, @@ -167,16 +226,21 @@ impl ObjectDesc { md5, ) } else { - Self::create_with_path( - path.to_path_buf(), - content_type.to_string(), - content_location, + if cenc != lct::Cenc::Null { + return Err(FluteError::new( + "Compressed object is not compatible with file path", + )); + } + let file = std::fs::File::open(path)?; + Self::create_from_stream( + Box::new(file), + content_type, + &content_location, max_transfer_count, carousel_delay, target_acquisition, cache_control, groups, - cenc, inband_cenc, oti, md5, @@ -184,9 +248,56 @@ impl ObjectDesc { } } + /// Create an Object Description from a stream + pub fn create_from_stream( + mut stream: ObjectDataStream, + content_type: &str, + content_location: &url::Url, + max_transfer_count: u32, + carousel_delay: Option, + target_acquisition: Option, + cache_control: Option, + groups: Option>, + inband_cenc: bool, + oti: Option, + md5: bool, + ) -> Result> { + let md5 = match md5 { + // https://www.rfc-editor.org/rfc/rfc2616#section-14.15 + true => { + let md5 = Self::compute_file_md5(&mut stream)?; + Some(base64::engine::general_purpose::STANDARD.encode(md5.0)) + } + false => None, + }; + + let mut source = ObjectDataSource::from_stream(stream); + let transfer_length = source.len()?; + + Ok(Box::new(ObjectDesc { + content_location: content_location.clone(), + source, + content_type: content_type.to_string(), + content_length: transfer_length, + transfer_length, + cenc: lct::Cenc::Null, + inband_cenc, + md5, + oti, + max_transfer_count, + carousel_delay, + target_acquisition, + cache_control, + groups, + toi: None, + optel_propagator: None, + e_tag: None, + })) + } + /// Return an `ObjectDesc` from a buffer pub fn create_from_buffer( - content: &[u8], + content: Vec, content_type: &str, content_location: &url::Url, max_transfer_count: u32, @@ -200,8 +311,7 @@ impl ObjectDesc { md5: bool, ) -> Result> { ObjectDesc::create_with_content( - content.to_vec(), - None, + content, content_type.to_string(), content_location.clone(), max_transfer_count, @@ -217,8 +327,7 @@ impl ObjectDesc { } fn create_with_content( - mut content: Vec, - path: Option, + content: Vec, content_type: String, content_location: url::Url, max_transfer_count: u32, @@ -241,75 +350,14 @@ impl ObjectDesc { false => None, }; - if cenc != lct::Cenc::Null { - content = compress::compress(&content, cenc)?; - log::info!( - "compress content from {} to {}", - content_length, - content.len() - ); - } - - let transfer_length = content.len(); + let mut source = ObjectDataSource::from_vec(content, cenc)?; + let transfer_length = source.len()?; Ok(Box::new(ObjectDesc { content_location, - path, - content: Some(content), + source, content_type, content_length: content_length as u64, - transfer_length: transfer_length as u64, - cenc, - inband_cenc, - md5, - oti, - max_transfer_count, - carousel_delay, - target_acquisition, - cache_control, - groups, - toi: None, - optel_propagator: None, - e_tag: None, - })) - } - - fn create_with_path( - path: std::path::PathBuf, - content_type: String, - content_location: url::Url, - max_transfer_count: u32, - carousel_delay: Option, - target_acquisition: Option, - cache_control: Option, - groups: Option>, - cenc: lct::Cenc, - inband_cenc: bool, - oti: Option, - md5: bool, - ) -> Result> { - if cenc != lct::Cenc::Null { - return Err(FluteError::new( - "Compressed object is not compatible with file path", - )); - } - let file = std::fs::File::open(path.clone())?; - let transfer_length = file.metadata()?.len(); - - let md5 = match md5 { - // https://www.rfc-editor.org/rfc/rfc2616#section-14.15 - true => Some( - base64::engine::general_purpose::STANDARD.encode(Self::compute_file_md5(&file).0), - ), - false => None, - }; - - Ok(Box::new(ObjectDesc { - content_location, - path: Some(path.to_path_buf()), - content: None, - content_type, - content_length: transfer_length, transfer_length, cenc, inband_cenc, @@ -326,19 +374,21 @@ impl ObjectDesc { })) } - fn compute_file_md5(file: &std::fs::File) -> md5::Digest { - let mut reader = BufReader::new(file); + fn compute_file_md5(stream: &mut ObjectDataStream) -> Result { + stream.seek(std::io::SeekFrom::Start(0))?; + let mut reader = BufReader::new(stream); let mut context = md5::Context::new(); let mut buffer = vec![0; 102400]; loop { - let count = reader.read(&mut buffer).unwrap(); + let count = reader.read(&mut buffer)?; if count == 0 { break; } context.consume(&buffer[0..count]); } - context.compute() + reader.seek(std::io::SeekFrom::Start(0))?; + Ok(context.compute()) } } diff --git a/src/sender/sender.rs b/src/sender/sender.rs index 24c50e1..a35a0e8 100644 --- a/src/sender/sender.rs +++ b/src/sender/sender.rs @@ -414,7 +414,7 @@ mod tests { fn create_obj(length: usize) -> Box { let buffer = vec![0u8; length]; objectdesc::ObjectDesc::create_from_buffer( - &buffer, + buffer, "text", &url::Url::parse("file:///hello").unwrap(), 1, diff --git a/tests/flute.rs b/tests/flute.rs index ceff079..7144464 100644 --- a/tests/flute.rs +++ b/tests/flute.rs @@ -84,7 +84,7 @@ mod tests { let (buffer, content_location) = create_file_buffer(transfer_file_size); ( sender::ObjectDesc::create_from_buffer( - &buffer, + buffer.clone(), &content_type, &content_location, 1,