Skip to content

Commit

Permalink
support aliased subscriptions in alsa plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
teodly committed Feb 17, 2025
1 parent a678ed4 commit 122c760
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 77 deletions.
23 changes: 20 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ This project makes no claim to be either authorized or approved by Audinate.
* `inferno2pipe` - capture audio, writing interleaved 32-bit integer samples into an Unix named pipe (or a raw file). Helper script for recording to more convenient format is also provided. **Start here if you want to use Inferno for capturing audio without setting up whole audio stack**
* `alsa_pcm_inferno` - virtual soundcard for ALSA. **Start here if you want functionality similar to DVS**
* `searchfire` - fork of [Searchlight](https://github.com/WilliamVenner/searchlight) mDNS crate, modified for compatibility with Dante's mDNS
* `cirb` - Clock-Indexed Ring-Buffer - fork of [`rt-history`](https://github.com/HadrienG2/rt-history) crate with emphasis on allowing reordered incoming packets and clock synchronization


# Environment variables
Expand Down Expand Up @@ -134,7 +133,7 @@ Please use editor respecting `.editorconfig` (for example, VSCode needs an exten
## 0.2.0
* audio transmitter
* alpha version of Inferno Wired - virtual audio source & sink for PipeWire
* receiving clock from [Statime](https://github.com/teowoz/statime) modified for PTPv1 and virtual clock support - Linux-only for now (because CLOCK_TAI is Linux-only)
* receiving clock from [Statime](https://github.com/teodly/statime) modified for PTPv1 and virtual clock support - Linux-only for now (because CLOCK_TAI is Linux-only)
* increased receive thread priority to reduce chance of OS UDP input queue overflow

## 0.1.0
Expand All @@ -150,6 +149,7 @@ likely in order they'll be implementated

At this point, Inferno will roughly become alternative to Dante Virtual Soundcard.

* read configuration from sources other then env vars: ALSA plugin parameters, text files - useful for multiple sources & sinks in PipeWire
* send statistics (clock, latency, signal levels)
* ability to work as a clock source (PTP leader)
* automated integration test that will launch several instances, stream audio data between them and check for its correctness
Expand All @@ -161,7 +161,7 @@ At this point, Inferno will roughly become alternative to Dante Virtual Soundcar


# Design
* 99% safe Rust (unsafe is required only because PipeWire Rust bindings return raw buffers and because ALSA plugin API doesn't have safe Rust bindings)
* 99% safe Rust (unsafe is required only because ALSA plugin API doesn't have safe Rust bindings)
* no external libraries needed, the only dependencies are Rust crates


Expand Down
1 change: 1 addition & 0 deletions alsa_pcm_inferno/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ log = "0.4.20"
tokio = { version = "1.36.0", features = ["sync"] }
futures-util = "0.3.30"
lazy_static = "1.5.0"
itertools = "0.14.0"

2 changes: 2 additions & 0 deletions alsa_pcm_inferno/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
3. Add `pcm.` device with type `inferno` to your `.asoundrc` (example [`asoundrc`](asoundrc))
4. In the application there should be a place where usually device name is entered, e.g. `hw:1`. Enter the name of the device created in your `.asoundrc` there. If you've copied the example `asoundrc`, the name is just `inferno`.

Making Dante<->analog converter at a fraction of the price of an off-the-shelf device, using an SBC and PipeWire, is left as an exercise for the reader.

Application must support 32-bit signed integer audio samples. (Audacity doesn't, but generally using Audacity directly with this plugin is not a good idea) `plug` plugin shipped with ALSA should work as automatic converter for apps that don't support that format (not tested yet).

To change the number of channels, specify the environment variables `INFERNO_RX_CHANNELS` and `INFERNO_TX_CHANNELS`.
Expand Down
68 changes: 63 additions & 5 deletions alsa_pcm_inferno/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@ use alsa_sys_all::*;

use futures_util::FutureExt;
use inferno_aoip::utils::{run_future_in_new_thread, LogAndForget};
use inferno_aoip::{AtomicSample, Clock, DeviceInfo, DeviceServer, ExternalBufferParameters, MediaClock, RealTimeClockReceiver, Sample, SelfInfoBuilder};
use inferno_aoip::{AtomicSample, Clock, ClockDiff, DeviceInfo, DeviceServer, ExternalBufferParameters, MediaClock, PositionReportDestination, RealTimeClockReceiver, Sample, SelfInfoBuilder};
use lazy_static::lazy_static;
use libc::{c_char, c_int, c_uint, c_void, eventfd, free, malloc, EBUSY, EFD_CLOEXEC, EPIPE, POLLIN};
use log::error;
use tokio::sync::{mpsc, oneshot};
use core::slice;
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::env;
use std::num::Wrapping;
use std::ptr::{null_mut, null};
use std::mem::zeroed;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, Instant};
use itertools::Itertools;

struct StartArgs {
channels: Vec<ExternalBufferParameters<Sample>>,
Expand Down Expand Up @@ -123,7 +125,7 @@ struct StreamInfo {
struct MyIOPlug {
io: snd_pcm_ioplug_t,
callbacks: snd_pcm_ioplug_callback_t,
self_info: DeviceInfo,
self_info: DeviceInfo, // TODO: this is needlessly duplicated in both TX and RX instance
ref_time: Instant,
stream_info: Option<StreamInfo>,
buffers_valid: Arc<RwLock<bool>>,
Expand All @@ -134,6 +136,9 @@ struct MyIOPlug {
current_timestamp: Arc<AtomicUsize>,
on_transfer_eventfd: libc::c_int,
on_transfer: Box<dyn Fn() + Send + Sync>,
last_transfer_buffer_offset: snd_pcm_uframes_t,
transfer_offset_add: Wrapping<Clock>,
readable_pos_per_channel: Arc<Vec<AtomicUsize>>,
// TODO refactor multiple Options to single Option<struct>
}

Expand Down Expand Up @@ -233,12 +238,19 @@ unsafe extern "C" fn plugin_prepare(io: *mut snd_pcm_ioplug_t) -> c_int {
}
println!("period size: {}", (*io).period_size);

let channels_buffers = channels_areas.iter().map(|area| {
let channels_buffers = channels_areas.iter().enumerate().map(|(ch_index, area)| {
let readable_pos_dest = if (*io).stream==SND_PCM_STREAM_CAPTURE {
Some(PositionReportDestination::new(this.readable_pos_per_channel.clone(), ch_index))
} else {
None
};

ExternalBufferParameters::<Sample>::new(
area.addr.byte_offset((area.first/8) as isize) as *const AtomicSample,
((*io).buffer_size as usize) * channels_areas.len() - ((area.first/bits_per_sample) as usize),
(area.step/bits_per_sample) as usize,
this.buffers_valid.clone()
this.buffers_valid.clone(),
readable_pos_dest
)
}).collect();

Expand Down Expand Up @@ -386,7 +398,49 @@ unsafe extern "C" fn plugin_stop(io: *mut snd_pcm_ioplug_t) -> c_int {

unsafe extern "C" fn plugin_capture_transfer(io: *mut snd_pcm_ioplug_t, areas: *const snd_pcm_channel_area_t, offset: snd_pcm_uframes_t, size: snd_pcm_uframes_t) -> snd_pcm_sframes_t {
let this = get_private(io);
// FIXME: it doesn't work at all and dunno why
if offset < this.last_transfer_buffer_offset {
// FIXME: strange things may happen if process freezes for a time longer than buffer_size...
// maybe this whole zero filling should be handled in flows_rx ???
this.transfer_offset_add += (*io).buffer_size as Clock;
}
this.last_transfer_buffer_offset = offset + size;
let offset = (offset as Clock).wrapping_add(this.transfer_offset_add.0);
let need_samples_until = (offset as Clock).wrapping_add(size as Clock);
//println!("transfer: need samples from {offset} until {need_samples_until}");
let channels_areas = std::slice::from_raw_parts(areas, (*io).channels as usize);
let bits_per_sample = (8 * size_of::<Sample>()) as u32;
for (chi, area) in channels_areas.iter().enumerate() {
let have_samples = this.readable_pos_per_channel[chi as usize].load(Ordering::Acquire);
let diff = (need_samples_until as ClockDiff).wrapping_sub(have_samples as ClockDiff);
if diff > 0 {
// TODO: DRY with plugin_prepare, or use ringbuffer?
let ptr = area.addr.byte_offset((area.first/8) as isize) as *const AtomicSample;
let all_length = ((*io).buffer_size as usize) * channels_areas.len() - ((area.first/bits_per_sample) as usize);
let stride = (area.step/bits_per_sample) as usize;
let samples = slice::from_raw_parts(ptr, all_length);
let ch_len = (*io).buffer_size as usize;
debug_assert_eq!(ch_len, (all_length+stride-1) / stride);
let fill_start = if (have_samples as ClockDiff).wrapping_sub(offset as ClockDiff) < 0 {
offset as Clock
} else {
have_samples as Clock
};
let fill_end = need_samples_until as Clock;
let mut i = fill_start;
//let mut index = fill_start.wrapping_mul(stride);
//let end_index = need_samples_until.wrapping_mul(stride);
if have_samples != 0 {
error!("channel index {chi}: no samples available, filling with silence: {fill_start}..{fill_end}");
}
while i != fill_end {
samples[(i % ch_len).wrapping_mul(stride)].store(0, Ordering::Relaxed);
i = i.wrapping_add(1);
}
// TODO: suboptimal, use something like ring_buffer::for_in_ring
atomic::fence(Ordering::Release);
}
}

//println!("plugin_transfer called, size: {:?}", size);
size as snd_pcm_sframes_t
Expand Down Expand Up @@ -444,12 +498,16 @@ unsafe extern "C" fn plugin_define(pcmp: *mut *mut snd_pcm_t, name: *const c_cha
on_transfer: Box::new(move || {
libc::write(efd, [1u64].as_ptr() as *const c_void, 8);
}),
last_transfer_buffer_offset: 0,
transfer_offset_add: Wrapping(0),
readable_pos_per_channel: Arc::new((0..(if stream==SND_PCM_STREAM_CAPTURE { *rx_channels_count } else { 0 })).map(|_|0.into()).collect_vec()),
}));

let io = &mut (*myio).io;
io.version = (1<<16) | (0<<8) | 2;
io.name = PLUGIN_NAME.as_ptr() as *const _;
io.callback = &(*myio).callbacks;
io.flags = SND_PCM_IOPLUG_FLAG_BOUNDARY_WA;
io.mmap_rw = 1;

// despite ALSA PCM plugin documentation saying that poll_fd is optional,
Expand Down
1 change: 1 addition & 0 deletions inferno_aoip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mio = { version = "1.0.2", features = ["net", "os-ext", "os-poll"] }
bool_vec = "0.2.1"
bytemuck = "1.17.1"
usrvclock = { git = "https://gitlab.com/lumifaza/usrvclock-rs", version = "0.1.0" }
arrayvec = "0.7.6"

[dependencies.serde]
version = "1.0.159"
Expand Down
39 changes: 27 additions & 12 deletions inferno_aoip/src/channels_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
protocol::flows_control::FlowsControlClient,
};

use crate::ring_buffer::{self, ExternalBuffer, ExternalBufferParameters, OwnedBuffer, ProxyToSamplesBuffer, RBInput, RBOutput};
use crate::ring_buffer::{self, ExternalBuffer, ExternalBufferParameters, OwnedBuffer, ProxyToSamplesBuffer, RBInput, RBOutput, RingBufferShared};

use atomic::Atomic;
use futures::{future::join_all, Future, FutureExt};
Expand Down Expand Up @@ -149,8 +149,8 @@ impl ChannelsSubscriber {


pub trait ChannelsBuffering<P: ProxyToSamplesBuffer> {
fn connect_channel(&self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, P>>, channel_index: usize, latency_samples: usize) -> Option<RBInput<Sample, P>>;
fn disconnect_channel(&self, channel_index: usize);
fn connect_channel(&mut self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, P>>, channel_index: usize, latency_samples: usize) -> Option<RBInput<Sample, P>>;
fn disconnect_channel(&mut self, channel_index: usize, flow_index: usize, channel_in_flow: usize, flows_recv: Arc<FlowsReceiver<P>>);
}

pub struct OwnedBuffering {
Expand All @@ -166,7 +166,7 @@ impl OwnedBuffering {
}

impl ChannelsBuffering<OwnedBuffer<Atomic<Sample>>> for OwnedBuffering {
fn connect_channel(&self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, OwnedBuffer<Atomic<Sample>>>>, channel_index: usize, latency_samples: usize) -> Option<RBInput<Sample, OwnedBuffer<Atomic<Sample>>>> {
fn connect_channel(&mut self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, OwnedBuffer<Atomic<Sample>>>>, channel_index: usize, latency_samples: usize) -> Option<RBInput<Sample, OwnedBuffer<Atomic<Sample>>>> {
let (sink, source) = if rb_output.is_none() {
let (sink, source) =
ring_buffer::new_owned::<Sample>(self.buffer_length, start_time as usize, self.hole_fix_wait);
Expand All @@ -181,7 +181,7 @@ impl ChannelsBuffering<OwnedBuffer<Atomic<Sample>>> for OwnedBuffering {
});
sink
}
fn disconnect_channel(&self, channel_index: usize) {
fn disconnect_channel(&mut self, channel_index: usize, _flow_index: usize, _channel_in_flow: usize, _flows_recv: Arc<FlowsReceiver<OwnedBuffer<Atomic<Sample>>>>) {
let sc = self.samples_collector.clone();
tokio::spawn(async move {
sc.disconnect_channel(channel_index).await;
Expand All @@ -192,23 +192,35 @@ impl ChannelsBuffering<OwnedBuffer<Atomic<Sample>>> for OwnedBuffering {
pub struct ExternalBuffering {
channels: Vec<ExternalBufferParameters<Sample>>,
hole_fix_wait: usize,
ring_buffers: Vec<Option<Arc<RingBufferShared<Sample, ExternalBuffer<Atomic<Sample>>>>>>,
//on_disconnect: Vec<Option<Box<dyn FnOnce(Arc<RingBufferShared<Sample, ExternalBuffer<Atomic<Sample>>>>) -> ()>>>,
}

impl ExternalBuffering {
pub fn new(channels: Vec<ExternalBufferParameters<Sample>>, hole_fix_wait: usize) -> Self {
let channels_count = channels.len();
Self {
channels,
hole_fix_wait
hole_fix_wait,
ring_buffers: (0..channels_count).map(|_|None).collect_vec(),
}
}
}

impl ChannelsBuffering<ExternalBuffer<Atomic<Sample>>> for ExternalBuffering {
fn connect_channel(&self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, ExternalBuffer<Atomic<Sample>>>>, channel_index: usize, latency_samples: usize) -> Option<RBInput<Sample, ExternalBuffer<Atomic<Sample>>>> {
debug_assert!(rb_output.is_none());
Some(ring_buffer::wrap_external_sink(&self.channels[channel_index], start_time as usize, self.hole_fix_wait))
fn connect_channel(&mut self, start_time: Clock, rb_output: &mut Option<RBOutput<Sample, ExternalBuffer<Atomic<Sample>>>>, channel_index: usize, _latency_samples: usize) -> Option<RBInput<Sample, ExternalBuffer<Atomic<Sample>>>> {
debug_assert!(rb_output.is_none()); // ringbuffer not cached & not reused in this case
let rb_input = ring_buffer::wrap_external_sink(&self.channels[channel_index], start_time as usize, self.hole_fix_wait);
self.ring_buffers[channel_index] = Some(rb_input.shared().clone());
Some(rb_input)
}
fn disconnect_channel(&self, _channel_index: usize) {
fn disconnect_channel(&mut self, channel_index: usize, flow_index: usize, channel_in_flow: usize, flows_recv: Arc<FlowsReceiver<ExternalBuffer<Atomic<Sample>>>>) {
// TODO: this is a bit illogical, flows_recv.{connect_channel, disconnect_channel} should be in one place
if let Some(ringbuf) = self.ring_buffers[channel_index].take() {
tokio::spawn(async move {
flows_recv.disconnect_channel(flow_index, channel_in_flow, ringbuf).await;
});
}
}
}

Expand Down Expand Up @@ -358,9 +370,9 @@ impl<P: ProxyToSamplesBuffer + Sync + Send + 'static, B: ChannelsBuffering<P>> C
.log_and_forget();
}
pub async fn unsubscribe(&mut self, local_channel_index: usize, remove_from_info: bool) {
self.channels_buffering.disconnect_channel(local_channel_index);
if let Some(subscription) = &self.channels[local_channel_index] {
if let Some(remote) = subscription.remote.as_ref() {
self.channels_buffering.disconnect_channel(local_channel_index, remote.local_flow_index, remote.channel_in_flow, self.flows_recv.clone());
match &mut self.flows.lock().unwrap()[remote.local_flow_index] {
Some(flow) => {
flow.channels_refcount[remote.channel_in_flow] -= 1;
Expand All @@ -375,6 +387,8 @@ impl<P: ProxyToSamplesBuffer + Sync + Send + 'static, B: ChannelsBuffering<P>> C
self.subscriptions_info.write().unwrap()[local_channel_index] = None;
}
self.notify_channels_change([local_channel_index]).await;
} else {
error!("BUG: trying to unsubscribe not subscribed channel index {local_channel_index}");
}
}
pub async fn subscribe(
Expand Down Expand Up @@ -1011,6 +1025,7 @@ impl<P: ProxyToSamplesBuffer + Sync + Send + 'static, B: ChannelsBuffering<P>> C
&& chsub.remote.as_ref().unwrap().local_flow_index == flow_index
{
if remove {
let channel_in_flow = chsub.remote.as_ref().unwrap().channel_in_flow;
chsub.remote = None;
self.needs_resolving = true;
self.resolve_now = true;
Expand All @@ -1023,7 +1038,7 @@ impl<P: ProxyToSamplesBuffer + Sync + Send + 'static, B: ChannelsBuffering<P>> C
subi.status = SubscriptionStatus::Unresolved;
channels_changed.push(chi);
}
self.channels_buffering.disconnect_channel(chi);
self.channels_buffering.disconnect_channel(chi, flow_index, channel_in_flow, self.flows_recv.clone());
} else if receiving {
if let Some(subi) = self.subscriptions_info.write().unwrap()[chi].as_mut()
{
Expand Down
Loading

0 comments on commit 122c760

Please sign in to comment.