Skip to content

Commit

Permalink
Acquisition time param
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Nov 15, 2024
1 parent 04ce4aa commit 1ff31b8
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 66 deletions.
17 changes: 13 additions & 4 deletions src/receiver/writer/objectwriterbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub struct ObjectWriterBuffer {
pub data: Vec<u8>,
/// Metadata of the object
pub meta: ObjectMetadata,
/// Time when the object reception started
pub start_time: SystemTime,
/// Time when the object reception ended
pub end_time: Option<SystemTime>,
}

impl ObjectWriterBufferBuilder {
Expand All @@ -54,13 +58,15 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder {
_tsi: &u64,
_toi: &u128,
meta: &ObjectMetadata,
_now: std::time::SystemTime,
now: std::time::SystemTime,
) -> Box<dyn ObjectWriter> {
let obj = Rc::new(RefCell::new(ObjectWriterBuffer {
complete: false,
error: false,
data: Vec::new(),
meta: meta.clone(),
start_time: now,
end_time: None,
}));

let obj_wrapper = Box::new(ObjectWriterBufferWrapper { inner: obj.clone() });
Expand Down Expand Up @@ -103,21 +109,24 @@ impl ObjectWriter for ObjectWriterBufferWrapper {
inner.data.extend(data);
}

fn complete(&self, _now: SystemTime) {
fn complete(&self, now: SystemTime) {
let mut inner = self.inner.borrow_mut();
log::info!("Object complete !");
inner.complete = true;
inner.end_time = Some(now);
}

fn error(&self, _now: SystemTime) {
fn error(&self, now: SystemTime) {
let mut inner = self.inner.borrow_mut();
log::error!("Object received with error");
inner.error = true;
inner.end_time = Some(now);
}

fn interrupted(&self, _now: SystemTime) {
fn interrupted(&self, now: SystemTime) {
let mut inner = self.inner.borrow_mut();
log::error!("Object reception interrupted");
inner.error = true;
inner.end_time = Some(now);
}
}
95 changes: 61 additions & 34 deletions src/sender/filedesc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,62 @@ struct TransferInfo {
total_nb_transfer: u64,
last_transfer: Option<SystemTime>,
next_transfer_timestamp: Option<SystemTime>,
packet_transmission_tick: Option<std::time::Duration>,
}

impl TransferInfo {
fn init(&mut self, object: &ObjectDesc, oti: &oti::Oti, now: SystemTime) {
self.transferring = true;
let mut packet_transmission_tick = None;
if let Some(target_acquisition_latency) = object.target_acquisition.as_ref() {
packet_transmission_tick = match target_acquisition_latency {
crate::sender::objectdesc::TargetAcquisition::AsFastAsPossible => None,
crate::sender::objectdesc::TargetAcquisition::WithinDuration(duration) => {
let nb_packets = object
.transfer_length
.div_ceil(oti.encoding_symbol_length as u64);
// TODO should we take into account the FEC encoding symbol length ?
Some(duration.div_f64(nb_packets as f64))
}
crate::sender::objectdesc::TargetAcquisition::WithinTime(target_time) => {
let duration = target_time.duration_since(now).unwrap_or_default();
if duration.is_zero() {
log::warn!("Target acquisition time is in the past");
}
let nb_packets = object
.transfer_length
.div_ceil(oti.encoding_symbol_length as u64);
Some(duration.div_f64(nb_packets as f64))
}
}
}

self.packet_transmission_tick = packet_transmission_tick;
if self.packet_transmission_tick.is_some() {
self.next_transfer_timestamp = Some(now)
}

if self.transfer_count == object.max_transfer_count && object.carousel_delay.is_some() {
self.transfer_count = 0;
}
}

fn done(&mut self, now: SystemTime) {
self.transferring = false;
self.transfer_count += 1;
self.total_nb_transfer += 1;
self.last_transfer = Some(now);
}

fn tick(&mut self) {
if let Some(tick) = self.packet_transmission_tick {
if let Some(next_transfer_timestamp) = self.next_transfer_timestamp.as_mut() {
if let Some(next) = next_transfer_timestamp.checked_add(tick) {
*next_transfer_timestamp = next;
}
}
}
}
}

#[derive(Debug)]
Expand All @@ -27,7 +83,6 @@ pub struct FileDesc {
pub published: AtomicBool,
pub toi: u128,
transfer_info: RwLock<TransferInfo>,
packet_transmission_tick: Option<std::time::Duration>,
}

impl FileDesc {
Expand All @@ -52,15 +107,6 @@ impl FileDesc {
)));
}

let mut packet_transmission_tick = None;
if let Some(target_acquisition_latency) = object.target_acquisition_latency.as_ref() {
let nb_packets = object
.transfer_length
.div_ceil(oti.encoding_symbol_length as u64);
// TODO should we take into account the FEC encoding symbol length ?
packet_transmission_tick = Some(target_acquisition_latency.div_f64(nb_packets as f64));
}

if oti.fec_encoding_id == oti::FECEncodingID::RaptorQ
|| oti.fec_encoding_id == oti::FECEncodingID::Raptor
{
Expand Down Expand Up @@ -124,10 +170,10 @@ impl FileDesc {
last_transfer: None,
total_nb_transfer: 0,
next_transfer_timestamp: None,
packet_transmission_tick: None,
}),
published: AtomicBool::new(false),
toi,
packet_transmission_tick,
})
}

