Skip to content

Commit

Permalink
differentiate between FPP_MAX and FFP_MAX_ADVERTISED; unfinished expe…
Browse files Browse the repository at this point in the history
…riment
  • Loading branch information
teodly committed Mar 22, 2024
1 parent 742c91e commit 3941765
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 38 deletions.
2 changes: 1 addition & 1 deletion inferno_aoip/src/channels_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl ChannelsSubscriberInternal {
flows: Arc::new(Mutex::new(vec![])),
flows_recv,
buffered_samples_per_channel: 524288,
min_latency_ns: 10_000_000, // TODO dehardcode
min_latency_ns: 30_000_000, // TODO dehardcode
control_client: Arc::new(FlowsControlClient::new(self_info)),
mdns_client,
subscriptions_info,
Expand Down
5 changes: 3 additions & 2 deletions inferno_aoip/src/flows_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use cirb::Output as RBOutput;
use cirb::Input as RBInput;

pub const FPP_MIN: u16 = 2;
pub const FPP_MAX: u16 = 32;
pub const FPP_MAX: u16 = 256;
pub const FPP_MAX_ADVERTISED: u16 = 32;
pub const MAX_FLOWS: u32 = 128;
pub const MAX_CHANNELS_IN_FLOW: u16 = 8;
pub const KEEPALIVE_TIMEOUT_SECONDS: usize = 4;
Expand Down Expand Up @@ -313,7 +314,7 @@ impl FlowsTransmitter {
cirb::RTHistory::<Sample>::new(BUFFERED_SAMPLES_PER_CHANNEL, 0).split()
}).unzip();
// TODO dehardcode latency_ns
let thread_join = run_future_in_new_thread("flows TX", move || Self::run(rx, srate, 10_000_000, channels_outputs).boxed_local());
let thread_join = run_future_in_new_thread("flows TX", move || Self::run(rx, srate, 30_000_000, channels_outputs).boxed_local());
return (Self {
commands_sender: tx,
self_info: self_info.clone(),
Expand Down
4 changes: 2 additions & 2 deletions inferno_aoip/src/mdns_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use searchfire::{
};
use std::{net::IpAddr, sync::Arc};

use crate::device_info::DeviceInfo;
use crate::{device_info::DeviceInfo, flows_tx::FPP_MAX_ADVERTISED};
use crate::protocol::{proto_arc::PORT as ARC_PORT, proto_cmc::PORT as CMC_PORT, flows_control::PORT as FLOWS_CONTROL_PORT};
use crate::flows_tx::{FPP_MIN, FPP_MAX, MAX_CHANNELS_IN_FLOW};

Expand Down Expand Up @@ -68,7 +68,7 @@ pub fn start_server(self_info: Arc<DeviceInfo>) -> BroadcasterHandle {
.add_txt_truncated(kv("enc", self_info.bits_per_sample))
.add_txt_truncated(kv("en", self_info.bits_per_sample))
.add_txt_truncated(kv("latency_ns", self_info.latency_ns))
.add_txt_truncated(format!("fpp={},{}", FPP_MAX, FPP_MIN))
.add_txt_truncated(format!("fpp={},{}", FPP_MAX_ADVERTISED, FPP_MIN))
.add_txt_truncated(kv("nchan", MAX_CHANNELS_IN_FLOW.min(self_info.tx_channels.len() as u16)));
if default {
b = b.add_txt_truncated("default");
Expand Down
43 changes: 27 additions & 16 deletions inferno_aoip/src/samples_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ pub struct RealTimeSamplesReceiver {
}

impl RealTimeSamplesReceiver {
fn get_min_max_end_timestamps(&self) -> Option<(Clock, Clock)> {
get_min_max_end_timestamps(self.channels.iter().map(|chrecv|chrecv.get()))
}
pub fn get_available_num_samples(&mut self, start_timestamp: Clock) -> usize {
self.get_min_max_end_timestamps().map(|(end_ts, _)| {
let diff = wrapped_diff(end_ts, start_timestamp);
if diff > 0 {
diff as Clock
} else {
0
}
}).unwrap_or(0)
}
pub fn get_samples(&mut self, start_timestamp: Clock, channel_index: usize, buffer: &mut [Sample]) -> bool {
let chrecv = &mut self.channels[channel_index];
chrecv.update();
Expand Down Expand Up @@ -156,24 +169,22 @@ struct PeriodicSamplesCollector {
callback: SamplesCallback,
}

fn get_min_max_end_timestamps<'a>(channels: impl IntoIterator<Item = &'a Option<Channel>>) -> Option<(Clock, Clock)> {
let clocks = channels
.into_iter()
.filter_map(|opt| opt.as_ref())
.map(|ch| ch.source.readable_until())
.filter_map(|opt| opt)
.collect_vec();
Some((
clocks.iter().min_by(|&&a, &&b| wrapped_diff(a, b).cmp(&0))?.to_owned(),
clocks.iter().max_by(|&&a, &&b| wrapped_diff(a, b).cmp(&0))?.to_owned()
))
}

impl PeriodicSamplesCollector {
fn get_end_timestamps(&self) -> impl Iterator<Item = Clock> + '_ {
self
.channels
.iter()
.filter_map(|opt| opt.as_ref())
.map(|ch| ch.source.readable_until())
.filter_map(|opt| opt)
}
fn get_min_max_end_timestamps(&self) -> Option<(Clock, Clock)> {
let clocks = self.get_end_timestamps().collect_vec();
if clocks.is_empty() {
return None;
}
return Some((
clocks.iter().min_by(|&a, &b| wrapped_diff(*a, *b).cmp(&0)).unwrap().to_owned(),
clocks.iter().max_by(|&a, &b| wrapped_diff(*a, *b).cmp(&0)).unwrap().to_owned()
));
get_min_max_end_timestamps(&self.channels)
}
async fn run(&mut self) {
let mut clock = None;
Expand Down
50 changes: 33 additions & 17 deletions inferno_wired/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use spa::param::format_utils;
use spa::pod::Pod;
#[cfg(feature = "v0_3_44")]
use spa::WritableDict;
use std::cell::RefCell;
use std::convert::TryInto;
use std::ops::Deref;
use std::thread::sleep;
Expand Down Expand Up @@ -57,8 +58,9 @@ impl Direction {
}
}

fn create_stream<F>(dir: Direction, core: &pw::core::Core, app_name: &str, sample_rate: u32, num_channels: u32, state: Arc<StreamState>, mut process_channel_cb: F) -> Result<(Stream, StreamListener<UserData>), pw::Error>
where F: FnMut(usize, usize, &mut [i32]) -> bool + 'static {
fn create_stream<FGetNumFrames, FProcessBuffer>(dir: Direction, core: &pw::core::Core, app_name: &str, sample_rate: u32, num_channels: u32, state: Arc<StreamState>, mut get_num_frames: FGetNumFrames, mut process_channel_cb: FProcessBuffer) -> Result<(Stream, StreamListener<UserData>), pw::Error>
where FProcessBuffer: FnMut(usize, usize, &mut [i32]) -> bool + 'static,
FGetNumFrames: FnMut(usize) -> usize + 'static {
let data = UserData {
format: Default::default()
};
Expand All @@ -75,8 +77,10 @@ where F: FnMut(usize, usize, &mut [i32]) -> bool + 'static {
* the data.
*/
let props = properties! {
"clock.name" => dir.select("network.inferno.from_network", "network.inferno.to_network"),
"clock.id" => dir.select("network.inferno.from_network", "network.inferno.to_network"),
/* "clock.name" => dir.select("network.inferno.from_network", "network.inferno.to_network"),
"clock.id" => dir.select("network.inferno.from_network", "network.inferno.to_network"), */
"clock.name" => "network.inferno",
"clock.id" => "network.inferno",
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => dir.select("Playback", "Capture"),
*pw::keys::MEDIA_ROLE => "Production",
Expand Down Expand Up @@ -143,25 +147,25 @@ where F: FnMut(usize, usize, &mut [i32]) -> bool + 'static {
return;
}
let n_channels = user_data.format.channels();

let n_frames_internal = get_num_frames(state.next_ts.load(Ordering::Acquire));
let n_samples_opt = match dir {
Direction::FromNetwork => datas.iter_mut().take(n_channels as usize).filter_map(|data|data.data()).map(|bytes|bytes.len()/mem::size_of::<i32>()).min(),
Direction::ToNetwork => datas.iter_mut().take(n_channels as usize).map(|data|data.chunk().size() as usize/mem::size_of::<i32>()).min()
};
let max_n_samples = n_samples_opt.unwrap_or(0);
let mut n_samples = match dir {
Direction::FromNetwork => max_n_samples.min(state.frames_per_callback),
Direction::FromNetwork => max_n_samples.min(n_frames_internal),
Direction::ToNetwork => max_n_samples
};
if n_samples == 0 {
log::warn!("{dir:?} pipewire gave us buffer of 0 frames");
return;
} else if n_samples != state.frames_per_callback {
log::warn!("{dir:?} pipewire gave us buffer of {n_samples} frames but we want {}", state.frames_per_callback);
} else if n_samples != n_frames_internal {
log::warn!("{dir:?} pipewire gave us buffer of {n_samples} frames but we want {}", n_frames_internal);
}
let to_catchup = state.to_catchup.load(Ordering::Acquire);
if to_catchup > 0 {
let add_samples = to_catchup.min(state.frames_per_callback as isize);
let add_samples = to_catchup.min(n_frames_internal as isize);
state.to_catchup.fetch_sub(add_samples, Ordering::AcqRel);
log::warn!("{dir:?} catching up: before add: {n_samples}");
n_samples = (n_samples + add_samples as usize).min(max_n_samples);
Expand Down Expand Up @@ -288,9 +292,16 @@ pub fn main() -> Result<(), pw::Error> {
frames_per_callback
});

let from_network_stream = match create_stream(Direction::FromNetwork, &core, &self_info.friendly_hostname, sample_rate, rx_channels as u32, from_network_state.clone(), move |start_ts, channel_index, mut samples| {
samples_receiver.get_samples(start_ts, channel_index, &mut samples)
}) {
let from_network_stream = match create_stream(Direction::FromNetwork, &core, &self_info.friendly_hostname, sample_rate, rx_channels as u32, from_network_state.clone(),
move |_start_ts| {
// TODO
//samples_receiver.get_available_num_samples(start_ts)
frames_per_callback
},
move |start_ts, channel_index, mut samples| {
samples_receiver.get_samples(start_ts, channel_index, &mut samples)
}
) {
Ok((stream, stream_listener)) => {
Box::leak(Box::new(stream_listener));
stream
Expand All @@ -307,10 +318,15 @@ pub fn main() -> Result<(), pw::Error> {
remaining_process: 0.into(),
frames_per_callback
});
let to_network_stream = match create_stream(Direction::ToNetwork, &core, &self_info.friendly_hostname, sample_rate, tx_channels as u32, to_network_state.clone(), move |start_ts, channel_index, samples| {
tx_inputs[channel_index].write_from_at(start_ts, samples[0..].into_iter().map(|e|*e));
true
}) {
let to_network_stream = match create_stream(Direction::ToNetwork, &core, &self_info.friendly_hostname, sample_rate, tx_channels as u32, to_network_state.clone(),
move |_start_ts| {
frames_per_callback
},
move |start_ts, channel_index, samples| {
tx_inputs[channel_index].write_from_at(start_ts, samples[0..].into_iter().map(|e|*e));
true
}
) {
Ok((stream, stream_listener)) => {
Box::leak(Box::new(stream_listener));
stream
Expand Down Expand Up @@ -368,7 +384,7 @@ pub fn main() -> Result<(), pw::Error> {
state.next_ts.store(now, Ordering::Release);
}
}
let too_early = (!too_late) && drift < 0;;
let too_early = (!too_late) && drift < 0;
if !too_early {
// do not call if realtime thread is too early - we may try to dequeue not yet ready samples then
trig_count += 1;
Expand Down

0 comments on commit 3941765

Please sign in to comment.