Skip to content

Commit

Permalink
Refactor delay reset queue (#52)
Browse files Browse the repository at this point in the history
* Refactor delay reset queue

* wip
  • Loading branch information
fafhrd91 authored Feb 10, 2025
1 parent 4e556d0 commit b7a7d74
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 51 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changes

## [1.8.2] - 2025-02-09
## [1.8.4] - 2025-02-10

* Refactor delay reset queue

## [1.8.3] - 2025-02-09

* Re-calculate delay reset queue

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "1.8.3"
version = "1.8.4"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "An HTTP/2 client and server"
Expand Down
99 changes: 54 additions & 45 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ntex_bytes::{ByteString, Bytes};
use ntex_http::{HeaderMap, Method};
use ntex_io::IoRef;
use ntex_util::time::{self, now, sleep};
use ntex_util::{channel::pool, future::Either, spawn, HashMap};
use ntex_util::{channel::pool, future::Either, spawn, HashMap, HashSet};

use crate::config::{Config, ConfigInner};
use crate::error::{ConnectionError, OperationError, StreamError, StreamErrorInner};
Expand Down Expand Up @@ -107,7 +107,7 @@ impl Connection {
streams_count: Cell::new(0),
pings_count: Cell::new(0),
readiness: RefCell::new(VecDeque::new()),
next_stream_id: Cell::new(StreamId::new(1)),
next_stream_id: Cell::new(StreamId::CLIENT),
local_config: config,
local_max_concurrent_streams: Cell::new(None),
local_pending_reset: Default::default(),
Expand Down Expand Up @@ -471,10 +471,6 @@ impl RecvHalfConnection {
// if client and no stream, then it was closed
self.encode(frame::Reset::new(id, frame::Reason::STREAM_CLOSED));
Ok(None)
} else if id < self.0.next_stream_id.get() {
Err(Either::Left(ConnectionError::InvalidStreamId(
"Received headers",
)))
} else {
// refuse stream if connection is preparing for disconnect
if self
Expand Down Expand Up @@ -525,7 +521,6 @@ impl RecvHalfConnection {
Err(Either::Left(ConnectionError::UnexpectedPseudo("scheme")))
} else {
let stream = StreamRef::new(id, true, Connection(self.0.clone()));
self.0.next_stream_id.set(id);
self.0.streams_count.set(self.0.streams_count.get() + 1);
self.0.streams.borrow_mut().insert(id, stream.clone());
self.0
Expand Down Expand Up @@ -714,17 +709,20 @@ impl RecvHalfConnection {
) -> Result<(), Either<ConnectionError, StreamErrorInner>> {
log::trace!("{}: processing incoming {:#?}", self.tag(), frm);

if frm.stream_id().is_zero() {
Err(Either::Left(ConnectionError::UnknownStream("RST_STREAM")))
} else if let Some(stream) = self.query(frm.stream_id()) {
let id = frm.stream_id();
if id.is_zero() {
Err(Either::Left(ConnectionError::UnknownStream(
"RST_STREAM-zero",
)))
} else if let Some(stream) = self.query(id) {
stream.recv_rst_stream(&frm);
self.update_rst_count()?;

Err(Either::Right(StreamErrorInner::new(
stream,
StreamError::Reset(frm.reason()),
)))
} else if self.0.local_pending_reset.is_pending(frm.stream_id()) {
} else if self.0.local_pending_reset.is_pending(id) {
self.update_rst_count()
} else {
self.update_rst_count()?;
Expand Down Expand Up @@ -868,8 +866,8 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
}
}

const BLOCKS: usize = 6;
const LAST_BLOCK: usize = 5;
const BLOCKS: usize = 5;
const LAST_BLOCK: usize = 4;

#[cfg(not(test))]
const SECS: u64 = 30;
Expand All @@ -881,58 +879,69 @@ const ALL_BLOCKS: Duration = Duration::from_secs((BLOCKS as u64) * SECS);

#[derive(Default)]
struct Pending {
blocks: RefCell<[Option<(StreamId, Instant)>; BLOCKS]>,
idx: Cell<u8>,
blocks: RefCell<[Block; BLOCKS]>,
}

#[derive(Debug)]
struct Block {
start_time: Instant,
ids: HashSet<StreamId>,
}

impl Pending {
fn add(&self, id: StreamId) {
let cur = now();
let idx = self.idx.get() as usize;
let mut blocks = self.blocks.borrow_mut();

if let Some(item) = &blocks[0] {
// check if we need to insert new block
if item.1 < (cur - BLOCK_SIZE) {
// shift blocks
let mut i = LAST_BLOCK - 1;
loop {
blocks[i + 1] = blocks[i];
if i == 0 {
break;
}
i -= 1;
}
// insert new item
blocks[0] = Some((id, cur));
}
} else {
// check if we need to insert new block
if blocks[idx].start_time < (cur - BLOCK_SIZE) {
// shift blocks
let idx = if idx == 0 { LAST_BLOCK } else { idx - 1 };
// insert new item
blocks[0] = Some((id, cur));
blocks[idx].start_time = cur;
blocks[idx].ids.clear();
blocks[idx].ids.insert(id);
self.idx.set(idx as u8);
} else {
blocks[idx].ids.insert(id);
}
}

fn is_pending(&self, id: StreamId) -> bool {
let mut blocks = self.blocks.borrow_mut();
let mut idx = LAST_BLOCK;
let mut cur = now() - ALL_BLOCKS;
let blocks = self.blocks.borrow_mut();

let max = now() - ALL_BLOCKS;
let mut idx = self.idx.get() as usize;

loop {
if let Some(item) = &blocks[idx] {
if item.1 < cur {
blocks[idx] = None;
} else {
return id >= item.0;
}
} else {
cur += BLOCK_SIZE;
let item = &blocks[idx];
if item.start_time < max {
break;
} else if item.ids.contains(&id) {
return true;
}
if idx == 0 {
idx += 1;
if idx == BLOCKS {
idx = 0;
} else if idx == self.idx.get() as usize {
break;
}
idx -= 1;
}
false
}
}

impl Default for Block {
fn default() -> Self {
Self {
ids: HashSet::default(),
start_time: now() - ALL_BLOCKS,
}
}
}

#[cfg(test)]
mod tests {
use ntex::http::{test::server as test_server, HeaderMap, Method};
Expand Down Expand Up @@ -1027,7 +1036,7 @@ mod tests {
let res = get_reset(io.recv(&codec).await.unwrap().unwrap());
assert_eq!(res.reason(), Reason::STREAM_CLOSED);

sleep(Millis(1100)).await;
sleep(Millis(5100)).await;

// prev closed stream
io.send(pl.into(), &codec).await.unwrap();
Expand Down
4 changes: 0 additions & 4 deletions src/frame/stream_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ impl StreamId {
/// The maximum allowed stream ID.
pub const MAX: StreamId = StreamId(u32::MAX >> 1);

pub(crate) const fn new(src: u32) -> StreamId {
StreamId(src)
}

/// Parse the stream ID
#[inline]
pub fn parse(buf: &[u8]) -> (StreamId, bool) {
Expand Down

0 comments on commit b7a7d74

Please sign in to comment.