From 7c4b445603a2b3fc726c94c189792ff463ab12f2 Mon Sep 17 00:00:00 2001 From: Ferran Maura Date: Mon, 4 Mar 2024 20:21:15 +0100 Subject: [PATCH 1/3] Fixed bitrate calculation at client for Statistics/plots and Adaptive bitrate --- alvr/client_core/src/connection.rs | 16 ++++++++- alvr/client_core/src/statistics.rs | 5 +++ alvr/packets/src/lib.rs | 1 + alvr/server/src/bitrate.rs | 3 +- alvr/server/src/connection.rs | 2 ++ alvr/server/src/statistics.rs | 8 +---- alvr/sockets/src/stream_socket.rs | 52 +++++++++++++++++++++++++++++- 7 files changed, 77 insertions(+), 10 deletions(-) diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index 9884b26f27..eb1d26f55e 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -271,6 +271,7 @@ fn connection_pipeline(capabilities: ClientCapabilities) -> ConResult { stream_socket.subscribe_to_stream::(HAPTICS, MAX_UNREAD_PACKETS); let statistics_sender = stream_socket.request_stream(STATISTICS); + let mut actual_throughput_inseconds: f32 = 30_000_000.0; let video_receive_thread = thread::spawn(move || { let mut stream_corrupted = false; while is_streaming() { @@ -283,8 +284,21 @@ fn connection_pipeline(capabilities: ClientCapabilities) -> ConResult { return; }; + let dt_throughput = data.get_throughput_timediff(); + + if dt_throughput != Duration::ZERO { // TODO: Assuming UDP for 42.0, for TCP it would be 54.0 instead. This loses a bit of accuracy for TCP (it's still a good estimate) but I could need to import session settings in the future + + let data_including_headers = (data.get_size_frame_bytes()) as f32 + 42.0 *(data.get_shards_in_frame() as f32 ); // UDP and IPv4 headers count as bytes for throughput too + actual_throughput_inseconds = data_including_headers * 8.0 / dt_throughput.as_secs_f32(); // bitrate for encoder is in bits per second, here we had bytes; so we need to multiply by 8 the data size + } + else{ + actual_throughput_inseconds = 0.0; + } + + if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { - stats.report_video_packet_received(header.timestamp); + stats.report_video_packet_received(header.timestamp ); + stats.report_throughput_client(header.timestamp, actual_throughput_inseconds) } if header.is_idr { diff --git a/alvr/client_core/src/statistics.rs b/alvr/client_core/src/statistics.rs index af9791d06e..bf855291d7 100644 --- a/alvr/client_core/src/statistics.rs +++ b/alvr/client_core/src/statistics.rs @@ -94,6 +94,11 @@ impl StatisticsManager { } } + pub fn report_throughput_client(&mut self, target_timestamp: Duration, throughput_client: f32){ + if let Some(frame) = self.history_buffer.iter_mut().find(|frame| frame.client_stats.target_timestamp == target_timestamp) { + frame.client_stats.throughput_client = throughput_client; + } + } // vsync_queue is the latency between this call and the vsync. 
it cannot be measured by ALVR and // should be reported by the VR runtime pub fn report_submit(&mut self, target_timestamp: Duration, vsync_queue: Duration) { diff --git a/alvr/packets/src/lib.rs b/alvr/packets/src/lib.rs index c002a74624..f11c7d5fe9 100644 --- a/alvr/packets/src/lib.rs +++ b/alvr/packets/src/lib.rs @@ -314,6 +314,7 @@ pub struct ClientStatistics { pub rendering: Duration, pub vsync_queue: Duration, pub total_pipeline_latency: Duration, + pub throughput_client: f32, } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/alvr/server/src/bitrate.rs b/alvr/server/src/bitrate.rs index 2d1889d408..dcf461317f 100644 --- a/alvr/server/src/bitrate.rs +++ b/alvr/server/src/bitrate.rs @@ -99,6 +99,7 @@ impl BitrateManager { timestamp: Duration, network_latency: Duration, decoder_latency: Duration, + throughput_reported_client: f32, ) { if network_latency.is_zero() { return; @@ -109,7 +110,7 @@ impl BitrateManager { while let Some(&(timestamp_, size_bits)) = self.packet_sizes_bits_history.front() { if timestamp_ == timestamp { self.bitrate_average - .submit_sample(size_bits as f32 / network_latency.as_secs_f32()); + .submit_sample(throughput_reported_client); self.packet_sizes_bits_history.pop_front(); diff --git a/alvr/server/src/connection.rs b/alvr/server/src/connection.rs index 89e3dfb3bb..ea8f97ca82 100644 --- a/alvr/server/src/connection.rs +++ b/alvr/server/src/connection.rs @@ -1015,6 +1015,7 @@ fn connection_pipeline( if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { let timestamp = client_stats.target_timestamp; let decoder_latency = client_stats.video_decode; + let throughput_client: f32 = client_stats.throughput_client; let network_latency = stats.report_statistics(client_stats); let server_data_lock = SERVER_DATA_MANAGER.read(); @@ -1023,6 +1024,7 @@ fn connection_pipeline( timestamp, network_latency, decoder_latency, + throughput_client, ); } } diff --git a/alvr/server/src/statistics.rs b/alvr/server/src/statistics.rs index 879f6efa06..ad66f472a7 100644 --- a/alvr/server/src/statistics.rs +++ b/alvr/server/src/statistics.rs @@ -271,13 +271,7 @@ impl StatisticsManager { self.packets_lost_partial_sum = 0; } - // While not accurate, this prevents NaNs and zeros that would cause a crash or pollute - // the graph - let bitrate_bps = if network_latency != Duration::ZERO { - frame.video_packet_bytes as f32 * 8.0 / network_latency.as_secs_f32() - } else { - 0.0 - }; + let bitrate_bps = client_stats.throughput_client; // todo: use target timestamp in nanoseconds. the dashboard needs to use the first // timestamp as the graph time origin. 
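For reference, the client-side estimate that ends up in the `throughput_client` field reduces to the following arithmetic. This is a hypothetical standalone sketch, not code from the patch: the function name and constant are illustrative, and the 42-byte per-shard overhead assumes the UDP case mentioned in the TODO comment (TCP would be roughly 54 bytes instead).

```rust
use std::time::Duration;

/// Per-shard network header overhead assumed for UDP over IPv4
/// (the patch's TODO notes TCP would be ~54 bytes instead).
const UDP_HEADER_OVERHEAD_BYTES: f32 = 42.0;

/// Estimated throughput in bits per second for one received video frame,
/// or 0.0 when no valid inter-frame interval is available yet.
fn estimate_throughput_bps(frame_bytes: usize, shards_in_frame: usize, dt: Duration) -> f32 {
    if dt.is_zero() {
        return 0.0;
    }
    // The headers of every shard also crossed the network, so they count towards throughput.
    let bytes_on_wire = frame_bytes as f32 + UDP_HEADER_OVERHEAD_BYTES * shards_in_frame as f32;
    // The encoder bitrate is expressed in bits per second, hence the factor of 8.
    bytes_on_wire * 8.0 / dt.as_secs_f32()
}
```

This value, reported by the client, is what the server now submits to the bitrate average in place of the old `size_bits / network_latency` sample.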
diff --git a/alvr/sockets/src/stream_socket.rs b/alvr/sockets/src/stream_socket.rs index ce882fc47d..814bb2fe57 100644 --- a/alvr/sockets/src/stream_socket.rs +++ b/alvr/sockets/src/stream_socket.rs @@ -30,6 +30,7 @@ use std::{ net::{IpAddr, TcpListener, UdpSocket}, sync::{mpsc, Arc}, time::Duration, + time::Instant, }; const SHARD_PREFIX_SIZE: usize = mem::size_of::() // packet length - field itself (4 bytes) @@ -169,13 +170,24 @@ pub struct ReceiverData { used_buffer_queue: mpsc::Sender>, had_packet_loss: bool, _phantom: PhantomData, + throughput_time_diff_frame: Duration, + shards_in_frame: usize, } impl ReceiverData { pub fn had_packet_loss(&self) -> bool { self.had_packet_loss } -} + + pub fn get_throughput_timediff(&self) -> Duration { + self.throughput_time_diff_frame + } + pub fn get_size_frame_bytes(&self) -> usize{ + self.size + } + pub fn get_shards_in_frame(&self) -> usize{ + self.shards_in_frame + }} impl ReceiverData { pub fn get(&self) -> Result<(H, &[u8])> { @@ -202,8 +214,12 @@ struct ReconstructedPacket { index: u32, buffer: Vec, size: usize, // contains prefix + throughput_diff_frame: Duration, + shards_in_frame: usize, } +pub const VIDEO_ID: u16 = 3; + pub struct StreamReceiver { packet_receiver: mpsc::Receiver, used_buffer_queue: mpsc::Sender>, @@ -257,6 +273,8 @@ impl StreamReceiver { used_buffer_queue: self.used_buffer_queue.clone(), had_packet_loss, _phantom: PhantomData, + throughput_time_diff_frame: packet.throughput_diff_frame, + shards_in_frame: packet.shards_in_frame, }) } } @@ -323,6 +341,10 @@ impl StreamSocketBuilder { receive_socket, shard_recv_state: None, stream_recv_components: HashMap::new(), + first_shard_frame_rx: Some(Instant::now()), + last_shard_frame_rx: Some(Instant::now()), + throughput_time_diff: Some(Duration::from_millis(100)), + last_new_frame_id: 0, }) } @@ -368,6 +390,10 @@ impl StreamSocketBuilder { receive_socket, shard_recv_state: None, stream_recv_components: HashMap::new(), + first_shard_frame_rx: Some(Instant::now()), + last_shard_frame_rx: Some(Instant::now()), + throughput_time_diff: Some(Duration::from_millis(100)), + last_new_frame_id: 0, }) } } @@ -405,6 +431,11 @@ pub struct StreamSocket { receive_socket: Box, shard_recv_state: Option, stream_recv_components: HashMap, + first_shard_frame_rx: Option, + last_shard_frame_rx: Option, + throughput_time_diff: Option, + last_new_frame_id: u32, + } impl StreamSocket { @@ -475,6 +506,21 @@ impl StreamSocket { let packet_index = u32::from_be_bytes(bytes[6..10].try_into().unwrap()); let shards_count = u32::from_be_bytes(bytes[10..14].try_into().unwrap()) as usize; let shard_index = u32::from_be_bytes(bytes[14..18].try_into().unwrap()) as usize; + + if stream_id == VIDEO_ID { + if packet_index != self.last_new_frame_id { + self.last_new_frame_id = packet_index; // only if we receive a new frame + if shard_index == 0 { + self.first_shard_frame_rx = Some(Instant::now()) ; + } + } + else if shard_index == shards_count - 1 { // if it's the same frame as in first_shard_frame_rx and we haven't received a new frame yet, compute time difference + self.throughput_time_diff = Some( Instant::now().saturating_duration_since( self.last_shard_frame_rx.unwrap_or_else( + || Instant::now() - Duration::from_millis(555))) ); + self.last_shard_frame_rx = Some(Instant::now()); + } + } + self.shard_recv_state.insert(RecvState { shard_length, @@ -593,6 +639,7 @@ impl StreamSocket { // Check if packet is complete and send if in_progress_packet.received_shard_indices.len() == shard_recv_state_mut.shards_count { 
let size = in_progress_packet.buffer_length; + let get_shards_in_frame = shard_recv_state_mut.shards_count; components .packet_queue .send(ReconstructedPacket { @@ -603,6 +650,9 @@ impl StreamSocket { .unwrap() .buffer, size, + throughput_diff_frame: self.throughput_time_diff.unwrap_or(Duration::from_millis(100)), + shards_in_frame: get_shards_in_frame, + }) .ok(); From f41fee1666d250b9eef3b4df2b016542120fa8e2 Mon Sep 17 00:00:00 2001 From: Ferran Maura Date: Mon, 4 Mar 2024 20:38:22 +0100 Subject: [PATCH 2/3] formatted code --- alvr/client_core/src/connection.rs | 19 ++++---- alvr/client_core/src/statistics.rs | 10 +++-- alvr/packets/src/lib.rs | 2 +- alvr/server/src/bitrate.rs | 2 +- alvr/server/src/connection.rs | 4 +- alvr/server/src/statistics.rs | 2 +- alvr/sockets/src/stream_socket.rs | 70 ++++++++++++++++-------------- 7 files changed, 59 insertions(+), 50 deletions(-) diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index eb1d26f55e..3d164ea906 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -286,18 +286,19 @@ fn connection_pipeline(capabilities: ClientCapabilities) -> ConResult { let dt_throughput = data.get_throughput_timediff(); - if dt_throughput != Duration::ZERO { // TODO: Assuming UDP for 42.0, for TCP it would be 54.0 instead. This loses a bit of accuracy for TCP (it's still a good estimate) but I could need to import session settings in the future - - let data_including_headers = (data.get_size_frame_bytes()) as f32 + 42.0 *(data.get_shards_in_frame() as f32 ); // UDP and IPv4 headers count as bytes for throughput too - actual_throughput_inseconds = data_including_headers * 8.0 / dt_throughput.as_secs_f32(); // bitrate for encoder is in bits per second, here we had bytes; so we need to multiply by 8 the data size - } - else{ - actual_throughput_inseconds = 0.0; - } + if dt_throughput != Duration::ZERO { + // TODO: Assuming UDP for 42.0, for TCP it would be 54.0 instead. 
This loses a bit of accuracy for TCP (it's still a good estimate) but I could need to import session settings in the future + let data_including_headers = (data.get_size_frame_bytes()) as f32 + + 42.0 * (data.get_shards_in_frame() as f32); // UDP and IPv4 headers count as bytes for throughput too + actual_throughput_inseconds = + data_including_headers * 8.0 / dt_throughput.as_secs_f32(); // bitrate for encoder is in bits per second, here we had bytes; so we need to multiply by 8 the data size + } else { + actual_throughput_inseconds = 0.0; + } if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { - stats.report_video_packet_received(header.timestamp ); + stats.report_video_packet_received(header.timestamp); stats.report_throughput_client(header.timestamp, actual_throughput_inseconds) } diff --git a/alvr/client_core/src/statistics.rs b/alvr/client_core/src/statistics.rs index bf855291d7..e13ff50083 100644 --- a/alvr/client_core/src/statistics.rs +++ b/alvr/client_core/src/statistics.rs @@ -94,9 +94,13 @@ impl StatisticsManager { } } - pub fn report_throughput_client(&mut self, target_timestamp: Duration, throughput_client: f32){ - if let Some(frame) = self.history_buffer.iter_mut().find(|frame| frame.client_stats.target_timestamp == target_timestamp) { - frame.client_stats.throughput_client = throughput_client; + pub fn report_throughput_client(&mut self, target_timestamp: Duration, throughput_client: f32) { + if let Some(frame) = self + .history_buffer + .iter_mut() + .find(|frame| frame.client_stats.target_timestamp == target_timestamp) + { + frame.client_stats.throughput_client = throughput_client; } } // vsync_queue is the latency between this call and the vsync. it cannot be measured by ALVR and diff --git a/alvr/packets/src/lib.rs b/alvr/packets/src/lib.rs index f11c7d5fe9..3ea95690be 100644 --- a/alvr/packets/src/lib.rs +++ b/alvr/packets/src/lib.rs @@ -314,7 +314,7 @@ pub struct ClientStatistics { pub rendering: Duration, pub vsync_queue: Duration, pub total_pipeline_latency: Duration, - pub throughput_client: f32, + pub throughput_client: f32, } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/alvr/server/src/bitrate.rs b/alvr/server/src/bitrate.rs index dcf461317f..c1a93202e8 100644 --- a/alvr/server/src/bitrate.rs +++ b/alvr/server/src/bitrate.rs @@ -99,7 +99,7 @@ impl BitrateManager { timestamp: Duration, network_latency: Duration, decoder_latency: Duration, - throughput_reported_client: f32, + throughput_reported_client: f32, ) { if network_latency.is_zero() { return; diff --git a/alvr/server/src/connection.rs b/alvr/server/src/connection.rs index ea8f97ca82..3af8b0868c 100644 --- a/alvr/server/src/connection.rs +++ b/alvr/server/src/connection.rs @@ -1015,7 +1015,7 @@ fn connection_pipeline( if let Some(stats) = &mut *STATISTICS_MANAGER.lock() { let timestamp = client_stats.target_timestamp; let decoder_latency = client_stats.video_decode; - let throughput_client: f32 = client_stats.throughput_client; + let throughput_client: f32 = client_stats.throughput_client; let network_latency = stats.report_statistics(client_stats); let server_data_lock = SERVER_DATA_MANAGER.read(); @@ -1024,7 +1024,7 @@ fn connection_pipeline( timestamp, network_latency, decoder_latency, - throughput_client, + throughput_client, ); } } diff --git a/alvr/server/src/statistics.rs b/alvr/server/src/statistics.rs index ad66f472a7..53a1d633a8 100644 --- a/alvr/server/src/statistics.rs +++ b/alvr/server/src/statistics.rs @@ -271,7 +271,7 @@ impl StatisticsManager { self.packets_lost_partial_sum 
= 0; } - let bitrate_bps = client_stats.throughput_client; + let bitrate_bps = client_stats.throughput_client; // todo: use target timestamp in nanoseconds. the dashboard needs to use the first // timestamp as the graph time origin. diff --git a/alvr/sockets/src/stream_socket.rs b/alvr/sockets/src/stream_socket.rs index 814bb2fe57..f9a271cb9c 100644 --- a/alvr/sockets/src/stream_socket.rs +++ b/alvr/sockets/src/stream_socket.rs @@ -30,7 +30,7 @@ use std::{ net::{IpAddr, TcpListener, UdpSocket}, sync::{mpsc, Arc}, time::Duration, - time::Instant, + time::Instant, }; const SHARD_PREFIX_SIZE: usize = mem::size_of::() // packet length - field itself (4 bytes) @@ -170,8 +170,8 @@ pub struct ReceiverData { used_buffer_queue: mpsc::Sender>, had_packet_loss: bool, _phantom: PhantomData, - throughput_time_diff_frame: Duration, - shards_in_frame: usize, + throughput_time_diff_frame: Duration, + shards_in_frame: usize, } impl ReceiverData { @@ -182,12 +182,13 @@ impl ReceiverData { pub fn get_throughput_timediff(&self) -> Duration { self.throughput_time_diff_frame } - pub fn get_size_frame_bytes(&self) -> usize{ + pub fn get_size_frame_bytes(&self) -> usize { self.size } - pub fn get_shards_in_frame(&self) -> usize{ - self.shards_in_frame - }} + pub fn get_shards_in_frame(&self) -> usize { + self.shards_in_frame + } +} impl ReceiverData { pub fn get(&self) -> Result<(H, &[u8])> { @@ -214,8 +215,8 @@ struct ReconstructedPacket { index: u32, buffer: Vec, size: usize, // contains prefix - throughput_diff_frame: Duration, - shards_in_frame: usize, + throughput_diff_frame: Duration, + shards_in_frame: usize, } pub const VIDEO_ID: u16 = 3; @@ -274,7 +275,7 @@ impl StreamReceiver { had_packet_loss, _phantom: PhantomData, throughput_time_diff_frame: packet.throughput_diff_frame, - shards_in_frame: packet.shards_in_frame, + shards_in_frame: packet.shards_in_frame, }) } } @@ -341,10 +342,10 @@ impl StreamSocketBuilder { receive_socket, shard_recv_state: None, stream_recv_components: HashMap::new(), - first_shard_frame_rx: Some(Instant::now()), - last_shard_frame_rx: Some(Instant::now()), - throughput_time_diff: Some(Duration::from_millis(100)), - last_new_frame_id: 0, + first_shard_frame_rx: Some(Instant::now()), + last_shard_frame_rx: Some(Instant::now()), + throughput_time_diff: Some(Duration::from_millis(100)), + last_new_frame_id: 0, }) } @@ -390,10 +391,10 @@ impl StreamSocketBuilder { receive_socket, shard_recv_state: None, stream_recv_components: HashMap::new(), - first_shard_frame_rx: Some(Instant::now()), - last_shard_frame_rx: Some(Instant::now()), - throughput_time_diff: Some(Duration::from_millis(100)), - last_new_frame_id: 0, + first_shard_frame_rx: Some(Instant::now()), + last_shard_frame_rx: Some(Instant::now()), + throughput_time_diff: Some(Duration::from_millis(100)), + last_new_frame_id: 0, }) } } @@ -431,11 +432,10 @@ pub struct StreamSocket { receive_socket: Box, shard_recv_state: Option, stream_recv_components: HashMap, - first_shard_frame_rx: Option, + first_shard_frame_rx: Option, last_shard_frame_rx: Option, throughput_time_diff: Option, last_new_frame_id: u32, - } impl StreamSocket { @@ -506,22 +506,25 @@ impl StreamSocket { let packet_index = u32::from_be_bytes(bytes[6..10].try_into().unwrap()); let shards_count = u32::from_be_bytes(bytes[10..14].try_into().unwrap()) as usize; let shard_index = u32::from_be_bytes(bytes[14..18].try_into().unwrap()) as usize; - + if stream_id == VIDEO_ID { - if packet_index != self.last_new_frame_id { - self.last_new_frame_id = packet_index; // only if 
we receive a new frame + if packet_index != self.last_new_frame_id { + self.last_new_frame_id = packet_index; // only if we receive a new frame if shard_index == 0 { - self.first_shard_frame_rx = Some(Instant::now()) ; + self.first_shard_frame_rx = Some(Instant::now()); } + } else if shard_index == shards_count - 1 { + // if it's the same frame as in first_shard_frame_rx and we haven't received a new frame yet, compute time difference + self.throughput_time_diff = Some( + Instant::now().saturating_duration_since( + self.last_shard_frame_rx + .unwrap_or_else(|| Instant::now() - Duration::from_millis(555)), + ), + ); + self.last_shard_frame_rx = Some(Instant::now()); } - else if shard_index == shards_count - 1 { // if it's the same frame as in first_shard_frame_rx and we haven't received a new frame yet, compute time difference - self.throughput_time_diff = Some( Instant::now().saturating_duration_since( self.last_shard_frame_rx.unwrap_or_else( - || Instant::now() - Duration::from_millis(555))) ); - self.last_shard_frame_rx = Some(Instant::now()); - } } - self.shard_recv_state.insert(RecvState { shard_length, stream_id, @@ -639,7 +642,7 @@ impl StreamSocket { // Check if packet is complete and send if in_progress_packet.received_shard_indices.len() == shard_recv_state_mut.shards_count { let size = in_progress_packet.buffer_length; - let get_shards_in_frame = shard_recv_state_mut.shards_count; + let get_shards_in_frame = shard_recv_state_mut.shards_count; components .packet_queue .send(ReconstructedPacket { @@ -650,9 +653,10 @@ impl StreamSocket { .unwrap() .buffer, size, - throughput_diff_frame: self.throughput_time_diff.unwrap_or(Duration::from_millis(100)), + throughput_diff_frame: self + .throughput_time_diff + .unwrap_or(Duration::from_millis(100)), shards_in_frame: get_shards_in_frame, - }) .ok(); From b4d2a685f32d09fd8bc244ff113f995edf75706e Mon Sep 17 00:00:00 2001 From: Ferran Maura Date: Tue, 5 Mar 2024 10:25:02 +0100 Subject: [PATCH 3/3] added wiki explanation --- wiki/How-ALVR-works.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wiki/How-ALVR-works.md b/wiki/How-ALVR-works.md index 347bbbd65d..a709d06809 100644 --- a/wiki/How-ALVR-works.md +++ b/wiki/How-ALVR-works.md @@ -274,6 +274,10 @@ The specific packet format used over the network is not clearly defined since AL Since the amount of data streamed is large, the socket buffer size is increased both on the driver side and on the client. +Previous versions of ALVR made an estimation at the server of the network throughput that the client receives (from video), by dividing the size in bits of each frame between the network latency. Unfortunately, some secondary issues came from this due to noisiness and undeterministic changes in the RTT. These problems affected mainly the effective bitrate of the video stream, and could cause huge drops in quality when network latency increased. + +A change has been made so that at the stream_socket level for video streaming, the client keeps in memory the Instant of receiving the last shard of a frame (using shard_index field in header). In this way, when a newer frame is received the time difference at client level can be computed. This time difference is then used at a higher level on client_core/src/connection.rs to compute an estimation of the throughput received by the client at each frame, also taking into account the individual shard headers. 
This value is passed through ClientStatistics to the server, and in server/src/connection.rs the client throughput is finally fed to the BitrateManager, where the reported client value is submitted to the bitrate average.

## SteamVR driver

The driver is the component responsible for most of the streamer functionality. It is implemented as a shared library loaded by SteamVR. It implements the [OpenVR API](https://github.com/ValveSoftware/openvr) in order to interface with SteamVR.
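The per-frame interval measurement described in the wiki addition above can be sketched as follows. This is a simplified, hypothetical model rather than the code in stream_socket.rs: the real patch stores the `Instant` fields directly on `StreamSocket` and gates the update on the video stream id, the packet index, and `shard_index == shards_count - 1`, while this sketch shows only the core bookkeeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified model of the timing bookkeeping; the field and
/// method names are illustrative and do not appear in stream_socket.rs.
struct FrameIntervalTracker {
    /// When the last shard of the previous video frame was received.
    last_frame_completed_at: Option<Instant>,
}

impl FrameIntervalTracker {
    fn new() -> Self {
        Self { last_frame_completed_at: None }
    }

    /// Call when the final shard of a video frame arrives. Returns the time elapsed
    /// since the previous frame finished arriving, or None for the very first frame.
    fn on_frame_completed(&mut self) -> Option<Duration> {
        let now = Instant::now();
        let interval = self
            .last_frame_completed_at
            .map(|prev| now.saturating_duration_since(prev));
        self.last_frame_completed_at = Some(now);
        interval
    }
}
```

The returned interval is what connection.rs divides the frame size (plus per-shard header overhead) by to obtain the throughput estimate reported back to the server.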