Skip to content

Commit

Permalink
Possibility to transmit Object implementing std::io::Read trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Feb 10, 2025
1 parent 8358726 commit b0cd6b4
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 118 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flute"
version = "1.6.9"
version = "1.7.0"
authors = ["Yannick Poirier <contact@yannickpoirier.fr>"]
edition = "2021"
license = "MIT"
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
//!
//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
//! let obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain",
//! let obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//! sender.add_object(0, obj);
//!
Expand Down Expand Up @@ -191,11 +191,11 @@
//! let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);
//!
//! // Create an ObjectDesc for a low priority file
//! let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority", "text/plain",
//! let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority".to_vec(), "text/plain",
//! &url::Url::parse("file:///low_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//!
//! // Create an ObjectDesc for a high priority file
//! let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority", "text/plain",
//! let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority".to_vec(), "text/plain",
//! &url::Url::parse("file:///high_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//!
//! // Put Object to the low priority queue
Expand Down Expand Up @@ -238,7 +238,7 @@
//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
//!
//! // Create an Object
//! let mut obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain",
//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//!
//! // Set the Target Transfer Duration of this object to 2 seconds
Expand Down Expand Up @@ -286,7 +286,7 @@
//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
//!
//! // Create an Object
//! let mut obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain",
//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//!
//! // Set the Target Transfer End Time of this object to 10 seconds from now
Expand Down
2 changes: 1 addition & 1 deletion src/py/sender/senderpy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Sender {

let oti = oti.map(|o| o.0.clone());
let object = crate::sender::ObjectDesc::create_from_buffer(
content,
content.to_vec(),
content_type,
&content_location,
1,
Expand Down
48 changes: 27 additions & 21 deletions src/sender/blockencoder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::io::Read;
use std::sync::Arc;

use super::filedesc;
use super::objectdesc::ObjectDataSource;
use crate::common::{partition, pkt};
use crate::error::FluteError;
use crate::tools::error::Result;

#[derive(Debug)]
Expand All @@ -20,7 +21,6 @@ pub struct BlockEncoder {
read_end: bool,
source_size_transferred: usize,
nb_pkt_sent: usize,
fd: Option<std::fs::File>,
stopped: bool,
closabled_object: bool,
}
Expand All @@ -33,10 +33,11 @@ impl BlockEncoder {
block_multiplex_windows: usize,
closabled_object: bool,
) -> Result<BlockEncoder> {
let mut fd = None;
if let (None, Some(path)) = (file.object.content.as_ref(), file.object.path.as_ref()) {
log::info!("Open file {:?}", path);
fd = Some(std::fs::File::open(path)?);
match &file.object.source {
ObjectDataSource::Buffer(_) => {}
ObjectDataSource::Stream(stream) => {
stream.lock().unwrap().seek(std::io::SeekFrom::Start(0))?;
}
}

let mut block = BlockEncoder {
Expand All @@ -53,7 +54,6 @@ impl BlockEncoder {
read_end: false,
source_size_transferred: 0,
nb_pkt_sent: 0,
fd,
stopped: false,
closabled_object,
};
Expand Down Expand Up @@ -149,19 +149,22 @@ impl BlockEncoder {

fn read_block(&mut self) -> Result<()> {
debug_assert!(!self.read_end);

if self.fd.is_some() {
return self.read_fd_block();
}

if self.file.object.content.is_none() {
self.read_end = true;
return Ok(());
let source = &self.file.object.source;
match source {
ObjectDataSource::Buffer(_) => self.read_block_buffer(),
ObjectDataSource::Stream(_) => self.read_block_stream(),
}
}

fn read_block_buffer(&mut self) -> Result<()> {
log::debug!("Read block nb {}", self.curr_sbn);

let content = match &self.file.object.source {
ObjectDataSource::Buffer(buffer) => Ok(buffer),
_ => Err(FluteError::new("Not a data source buffer")),
}?;

let oti = &self.file.oti;
let content = self.file.object.content.as_ref().unwrap();
let block_length = match self.curr_sbn as u64 {
value if value < self.nb_a_large => self.a_large,
_ => self.a_small,
Expand All @@ -186,22 +189,25 @@ impl BlockEncoder {
content.len(),
self.read_end
);

Ok(())
}

fn read_fd_block(&mut self) -> Result<()> {
let fd = self.fd.as_mut().unwrap();

fn read_block_stream(&mut self) -> Result<()> {
log::info!("Read block nb {}", self.curr_sbn);

let mut stream = match &self.file.object.source {
ObjectDataSource::Stream(stream) => Ok(stream.lock().unwrap()),
_ => Err(FluteError::new("Not a data source stream")),
}?;

let oti = &self.file.oti;
let block_length = match self.curr_sbn as u64 {
value if value < self.nb_a_large => self.a_large,
_ => self.a_small,
};
let mut buffer: Vec<u8> =
vec![0; block_length as usize * oti.encoding_symbol_length as usize];
let result = match fd.read(&mut buffer) {
let result = match stream.read(&mut buffer) {
Ok(s) => s,
Err(e) => {
log::error!("Fail to read file {:?}", e.to_string());
Expand Down
6 changes: 3 additions & 3 deletions src/sender/fdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl Fdt {
log::info!("TSI={} Publish new FDT", self._tsi);
let content = self.to_xml(now)?;
let mut obj = objectdesc::ObjectDesc::create_from_buffer(
&content,
content,
"text/xml",
&url::Url::parse("file:///").unwrap(),
1,
Expand Down Expand Up @@ -407,7 +407,7 @@ mod tests {
Some(vec!["Group1".to_owned()]),
);
let obj1 = objectdesc::ObjectDesc::create_from_buffer(
&Vec::new(),
Vec::new(),
"plain/txt",
&url::Url::parse("file:///object1").unwrap(),
2,
Expand All @@ -423,7 +423,7 @@ mod tests {
.unwrap();

let obj2 = objectdesc::ObjectDesc::create_from_buffer(
&Vec::new(),
Vec::new(),
"plain/txt",
&url::Url::parse("file:///object2").unwrap(),
2,
Expand Down
3 changes: 3 additions & 0 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ mod objectsenderlogger;
pub use crate::common::Profile;
pub use objectdesc::CacheControl;
pub use objectdesc::ObjectDesc;
pub use objectdesc::ObjectDataSource;
pub use objectdesc::ObjectDataStream;
pub use objectdesc::ObjectDataStreamTrait;
pub use objectdesc::TargetAcquisition;
pub use observer::Event;
pub use observer::FileInfo;
Expand Down
Loading

0 comments on commit b0cd6b4

Please sign in to comment.