Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handling for delay reset queue #50

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [1.8.2] - 2025-02-08

* Better handling for delay reset queue
* Fix connection level window size handling

## [1.8.1] - 2025-01-31
Expand Down
12 changes: 4 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

/// Http2 connection configuration
pub(crate) struct ConfigInner {
pub(crate) settings: Cell<Settings>,
/// Initial window size of locally initiated streams
pub(crate) window_sz: Cell<WindowSize>,
pub(crate) window_sz_threshold: Cell<WindowSize>,
/// How long a locally reset stream should ignore frames
pub(crate) reset_duration: Cell<Duration>,
/// Maximum number of locally reset streams to keep at a time
pub(crate) reset_max: Cell<usize>,
pub(crate) settings: Cell<Settings>,
/// Initial window size for new connections.
pub(crate) connection_window_sz: Cell<WindowSize>,
pub(crate) connection_window_sz_threshold: Cell<WindowSize>,
Expand Down Expand Up @@ -91,7 +89,6 @@
connection_window_sz_threshold,
dispatcher_config,
settings: Cell::new(settings),
reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX),
reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()),
remote_max_concurrent_streams: Cell::new(None),
max_header_continuations: Cell::new(consts::DEFAULT_MAX_COUNTINUATIONS),
Expand Down Expand Up @@ -236,8 +233,9 @@
/// error, forcing the connection to terminate.
///
/// The default value is 30.
pub fn max_concurrent_reset_streams(&self, max: usize) -> &Self {
self.0.reset_max.set(max);
#[doc(hidden)]
#[deprecated]
pub fn max_concurrent_reset_streams(&self, _: usize) -> &Self {

Check warning on line 238 in src/config.rs

View check run for this annotation

Codecov / codecov/patch

src/config.rs#L238

Added line #L238 was not covered by tests
self
}

Expand Down Expand Up @@ -357,7 +355,6 @@
.field("window_sz", &self.0.window_sz.get())
.field("window_sz_threshold", &self.0.window_sz_threshold.get())
.field("reset_duration", &self.0.reset_duration.get())
.field("reset_max", &self.0.reset_max.get())
.field("connection_window_sz", &self.0.connection_window_sz.get())
.field(
"connection_window_sz_threshold",
Expand All @@ -378,7 +375,6 @@
.field("window_sz", &self.window_sz.get())
.field("window_sz_threshold", &self.window_sz_threshold.get())
.field("reset_duration", &self.reset_duration.get())
.field("reset_max", &self.reset_max.get())
.field("connection_window_sz", &self.connection_window_sz.get())
.field(
"connection_window_sz_threshold",
Expand Down
123 changes: 62 additions & 61 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{cell::Cell, cell::RefCell, fmt, mem, rc::Rc};
use std::{collections::VecDeque, time::Instant};
use std::{collections::VecDeque, time::Duration, time::Instant};

use ntex_bytes::{ByteString, Bytes};
use ntex_http::{HeaderMap, Method};
use ntex_io::IoRef;
use ntex_util::time::{self, now, sleep};
use ntex_util::{channel::pool, future::Either, spawn, HashMap, HashSet};
use ntex_util::{channel::pool, future::Either, spawn, HashMap};

use crate::config::{Config, ConfigInner};
use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner};
Expand All @@ -22,7 +22,6 @@
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct ConnectionFlags: u8 {
const SETTINGS_PROCESSED = 0b0000_0001;
const DELAY_DROP_TASK_STARTED = 0b0000_0010;
const DISCONNECT_WHEN_READY = 0b0000_1000;
const SECURE = 0b0001_0000;
const STREAM_REFUSED = 0b0010_0000;
Expand Down Expand Up @@ -55,8 +54,7 @@
// Max frame size
remote_frame_size: Cell<u32>,
// Locally reset streams
local_reset_queue: RefCell<VecDeque<(StreamId, Instant)>>,
local_reset_ids: RefCell<HashSet<StreamId>>,
local_pending_reset: Pending,
// protocol level error
error: Cell<Option<OperationError>>,
// connection state flags
Expand Down Expand Up @@ -112,8 +110,7 @@
next_stream_id: Cell::new(StreamId::new(1)),
local_config: config,
local_max_concurrent_streams: Cell::new(None),
local_reset_ids: RefCell::new(HashSet::default()),
local_reset_queue: RefCell::new(VecDeque::new()),
local_pending_reset: Default::default(),
remote_window_sz: Cell::new(frame::DEFAULT_INITIAL_WINDOW_SIZE),
error: Cell::new(None),
flags: Cell::new(if secure {
Expand Down Expand Up @@ -410,20 +407,8 @@
return;
}

let mut ids = self.0.local_reset_ids.borrow_mut();
let mut queue = self.0.local_reset_queue.borrow_mut();

// check queue size
if queue.len() >= self.0.local_config.0.reset_max.get() {
if let Some((id, _)) = queue.pop_front() {
ids.remove(&id);
}
}
ids.insert(id);
queue.push_back((id, now() + self.0.local_config.0.reset_duration.get()));
if !flags.contains(ConnectionFlags::DELAY_DROP_TASK_STARTED) {
let _ = spawn(delay_drop_task(self.clone()));
}
// Add ids to pending queue
self.0.local_pending_reset.add(id);
}

pub(crate) fn recv_half(&self) -> RecvHalfConnection {
Expand Down Expand Up @@ -482,7 +467,8 @@
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
}
} else if self.0.local_reset_ids.borrow().contains(&id) {
} else if !self.0.local_config.is_server() && self.0.local_pending_reset.is_pending(id) {
// if client and no stream, then it was closed
self.encode(frame::Reset::new(id, frame::Reason::STREAM_CLOSED));
Ok(None)
} else if id < self.0.next_stream_id.get() {
Expand Down Expand Up @@ -562,7 +548,7 @@
Ok(item) => Ok(item.map(move |msg| (stream, msg))),
Err(kind) => Err(Either::Right(StreamErrorInner::new(stream, kind))),
}
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
self.encode(frame::Reset::new(
frm.stream_id(),
frame::Reason::STREAM_CLOSED,
Expand Down Expand Up @@ -701,7 +687,7 @@
stream
.recv_window_update(frm)
.map_err(|kind| Either::Right(StreamErrorInner::new(stream, kind)))
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
Ok(())
} else {
log::trace!("Unknown WINDOW_UPDATE {:?}", frm);
Expand Down Expand Up @@ -738,7 +724,7 @@
stream,
StreamError::Reset(frm.reason()),
)))
} else if self.0.local_reset_ids.borrow().contains(&frm.stream_id()) {
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
self.update_rst_count()
} else {
self.update_rst_count()?;
Expand Down Expand Up @@ -851,42 +837,6 @@
}
}

