diff --git a/breakwater-parser/benches/parsing.rs b/breakwater-parser/benches/parsing.rs index 22153a7..d3f7cb4 100644 --- a/breakwater-parser/benches/parsing.rs +++ b/breakwater-parser/benches/parsing.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; +use std::sync::mpsc::channel; -use breakwater_core::{framebuffer::FrameBuffer, test_helpers::DevNullTcpStream}; +use breakwater_core::{framebuffer::FrameBuffer}; #[cfg(target_arch = "x86_64")] use breakwater_parser::assembler::AssemblerParser; use breakwater_parser::{ @@ -82,6 +83,7 @@ fn invoke_benchmark( c_group.bench_with_input(parse_name, &commands, |b, input| { b.to_async(tokio::runtime::Runtime::new().expect("Failed to start tokio runtime")) .iter(|| async { + let (message_sender, _) = channel(); let mut parser = match parse_name { "original" => { ParserImplementation::Original(OriginalParser::new(fb.clone())) @@ -96,8 +98,7 @@ fn invoke_benchmark( }; parser - .parse(input, DevNullTcpStream::default()) - .await + .parse(input, &message_sender) .expect("Failed to parse commands"); }); }); diff --git a/breakwater-parser/src/assembler.rs b/breakwater-parser/src/assembler.rs index eb1484c..8379926 100644 --- a/breakwater-parser/src/assembler.rs +++ b/breakwater-parser/src/assembler.rs @@ -1,6 +1,5 @@ use std::arch::asm; - -use tokio::io::AsyncWriteExt; +use std::sync::mpsc::Sender; use crate::{Parser, ParserError}; @@ -10,10 +9,10 @@ const PARSER_LOOKAHEAD: usize = "PX 1234 1234 rrggbbaa\n".len(); // Longest poss pub struct AssemblerParser {} impl Parser for AssemblerParser { - async fn parse( + fn parse( &mut self, buffer: &[u8], - _stream: impl AsyncWriteExt + Send + Unpin, + _message_sender: &Sender>, ) -> Result { let mut last_byte_parsed = 0; diff --git a/breakwater-parser/src/lib.rs b/breakwater-parser/src/lib.rs index 797def6..f7c8cb7 100644 --- a/breakwater-parser/src/lib.rs +++ b/breakwater-parser/src/lib.rs @@ -1,9 +1,9 @@ // Needed for simple implementation #![feature(portable_simd)] +use std::sync::mpsc::Sender; use enum_dispatch::enum_dispatch; use snafu::Snafu; -use tokio::io::AsyncWriteExt; #[cfg(target_arch = "x86_64")] pub mod assembler; @@ -14,17 +14,17 @@ pub mod refactored; #[derive(Debug, Snafu)] pub enum ParserError { #[snafu(display("Failed to write to TCP socket"))] - WriteToTcpSocket { source: std::io::Error }, + WriteToTcpSocket { source: std::sync::mpsc::SendError> }, } #[enum_dispatch(ParserImplementation)] // According to https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html #[trait_variant::make(SendParser: Send)] pub trait Parser { - async fn parse( + fn parse( &mut self, buffer: &[u8], - stream: impl AsyncWriteExt + Send + Unpin, + message_sender: &Sender>, ) -> Result; // Sadly this cant be const (yet?) (https://github.com/rust-lang/rust/issues/71971 and https://github.com/rust-lang/rfcs/pull/2632) diff --git a/breakwater-parser/src/memchr.rs b/breakwater-parser/src/memchr.rs index 8cb8a7c..b9f3823 100644 --- a/breakwater-parser/src/memchr.rs +++ b/breakwater-parser/src/memchr.rs @@ -1,7 +1,7 @@ use std::sync::Arc; +use std::sync::mpsc::Sender; use breakwater_core::framebuffer::FrameBuffer; -use tokio::io::AsyncWriteExt; use crate::{Parser, ParserError}; @@ -16,10 +16,10 @@ impl MemchrParser { } impl Parser for MemchrParser { - async fn parse( + fn parse( &mut self, buffer: &[u8], - _stream: impl AsyncWriteExt + Send + Unpin, + _message_sender: &Sender>, ) -> Result { let mut last_char_after_newline = 0; for newline in memchr::memchr_iter(b'\n', buffer) { diff --git a/breakwater-parser/src/original.rs b/breakwater-parser/src/original.rs index 993043c..bd3b667 100644 --- a/breakwater-parser/src/original.rs +++ b/breakwater-parser/src/original.rs @@ -2,9 +2,9 @@ use std::{ simd::{num::SimdUint, u32x8, Simd}, sync::Arc, }; +use std::sync::mpsc::Sender; use breakwater_core::{framebuffer::FrameBuffer, HELP_TEXT}; -use tokio::io::AsyncWriteExt; use crate::{Parser, ParserError}; @@ -32,10 +32,10 @@ impl OriginalParser { } impl Parser for OriginalParser { - async fn parse( + fn parse( &mut self, buffer: &[u8], - mut stream: impl AsyncWriteExt + Send + Unpin, + message_sender: &Sender>, ) -> Result { let mut last_byte_parsed = 0; @@ -130,22 +130,15 @@ impl Parser for OriginalParser { last_byte_parsed = i; i += 1; if let Some(rgb) = self.fb.get(x, y) { - match stream - .write_all( - format!( - "PX {} {} {:06x}\n", - // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted - x - self.connection_x_offset, - y - self.connection_y_offset, - rgb.to_be() >> 8 - ) - .as_bytes(), - ) - .await - { - Ok(_) => (), - Err(_) => continue, - } + message_sender.send( + format!( + "PX {} {} {:06x}\n", + // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted + x - self.connection_x_offset, + y - self.connection_y_offset, + rgb.to_be() >> 8 + ).into_boxed_str().into_boxed_bytes() + ).expect("Failed to write message"); } continue; } @@ -166,22 +159,18 @@ impl Parser for OriginalParser { i += 4; last_byte_parsed = i - 1; - stream - .write_all( + message_sender.send( format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()) - .as_bytes(), + .into_boxed_str().into_boxed_bytes() ) - .await - .expect("Failed to write bytes to tcp socket"); + .expect("Failed to write message"); continue; } else if current_command & 0xffff_ffff == HELP_PATTERN { i += 4; last_byte_parsed = i - 1; - stream - .write_all(HELP_TEXT) - .await - .expect("Failed to write bytes to tcp socket"); + message_sender.send(HELP_TEXT.into()) + .expect("Failed to write message"); continue; } diff --git a/breakwater-parser/src/refactored.rs b/breakwater-parser/src/refactored.rs index ced033c..267258c 100644 --- a/breakwater-parser/src/refactored.rs +++ b/breakwater-parser/src/refactored.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use breakwater_core::{framebuffer::FrameBuffer, HELP_TEXT}; use snafu::ResultExt; -use tokio::io::AsyncWriteExt; +use std::sync::mpsc::Sender; use crate::{ original::{ @@ -29,11 +29,11 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_pixel( + fn handle_pixel( &self, buffer: &[u8], mut idx: usize, - stream: &mut (impl AsyncWriteExt + Send + Unpin), + message_sender: &Sender>, ) -> Result<(usize, usize), ParserError> { let previous = idx; idx += 3; @@ -75,7 +75,7 @@ impl RefactoredParser { // End of command to read Pixel value else if unsafe { *buffer.get_unchecked(idx) } == b'\n' { idx += 1; - self.handle_get_pixel(stream, x, y).await?; + self.handle_get_pixel(message_sender, x, y)?; Ok((idx, idx)) } else { Ok((idx, previous)) @@ -97,27 +97,25 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_size( + fn handle_size( &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), + message_sender: &Sender>, ) -> Result<(), ParserError> { - stream - .write_all( - format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).as_bytes(), + message_sender + .send( + format!("SIZE {} {}\n", self.fb.get_width(), self.fb.get_height()).into_boxed_str().into_boxed_bytes(), ) - .await .context(crate::WriteToTcpSocketSnafu)?; Ok(()) } #[inline(always)] - async fn handle_help( + fn handle_help( &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), + message_sender: &Sender>, ) -> Result<(), ParserError> { - stream - .write_all(HELP_TEXT) - .await + message_sender + .send(HELP_TEXT.into()) .context(crate::WriteToTcpSocketSnafu)?; Ok(()) } @@ -173,15 +171,15 @@ impl RefactoredParser { } #[inline(always)] - async fn handle_get_pixel( + fn handle_get_pixel( &self, - stream: &mut (impl AsyncWriteExt + Send + Unpin), + message_sender: &Sender>, x: usize, y: usize, ) -> Result<(), ParserError> { if let Some(rgb) = self.fb.get(x, y) { - stream - .write_all( + message_sender + .send( format!( "PX {} {} {:06x}\n", // We don't want to return the actual (absolute) coordinates, the client should also get the result offseted @@ -189,9 +187,8 @@ impl RefactoredParser { y - self.connection_y_offset, rgb.to_be() >> 8 ) - .as_bytes(), + .into_boxed_str().into_boxed_bytes(), ) - .await .context(crate::WriteToTcpSocketSnafu)?; } Ok(()) @@ -199,10 +196,10 @@ impl RefactoredParser { } impl Parser for RefactoredParser { - async fn parse( + fn parse( &mut self, buffer: &[u8], - mut stream: impl AsyncWriteExt + Send + Unpin, + message_sender: &Sender>, ) -> Result { let mut last_byte_parsed = 0; @@ -213,7 +210,7 @@ impl Parser for RefactoredParser { let current_command = unsafe { (buffer.as_ptr().add(i) as *const u64).read_unaligned() }; if current_command & 0x00ff_ffff == PX_PATTERN { - (i, last_byte_parsed) = self.handle_pixel(buffer, i, &mut stream).await?; + (i, last_byte_parsed) = self.handle_pixel(buffer, i, message_sender)?; } else if current_command & 0x00ff_ffff_ffff_ffff == OFFSET_PATTERN { i += 7; self.handle_offset(&mut i, buffer); @@ -221,11 +218,11 @@ impl Parser for RefactoredParser { } else if current_command & 0xffff_ffff == SIZE_PATTERN { i += 4; last_byte_parsed = i; - self.handle_size(&mut stream).await?; + self.handle_size(message_sender)?; } else if current_command & 0xffff_ffff == HELP_PATTERN { i += 4; last_byte_parsed = i; - self.handle_help(&mut stream).await?; + self.handle_help(message_sender)?; } else { i += 1; } diff --git a/breakwater/src/server.rs b/breakwater/src/server.rs index c4c3cb1..9f36281 100644 --- a/breakwater/src/server.rs +++ b/breakwater/src/server.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::{cmp::min, net::IpAddr, sync::Arc, time::Duration}; +use std::sync::mpsc::channel; use breakwater_core::framebuffer::FrameBuffer; use breakwater_parser::{original::OriginalParser, Parser, ParserError}; @@ -155,6 +156,8 @@ pub async fn handle_connection( // Instead we bulk the statistics and send them pre-aggregated. let mut last_statistics = Instant::now(); let mut statistics_bytes_read: u64 = 0; + + let (message_sender, message_receiver) = channel::>(); loop { // Fill the buffer up with new data from the socket @@ -204,8 +207,7 @@ pub async fn handle_connection( } let last_byte_parsed = parser - .parse(&buffer[..data_end + parser_lookahead], &mut stream) - .await + .parse(&buffer[..data_end + parser_lookahead], &message_sender) .context(ParsePixelflutCommandsSnafu)?; // IMPORTANT: We have to subtract 1 here, as e.g. we have "PX 0 0\n" data_end is 7 and parser_state.last_byte_parsed is 6. @@ -223,6 +225,10 @@ pub async fn handle_connection( 0, ); } + + while let Ok(message) = message_receiver.try_recv() { + stream.write_all(message.as_ref()).await.expect("Failed to write to tcp stream") + } } }