Expand All @@ -138,25 +184,12 @@ impl FileDesc {

pub fn transfer_started(&self, now: SystemTime) {
let mut info = self.transfer_info.write().unwrap();
info.transferring = true;
if self.packet_transmission_tick.is_some() {
info.next_transfer_timestamp = Some(now)
}

if info.transfer_count == self.object.max_transfer_count
&& self.object.carousel_delay.is_some()
{
info.transfer_count = 0;
}
info.init(&self.object, &self.oti, now);
}

pub fn transfer_done(&self, now: SystemTime) {
let mut info = self.transfer_info.write().unwrap();
debug_assert!(info.transferring);
info.transferring = false;
info.transfer_count += 1;
info.total_nb_transfer += 1;
info.last_transfer = Some(now);
info.done(now);
}

pub fn is_expired(&self) -> bool {
Expand All @@ -178,14 +211,8 @@ impl FileDesc {
}

pub fn inc_next_transfer_timestamp(&self) {
if let Some(tick) = self.packet_transmission_tick.as_ref() {
let mut info = self.transfer_info.write().unwrap();
if let Some(next_transfer_timestamp) = info.next_transfer_timestamp.as_mut() {
if let Some(next) = next_transfer_timestamp.checked_add(*tick) {
*next_transfer_timestamp = next;
}
}
}
let mut info = self.transfer_info.write().unwrap();
info.tick();
}

pub fn is_last_transfer(&self) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod objectsenderlogger;
pub use crate::common::Profile;
pub use objectdesc::CacheControl;
pub use objectdesc::ObjectDesc;
pub use objectdesc::TargetAcquisition;
pub use observer::Event;
pub use observer::FileInfo;
pub use observer::Subscriber;
Expand All @@ -27,3 +28,4 @@ pub use sender::PriorityQueue;
pub use sender::Sender;
pub use sender::TOIMaxLength;
pub use toiallocator::Toi;

39 changes: 23 additions & 16 deletions src/sender/objectdesc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ pub fn create_fdt_cache_control(cc: &CacheControl, now: SystemTime) -> fdtinstan
}
}

///
/// Target Acquisition for Object
///
#[derive(Debug, Clone)]
pub enum TargetAcquisition {
/// Transfer the object as fast as possible
AsFastAsPossible,
/// Transfer the object within the specified duration
WithinDuration(std::time::Duration),
/// Transfer the object within the specified timestamp
WithinTime(std::time::SystemTime),
}

///
/// Object (file) that can be send over FLUTE
///
Expand Down Expand Up @@ -83,13 +96,7 @@ pub struct ObjectDesc {
/// Repeat the transfer the same object multiple times
pub max_transfer_count: u32,
/// Specifies the desired duration for transferring the object to the receiver.
///
/// - If `Some(duration)` is provided, the transfer will attempt to complete within this duration.
/// - If `None`, the transfer will proceed as quickly as possible.
///
/// **Note:** The transfer may take longer if the sender's scheduling is slower than expected or
/// if other delays occur during the transfer process.
pub target_acquisition_latency: Option<std::time::Duration>,
pub target_acquisition: Option<TargetAcquisition>,
/// If defined, object is transmitted in a carousel every `carousel_delay_ns`
pub carousel_delay: Option<std::time::Duration>,
/// Define object cache control
Expand Down Expand Up @@ -122,7 +129,7 @@ impl ObjectDesc {
cache_in_ram: bool,
max_transfer_count: u32,
carousel_delay: Option<std::time::Duration>,
target_acquisition_latency: Option<std::time::Duration>,
target_acquisition: Option<TargetAcquisition>,
cache_control: Option<CacheControl>,
groups: Option<Vec<String>>,
cenc: lct::Cenc,
Expand Down Expand Up @@ -151,7 +158,7 @@ impl ObjectDesc {
content_location,
max_transfer_count,
carousel_delay,
target_acquisition_latency,
target_acquisition,
cache_control,
groups,
cenc,
Expand All @@ -166,7 +173,7 @@ impl ObjectDesc {
content_location,
max_transfer_count,
carousel_delay,
target_acquisition_latency,
target_acquisition,
cache_control,
groups,
cenc,
Expand All @@ -184,7 +191,7 @@ impl ObjectDesc {
content_location: &url::Url,
max_transfer_count: u32,
carousel_delay: Option<std::time::Duration>,
target_acquisition_latency: Option<std::time::Duration>,
target_acquisition: Option<TargetAcquisition>,
cache_control: Option<CacheControl>,
groups: Option<Vec<String>>,
cenc: lct::Cenc,
Expand All @@ -199,7 +206,7 @@ impl ObjectDesc {
content_location.clone(),
max_transfer_count,
carousel_delay,
target_acquisition_latency,
target_acquisition,
cache_control,
groups,
cenc,
Expand All @@ -216,7 +223,7 @@ impl ObjectDesc {
content_location: url::Url,
max_transfer_count: u32,
carousel_delay: Option<std::time::Duration>,
target_acquisition_latency: Option<std::time::Duration>,
target_acquisition: Option<TargetAcquisition>,
cache_control: Option<CacheControl>,
groups: Option<Vec<String>>,
cenc: lct::Cenc,
Expand Down Expand Up @@ -259,7 +266,7 @@ impl ObjectDesc {
oti,
max_transfer_count,
carousel_delay,
target_acquisition_latency,
target_acquisition,
cache_control,
groups,
toi: None,
Expand All @@ -273,7 +280,7 @@ impl ObjectDesc {
content_location: url::Url,
max_transfer_count: u32,
carousel_delay: Option<std::time::Duration>,
target_acquisition_latency: Option<std::time::Duration>,
target_acquisition: Option<TargetAcquisition>,
cache_control: Option<CacheControl>,
groups: Option<Vec<String>>,
cenc: lct::Cenc,
Expand Down Expand Up @@ -311,7 +318,7 @@ impl ObjectDesc {
oti,
max_transfer_count,
carousel_delay,
target_acquisition_latency,
target_acquisition,
cache_control,
groups,
toi: None,
Expand Down
Loading

0 comments on commit 1ff31b8

Please sign in to comment.