diff --git a/CHANGELOG.md b/CHANGELOG.md index 59acdce..fbd0cbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [1.8.2] - 2025-02-08 +* Better handling for delay reset queue * Fix connection level window size handling ## [1.8.1] - 2025-01-31 diff --git a/src/config.rs b/src/config.rs index 76f587b..5d50410 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,14 +19,12 @@ pub struct Config(pub(crate) Rc); /// Http2 connection configuration pub(crate) struct ConfigInner { + pub(crate) settings: Cell, /// Initial window size of locally initiated streams pub(crate) window_sz: Cell, pub(crate) window_sz_threshold: Cell, /// How long a locally reset stream should ignore frames pub(crate) reset_duration: Cell, - /// Maximum number of locally reset streams to keep at a time - pub(crate) reset_max: Cell, - pub(crate) settings: Cell, /// Initial window size for new connections. pub(crate) connection_window_sz: Cell, pub(crate) connection_window_sz_threshold: Cell, @@ -91,7 +89,6 @@ impl Config { connection_window_sz_threshold, dispatcher_config, settings: Cell::new(settings), - reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX), reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()), remote_max_concurrent_streams: Cell::new(None), max_header_continuations: Cell::new(consts::DEFAULT_MAX_COUNTINUATIONS), @@ -236,8 +233,9 @@ impl Config { /// error, forcing the connection to terminate. /// /// The default value is 30. - pub fn max_concurrent_reset_streams(&self, max: usize) -> &Self { - self.0.reset_max.set(max); + #[doc(hidden)] + #[deprecated] + pub fn max_concurrent_reset_streams(&self, _: usize) -> &Self { self } @@ -357,7 +355,6 @@ impl fmt::Debug for Config { .field("window_sz", &self.0.window_sz.get()) .field("window_sz_threshold", &self.0.window_sz_threshold.get()) .field("reset_duration", &self.0.reset_duration.get()) - .field("reset_max", &self.0.reset_max.get()) .field("connection_window_sz", &self.0.connection_window_sz.get()) .field( "connection_window_sz_threshold", @@ -378,7 +375,6 @@ impl fmt::Debug for ConfigInner { .field("window_sz", &self.window_sz.get()) .field("window_sz_threshold", &self.window_sz_threshold.get()) .field("reset_duration", &self.reset_duration.get()) - .field("reset_max", &self.reset_max.get()) .field("connection_window_sz", &self.connection_window_sz.get()) .field( "connection_window_sz_threshold", diff --git a/src/connection.rs b/src/connection.rs index 9ccfd0a..87e49d1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,11 +1,11 @@ use std::{cell::Cell, cell::RefCell, fmt, mem, rc::Rc}; -use std::{collections::VecDeque, time::Instant}; +use std::{collections::VecDeque, time::Duration, time::Instant}; use ntex_bytes::{ByteString, Bytes}; use ntex_http::{HeaderMap, Method}; use ntex_io::IoRef; use ntex_util::time::{self, now, sleep}; -use ntex_util::{channel::pool, future::Either, spawn, HashMap, HashSet}; +use ntex_util::{channel::pool, future::Either, spawn, HashMap}; use crate::config::{Config, ConfigInner}; use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner}; @@ -22,7 +22,6 @@ bitflags::bitflags! { #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct ConnectionFlags: u8 { const SETTINGS_PROCESSED = 0b0000_0001; - const DELAY_DROP_TASK_STARTED = 0b0000_0010; const DISCONNECT_WHEN_READY = 0b0000_1000; const SECURE = 0b0001_0000; const STREAM_REFUSED = 0b0010_0000; @@ -55,8 +54,7 @@ struct ConnectionState { // Max frame size remote_frame_size: Cell, // Locally reset streams - local_reset_queue: RefCell>, - local_reset_ids: RefCell>, + local_pending_reset: Pending, // protocol level error error: Cell>, // connection state flags @@ -112,8 +110,7 @@ impl Connection { next_stream_id: Cell::new(StreamId::new(1)), local_config: config, local_max_concurrent_streams: Cell::new(None), - local_reset_ids: RefCell::new(HashSet::default()), - local_reset_queue: RefCell::new(VecDeque::new()), + local_pending_reset: Default::default(), remote_window_sz: Cell::new(frame::DEFAULT_INITIAL_WINDOW_SIZE), error: Cell::new(None), flags: Cell::new(if secure { @@ -410,20 +407,8 @@ impl Connection { return; } - let mut ids = self.0.local_reset_ids.borrow_mut(); - let mut queue = self.0.local_reset_queue.borrow_mut(); - - // check queue size - if queue.len() >= self.0.local_config.0.reset_max.get() { - if let Some((id, _)) = queue.pop_front() { - ids.remove(&id); - } - } - ids.insert(id); - queue.push_back((id, now() + self.0.local_config.0.reset_duration.get())); - if !flags.contains(ConnectionFlags::DELAY_DROP_TASK_STARTED) { - let _ = spawn(delay_drop_task(self.clone())); - } + // Add ids to pending queue + self.0.local_pending_reset.add(id); } pub(crate) fn recv_half(&self) -> RecvHalfConnection { @@ -482,7 +467,8 @@ impl RecvHalfConnection { Ok(item) => Ok(item.map(move |msg| (stream, msg))), Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))), } - } else if self.0.local_reset_ids.borrow().contains(&id) { + } else if !self.0.local_config.is_server() && self.0.local_pending_reset.is_pending(id) { + // if client and no stream, then it was closed self.encode(frame::Reset::new(id, frame::Reason::STREAM_CLOSED)); Ok(None) } else if id < self.0.next_stream_id.get() { @@ -562,7 +548,7 @@ impl RecvHalfConnection { Ok(item) => Ok(item.map(move |msg| (stream, msg))), Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))), } - } else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) { + } else if self.0.local_pending_reset.is_pending(frm.stream_id()) { self.encode(frame::Reset::new( frm.stream_id(), frame::Reason::STREAM_CLOSED, @@ -701,7 +687,7 @@ impl RecvHalfConnection { stream .recv_window_update(frm) .map_err(|kind| Either::Right(StreamErrorInner::new(stream, kind))) - } else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) { + } else if self.0.local_pending_reset.is_pending(frm.stream_id()) { Ok(()) } else { log::trace!("Unknown WINDOW_UPDATE {:?}", frm); @@ -738,7 +724,7 @@ impl RecvHalfConnection { stream, StreamError::Reset(frm.reason()), ))) - } else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) { + } else if self.0.local_pending_reset.is_pending(frm.stream_id()) { self.update_rst_count() } else { self.update_rst_count()?; @@ -851,42 +837,6 @@ impl fmt::Debug for Connection { } } -async fn delay_drop_task(state: Connection) { - state.set_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED); - - loop { - let next = if let Some(item) = state.0.local_reset_queue.borrow().front() { - item.1 - now() - } else { - break; - }; - sleep(next).await; - - if state.is_closed() { - return; - } - - let now = now(); - let mut ids = state.0.local_reset_ids.borrow_mut(); - let mut queue = state.0.local_reset_queue.borrow_mut(); - loop { - if let Some(item) = queue.front() { - if item.1 <= now { - log::trace!("{}: dropping {:?} after delay", state.tag(), item.0); - ids.remove(&item.0); - queue.pop_front(); - } else { - break; - } - } else { - state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED); - return; - } - } - } - state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED); -} - async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) { log::debug!("start http client ping/pong task"); @@ -917,3 +867,54 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) { st.0.pings_count.set(st.0.pings_count.get() + 1); } } + +const BLOCKS: usize = 6; +const BLOCK_SIZE: Duration = Duration::from_secs(30); +const LAST_BLOCK: usize = 5; + +#[derive(Default)] +struct Pending { + times: RefCell<[Option<(StreamId, Instant)>; BLOCKS]>, +} + +impl Pending { + fn add(&self, id: StreamId) { + let cur = now(); + let mut times = self.times.borrow_mut(); + + if let Some(item) = ×[0] { + // check if we need to insert new block + if item.1 < (cur - BLOCK_SIZE) { + // shift blocks + let mut i = LAST_BLOCK - 1; + loop { + times[i + 1] = times[i]; + if i == 0 { + break; + } + i -= 1; + } + // insert new item + times[0] = Some((id, cur)); + } + } else { + // insert new item + times[0] = Some((id, cur)); + } + } + + fn is_pending(&self, id: StreamId) -> bool { + let times = self.times.borrow(); + let mut idx = LAST_BLOCK; + loop { + if let Some(item) = ×[idx] { + return id >= item.0; + } else if idx == 0 { + break; + } else { + idx -= 1; + } + } + false + } +} diff --git a/src/consts.rs b/src/consts.rs index aaa3377..772859c 100644 --- a/src/consts.rs +++ b/src/consts.rs @@ -5,7 +5,6 @@ use crate::frame::WindowSize; // Constants pub(crate) const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; -pub(crate) const DEFAULT_RESET_STREAM_MAX: usize = 30; pub(crate) const DEFAULT_RESET_STREAM_SECS: Seconds = Seconds(10); pub(crate) const DEFAULT_CONNECTION_WINDOW_SIZE: WindowSize = 1_048_576; pub(crate) const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 48 * 1024;