From 9ecb53d0aad63d05f1e7aef69c0d26adc117dd90 Mon Sep 17 00:00:00 2001 From: bits0rcerer <25325997+bits0rcerer@users.noreply.github.com> Date: Sat, 11 Jan 2025 21:28:59 +0100 Subject: [PATCH] fix: Fix memory leak for receive buffers (#46) --- CHANGELOG.md | 7 +++ README.md | 9 ---- breakwater/src/connection_buffer.rs | 76 +++++++++++++++++++++++++++++ breakwater/src/main.rs | 1 + breakwater/src/server.rs | 34 +++++-------- breakwater/src/tests.rs | 8 --- 6 files changed, 95 insertions(+), 40 deletions(-) create mode 100644 breakwater/src/connection_buffer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 22e67ac..d7f4cfa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Fixed + +- Receive buffers for pixelflut connections that were leaked before will now be properly deallocated ([#46]) +- Errors while allocating receive buffers for pixelflut connections are now handled properly ([#46]) + +[#46]: https://github.com/sbernauer/breakwater/pull/46 + ## [0.16.2] - 2024-12-30 ### Fixed diff --git a/README.md b/README.md index 21ff203..0cec8c1 100644 --- a/README.md +++ b/README.md @@ -178,15 +178,6 @@ I never used this for a longer time period, so happy about feedback! # Known issues -## Failed to memadvise sequential read access for buffer to kernel - -During high traffic at GPN22 a few warnings where raised as breakwater was not able to memadvise the created buffer to the kernel. - -This *should* not happen, as we allocate the memory area directly before but who knows. -The memadvise is only on a best-effort base, client connections while still be served even if the call fails. -The worst thing that should happen is a minimal performance degradation. -Have a look at [the issue report](https://github.com/sbernauer/breakwater/issues/28) for details. 
- # Performance ## Laptop diff --git a/breakwater/src/connection_buffer.rs b/breakwater/src/connection_buffer.rs new file mode 100644 index 0000000..eb0a6a4 --- /dev/null +++ b/breakwater/src/connection_buffer.rs @@ -0,0 +1,76 @@ +use std::alloc::{self, LayoutError}; + +use log::warn; +use memadvise::{Advice, MemAdviseError}; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to create memory layout"))] + CreateMemoryLayout { + source: LayoutError, + buffer_size: usize, + page_size: usize, + }, + + #[snafu(display("Allocation failed (alloc::alloc returned null ptr) for layout {layout:?}"))] + AllocationFailed { layout: alloc::Layout }, +} + +pub struct ConnectionBuffer { + ptr: *mut u8, + layout: alloc::Layout, +} + +/// Safety: +/// - `ConnectionBuffer` has exclusive ownership of the memory behind `ptr` +/// - safe access through `ConnectionBuffer::as_slice_mut()` +unsafe impl Send for ConnectionBuffer {} + +/// Allocates a memory slice with the specified size, which can be used for client connections. +/// +/// It takes care of de-allocating the memory slice on [`Drop`]. +/// It also `memadvise`s the memory slice, so that the Kernel is aware that we are going to +/// sequentially read it. +impl ConnectionBuffer { + pub fn new(buffer_size: usize) -> Result { + let page_size = page_size::get(); + let layout = alloc::Layout::from_size_align(buffer_size, page_size).context( + CreateMemoryLayoutSnafu { + buffer_size, + page_size, + }, + )?; + + let ptr = unsafe { alloc::alloc(layout) }; + + if ptr.is_null() { + AllocationFailedSnafu { layout }.fail()?; + } + + if let Err(err) = memadvise::advise(ptr as _, layout.size(), Advice::Sequential) { + // [`MemAdviseError`] does not implement Debug... 
+ let err = match err { + MemAdviseError::NullAddress => "NullAddress", + MemAdviseError::InvalidLength => "InvalidLength", + MemAdviseError::UnalignedAddress => "UnalignedAddress", + MemAdviseError::InvalidRange => "InvalidRange", + }; + warn!("Failed to memadvise sequential read access for buffer to kernel. This should not affect any client connections, but might cause some minor performance degradation: {err}"); + } + + Ok(Self { ptr, layout }) + } + + pub fn as_slice_mut(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.ptr, self.layout.size()) } + } +} + +impl Drop for ConnectionBuffer { + fn drop(&mut self) { + unsafe { + alloc::dealloc(self.ptr, self.layout); + } + } +} diff --git a/breakwater/src/main.rs b/breakwater/src/main.rs index f0340bd..7cc88c0 100644 --- a/breakwater/src/main.rs +++ b/breakwater/src/main.rs @@ -25,6 +25,7 @@ use crate::sinks::native_display::NativeDisplaySink; use crate::sinks::vnc::VncSink; mod cli_args; +mod connection_buffer; mod prometheus_exporter; mod server; mod sinks; diff --git a/breakwater/src/server.rs b/breakwater/src/server.rs index 392badf..267081f 100644 --- a/breakwater/src/server.rs +++ b/breakwater/src/server.rs @@ -1,11 +1,10 @@ -use std::alloc; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::{cmp::min, net::IpAddr, sync::Arc, time::Duration}; use breakwater_parser::{FrameBuffer, OriginalParser, Parser}; -use log::{debug, info, warn}; -use memadvise::{Advice, MemAdviseError}; +use log::{debug, info}; +use memadvise::Advice; use snafu::{ResultExt, Snafu}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -14,7 +13,10 @@ use tokio::{ time::Instant, }; -use crate::statistics::StatisticsEvent; +use crate::{ + connection_buffer::{self, ConnectionBuffer}, + statistics::StatisticsEvent, +}; const CONNECTION_DENIED_TEXT: &[u8] = b"Connection denied as connection limit is reached"; @@ -39,6 +41,9 @@ pub enum Error { WriteToStatisticsChannel { source: 
mpsc::error::SendError, }, + + #[snafu(display("Failed to allocate network connection buffer"))] + BufferAllocation { source: connection_buffer::Error }, } pub struct Server { @@ -79,9 +84,6 @@ impl Server { mpsc::unbounded_channel::(); let connection_dropped_tx = self.max_connections_per_ip.map(|_| connection_dropped_tx); - let page_size = page_size::get(); - debug!("System has a page size of {page_size} bytes"); - loop { let (mut socket, socket_addr) = self .listener @@ -132,7 +134,6 @@ impl Server { ip, fb_for_thread, statistics_tx_for_thread, - page_size, network_buffer_size, connection_dropped_tx_clone, ) @@ -147,7 +148,6 @@ pub async fn handle_connection( ip: IpAddr, fb: Arc, statistics_tx: mpsc::Sender, - page_size: usize, network_buffer_size: usize, connection_dropped_tx: Option>, ) -> Result<(), Error> { @@ -158,22 +158,10 @@ pub async fn handle_connection( .await .context(WriteToStatisticsChannelSnafu)?; - let layout = alloc::Layout::from_size_align(network_buffer_size, page_size).unwrap(); - let ptr = unsafe { alloc::alloc(layout) }; - let buffer = unsafe { std::slice::from_raw_parts_mut(ptr, network_buffer_size) }; + let mut recv_buf = ConnectionBuffer::new(network_buffer_size).context(BufferAllocationSnafu)?; + let buffer = recv_buf.as_slice_mut(); let mut response_buf = Vec::new(); - if let Err(err) = memadvise::advise(buffer.as_ptr() as _, buffer.len(), Advice::Sequential) { - // [`MemAdviseError`] does not implement Debug... - let err = match err { - MemAdviseError::NullAddress => "NullAddress", - MemAdviseError::InvalidLength => "InvalidLength", - MemAdviseError::UnalignedAddress => "UnalignedAddress", - MemAdviseError::InvalidRange => "InvalidRange", - }; - warn!("Failed to memadvise sequential read access for buffer to kernel. 
This should not effect any client connections, but might having some minor performance degration: {err}"); - } - // Number bytes left over **on the first bytes of the buffer** from the previous loop iteration let mut leftover_bytes_in_buffer = 0; diff --git a/breakwater/src/tests.rs b/breakwater/src/tests.rs index cc2370b..541b260 100644 --- a/breakwater/src/tests.rs +++ b/breakwater/src/tests.rs @@ -116,7 +116,6 @@ async fn test_safe( fb.clone(), statistics_channel.0, DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -191,7 +190,6 @@ async fn test_drawing_rect( Arc::clone(&fb), statistics_channel.0.clone(), DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -206,7 +204,6 @@ async fn test_drawing_rect( Arc::clone(&fb), statistics_channel.0.clone(), DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -221,7 +218,6 @@ async fn test_drawing_rect( Arc::clone(&fb), statistics_channel.0.clone(), DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -236,7 +232,6 @@ async fn test_drawing_rect( Arc::clone(&fb), statistics_channel.0.clone(), DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -278,7 +273,6 @@ async fn test_binary_set_pixel( fb, statistics_channel.0, DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -452,7 +446,6 @@ async fn test_binary_sync_pixels_larger_than_buffer(fb: Arc fb, statistics_channel().0, DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await @@ -469,7 +462,6 @@ async fn assert_returns(input: &[u8], expected: &str) { fb(), statistics_channel().0, DEFAULT_NETWORK_BUFFER_SIZE, - page_size::get(), None, ) .await