Skip to content

Commit

Permalink
fix: Fix memory leak for receive buffers (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
bits0rcerer authored Jan 11, 2025
1 parent 6accba3 commit 9ecb53d
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 40 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Fixed

- Receive buffers for pixelflut connections that were leaked before will now be properly deallocated ([#46])
- Errors while allocating receive buffers for pixelflut connections are now handled properly ([#46])

[#46]: https://github.com/sbernauer/breakwater/pull/46

## [0.16.2] - 2024-12-30

### Fixed
Expand Down
9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,6 @@ I never used this for a longer time period, so happy about feedback!

# Known issues

## Failed to memadvise sequential read access for buffer to kernel

During high traffic at GPN22 a few warnings where raised as breakwater was not able to memadvise the created buffer to the kernel.

This *should* not happen, as we allocate the memory area directly before but who knows.
The memadvise is only on a best-effort base, client connections while still be served even if the call fails.
The worst thing that should happen is a minimal performance degradation.
Have a look at [the issue report](https://github.com/sbernauer/breakwater/issues/28) for details.

# Performance

## Laptop
Expand Down
76 changes: 76 additions & 0 deletions breakwater/src/connection_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::alloc::{self, LayoutError};

use log::warn;
use memadvise::{Advice, MemAdviseError};
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to create memory layout"))]
CreateMemoryLayout {
source: LayoutError,
buffer_size: usize,
page_size: usize,
},

#[snafu(display("Allocation failed (alloc::alloc returned null ptr) for layout {layout:?}"))]
AllocationFailed { layout: alloc::Layout },
}

pub struct ConnectionBuffer {
ptr: *mut u8,
layout: alloc::Layout,
}

/// Safety:
/// - `ConnectionBuffer` has exclusive ownership of the memory behind `ptr`
/// - safe access through `ConnectionBuffer::as_slice_mut()`
unsafe impl Send for ConnectionBuffer {}

/// Allocates a memory slice with the specified size, which can be used for client connections.
///
/// It takes care of de-allocating the memory slice on [`Drop`].
/// It also `memadvise`s the memory slice, so that the Kernel is aware that we are going to
/// sequentially read it.
impl ConnectionBuffer {
pub fn new(buffer_size: usize) -> Result<Self, Error> {
let page_size = page_size::get();
let layout = alloc::Layout::from_size_align(buffer_size, page_size).context(
CreateMemoryLayoutSnafu {
buffer_size,
page_size,
},
)?;

let ptr = unsafe { alloc::alloc(layout) };

if ptr.is_null() {
AllocationFailedSnafu { layout }.fail()?;
}

if let Err(err) = memadvise::advise(ptr as _, layout.size(), Advice::Sequential) {
// [`MemAdviseError`] does not implement Debug...
let err = match err {
MemAdviseError::NullAddress => "NullAddress",
MemAdviseError::InvalidLength => "InvalidLength",
MemAdviseError::UnalignedAddress => "UnalignedAddress",
MemAdviseError::InvalidRange => "InvalidRange",
};
warn!("Failed to memadvise sequential read access for buffer to kernel. This should not effect any client connections, but might having some minor performance degration: {err}");
}

Ok(Self { ptr, layout })
}

pub fn as_slice_mut(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.layout.size()) }
}
}

impl Drop for ConnectionBuffer {
fn drop(&mut self) {
unsafe {
alloc::dealloc(self.ptr, self.layout);
}
}
}
1 change: 1 addition & 0 deletions breakwater/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::sinks::native_display::NativeDisplaySink;
use crate::sinks::vnc::VncSink;

mod cli_args;
mod connection_buffer;
mod prometheus_exporter;
mod server;
mod sinks;
Expand Down
34 changes: 11 additions & 23 deletions breakwater/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::alloc;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::{cmp::min, net::IpAddr, sync::Arc, time::Duration};

use breakwater_parser::{FrameBuffer, OriginalParser, Parser};
use log::{debug, info, warn};
use memadvise::{Advice, MemAdviseError};
use log::{debug, info};
use memadvise::Advice;
use snafu::{ResultExt, Snafu};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
Expand All @@ -14,7 +13,10 @@ use tokio::{
time::Instant,
};

use crate::statistics::StatisticsEvent;
use crate::{
connection_buffer::{self, ConnectionBuffer},
statistics::StatisticsEvent,
};

const CONNECTION_DENIED_TEXT: &[u8] = b"Connection denied as connection limit is reached";

Expand All @@ -39,6 +41,9 @@ pub enum Error {
WriteToStatisticsChannel {
source: mpsc::error::SendError<StatisticsEvent>,
},

#[snafu(display("Failed to allocate network connection buffer"))]
BufferAllocation { source: connection_buffer::Error },
}

pub struct Server<FB: FrameBuffer> {
Expand Down Expand Up @@ -79,9 +84,6 @@ impl<FB: FrameBuffer + Send + Sync + 'static> Server<FB> {
mpsc::unbounded_channel::<IpAddr>();
let connection_dropped_tx = self.max_connections_per_ip.map(|_| connection_dropped_tx);

let page_size = page_size::get();
debug!("System has a page size of {page_size} bytes");

loop {
let (mut socket, socket_addr) = self
.listener
Expand Down Expand Up @@ -132,7 +134,6 @@ impl<FB: FrameBuffer + Send + Sync + 'static> Server<FB> {
ip,
fb_for_thread,
statistics_tx_for_thread,
page_size,
network_buffer_size,
connection_dropped_tx_clone,
)
Expand All @@ -147,7 +148,6 @@ pub async fn handle_connection<FB: FrameBuffer>(
ip: IpAddr,
fb: Arc<FB>,
statistics_tx: mpsc::Sender<StatisticsEvent>,
page_size: usize,
network_buffer_size: usize,
connection_dropped_tx: Option<mpsc::UnboundedSender<IpAddr>>,
) -> Result<(), Error> {
Expand All @@ -158,22 +158,10 @@ pub async fn handle_connection<FB: FrameBuffer>(
.await
.context(WriteToStatisticsChannelSnafu)?;

let layout = alloc::Layout::from_size_align(network_buffer_size, page_size).unwrap();
let ptr = unsafe { alloc::alloc(layout) };
let buffer = unsafe { std::slice::from_raw_parts_mut(ptr, network_buffer_size) };
let mut recv_buf = ConnectionBuffer::new(network_buffer_size).context(BufferAllocationSnafu)?;
let buffer = recv_buf.as_slice_mut();
let mut response_buf = Vec::new();

if let Err(err) = memadvise::advise(buffer.as_ptr() as _, buffer.len(), Advice::Sequential) {
// [`MemAdviseError`] does not implement Debug...
let err = match err {
MemAdviseError::NullAddress => "NullAddress",
MemAdviseError::InvalidLength => "InvalidLength",
MemAdviseError::UnalignedAddress => "UnalignedAddress",
MemAdviseError::InvalidRange => "InvalidRange",
};
warn!("Failed to memadvise sequential read access for buffer to kernel. This should not effect any client connections, but might having some minor performance degration: {err}");
}

// Number bytes left over **on the first bytes of the buffer** from the previous loop iteration
let mut leftover_bytes_in_buffer = 0;

Expand Down
8 changes: 0 additions & 8 deletions breakwater/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ async fn test_safe<FB: FrameBuffer>(
fb.clone(),
statistics_channel.0,
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand Down Expand Up @@ -191,7 +190,6 @@ async fn test_drawing_rect<FB: FrameBuffer>(
Arc::clone(&fb),
statistics_channel.0.clone(),
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand All @@ -206,7 +204,6 @@ async fn test_drawing_rect<FB: FrameBuffer>(
Arc::clone(&fb),
statistics_channel.0.clone(),
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand All @@ -221,7 +218,6 @@ async fn test_drawing_rect<FB: FrameBuffer>(
Arc::clone(&fb),
statistics_channel.0.clone(),
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand All @@ -236,7 +232,6 @@ async fn test_drawing_rect<FB: FrameBuffer>(
Arc::clone(&fb),
statistics_channel.0.clone(),
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand Down Expand Up @@ -278,7 +273,6 @@ async fn test_binary_set_pixel<FB: FrameBuffer>(
fb,
statistics_channel.0,
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand Down Expand Up @@ -452,7 +446,6 @@ async fn test_binary_sync_pixels_larger_than_buffer<FB: FrameBuffer>(fb: Arc<FB>
fb,
statistics_channel().0,
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand All @@ -469,7 +462,6 @@ async fn assert_returns(input: &[u8], expected: &str) {
fb(),
statistics_channel().0,
DEFAULT_NETWORK_BUFFER_SIZE,
page_size::get(),
None,
)
.await
Expand Down

0 comments on commit 9ecb53d

Please sign in to comment.