async fn delay_drop_task(state: Connection) {
state.set_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);

loop {
let next = if let Some(item) = state.0.local_reset_queue.borrow().front() {
item.1 - now()
} else {
break;
};
sleep(next).await;

if state.is_closed() {
return;
}

let now = now();
let mut ids = state.0.local_reset_ids.borrow_mut();
let mut queue = state.0.local_reset_queue.borrow_mut();
loop {
if let Some(item) = queue.front() {
if item.1 <= now {
log::trace!("{}: dropping {:?} after delay", state.tag(), item.0);
ids.remove(&item.0);
queue.pop_front();
} else {
break;
}
} else {
state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);
return;
}
}
}
state.unset_flags(ConnectionFlags::DELAY_DROP_TASK_STARTED);
}

async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
log::debug!("start http client ping/pong task");

Expand Down Expand Up @@ -917,3 +867,54 @@
st.0.pings_count.set(st.0.pings_count.get() + 1);
}
}

const BLOCKS: usize = 6;
const BLOCK_SIZE: Duration = Duration::from_secs(30);
const LAST_BLOCK: usize = 5;

#[derive(Default)]
struct Pending {
times: RefCell<[Option<(StreamId, Instant)>; BLOCKS]>,
}

impl Pending {
fn add(&self, id: StreamId) {
let cur = now();
let mut times = self.times.borrow_mut();

if let Some(item) = &times[0] {
// check if we need to insert new block
if item.1 < (cur - BLOCK_SIZE) {
// shift blocks
let mut i = LAST_BLOCK - 1;

Check warning on line 889 in src/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/connection.rs#L889

Added line #L889 was not covered by tests
loop {
times[i + 1] = times[i];
if i == 0 {
break;
}
i -= 1;

Check warning on line 895 in src/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/connection.rs#L891-L895

Added lines #L891 - L895 were not covered by tests
}
// insert new item
times[0] = Some((id, cur));

Check warning on line 898 in src/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/connection.rs#L898

Added line #L898 was not covered by tests
}
} else {
// insert new item
times[0] = Some((id, cur));
}
}

fn is_pending(&self, id: StreamId) -> bool {
let times = self.times.borrow();
let mut idx = LAST_BLOCK;
loop {
if let Some(item) = &times[idx] {
return id >= item.0;
} else if idx == 0 {
break;
} else {
idx -= 1;
}
}
false
}
}
1 change: 0 additions & 1 deletion src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::frame::WindowSize;

// Constants
pub(crate) const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub(crate) const DEFAULT_RESET_STREAM_MAX: usize = 30;
pub(crate) const DEFAULT_RESET_STREAM_SECS: Seconds = Seconds(10);
pub(crate) const DEFAULT_CONNECTION_WINDOW_SIZE: WindowSize = 1_048_576;
pub(crate) const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 48 * 1024;
Expand Down
Loading