Skip to content

Commit

Permalink
Fix connection level window size handling (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Feb 8, 2025
1 parent 97f10aa commit 5575d92
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 21 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Changes

## [1.8.2] - 2025-02-08

* Fix connection level window size handling

## [1.8.1] - 2025-01-31

* Fix connection level error check.
* Fix connection level error check

## [1.8.0] - 2025-01-31

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "1.8.1"
version = "1.8.2"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "An HTTP/2 client and server"
Expand Down
5 changes: 2 additions & 3 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Client {
// wait for available connection
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await?;
rx.await?;
}
Ok(())
}
Expand All @@ -117,10 +117,9 @@ impl Client {
} else if connections[idx].is_disconnecting() {
let con = connections.remove(idx);
let timeout = self.inner.disconnect_timeout;
let f = ntex_util::spawn(async move {
let _ = ntex_util::spawn(async move {
let _ = con.disconnect().disconnect_timeout(timeout).await;
});
drop(f);
} else {
idx += 1;
}
Expand Down
20 changes: 17 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,13 @@ impl Connection {
}
}

/// Consume connection level send capacity (window)
pub(crate) fn consume_send_window(&self, cap: u32) {
self.0.send_window.set(self.0.send_window.get().dec(cap));
}

/// added new capacity, update recevice window size
pub(crate) fn add_capacity(&self, size: u32) {
pub(crate) fn add_recv_capacity(&self, size: u32) {
let mut recv_window = self.0.recv_window.get().dec(size);

// update connection window size
Expand All @@ -242,6 +247,10 @@ impl Connection {
self.0.recv_window.set(recv_window);
}

pub(crate) fn send_window_size(&self) -> WindowSize {
self.0.send_window.get().window_size()
}

pub(crate) fn remote_window_size(&self) -> WindowSize {
self.0.remote_window_sz.get()
}
Expand Down Expand Up @@ -432,7 +441,7 @@ impl RecvHalfConnection {
}

fn query(&self, id: StreamId) -> Option<StreamRef> {
self.0.streams.borrow_mut().get(&id).cloned()
self.0.streams.borrow().get(&id).cloned()
}

fn flags(&self) -> ConnectionFlags {
Expand Down Expand Up @@ -560,7 +569,7 @@ impl RecvHalfConnection {
));
Ok(None)
} else {
Err(Either::Left(ConnectionError::InvalidStreamId(
Err(Either::Left(ConnectionError::UnknownStream(
"Received data",
)))
}
Expand Down Expand Up @@ -681,6 +690,11 @@ impl RecvHalfConnection {
.inc(frm.size_increment())
.map_err(|_| Either::Left(ConnectionError::WindowValueOverflow))?;
self.0.send_window.set(window);

// wake up streams if needed
for stream in self.0.streams.borrow().values() {
stream.recv_window_update_connection();
}
Ok(())
}
} else if let Some(stream) = self.query(frm.stream_id()) {
Expand Down
3 changes: 1 addition & 2 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,11 @@ where
if let Err(e) = self.inner.publish.poll(cx) {
let inner = self.inner.clone();
let con = self.connection.connection();
let f = ntex_util::spawn(async move {
let _ = ntex_util::spawn(async move {
if inner.control.call_nowait(Control::error(e)).await.is_ok() {
con.close();
}
});
drop(f);
}
self.inner.control.poll(cx).map_err(|_| ())
}
Expand Down
48 changes: 37 additions & 11 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Capacity {

impl Capacity {
fn new(size: u32, stream: &Rc<StreamState>) -> Self {
stream.add_capacity(size);
stream.add_recv_capacity(size);

Self {
size: Cell::new(size),
Expand Down Expand Up @@ -115,6 +115,7 @@ bitflags::bitflags! {
const REMOTE = 0b0000_0001;
const FAILED = 0b0000_0010;
const DISCONNECT_ON_DROP = 0b0000_0100;
const WAIT_FOR_CAPACITY = 0b0000_1000;
}
}

Expand Down Expand Up @@ -183,9 +184,7 @@ impl StreamState {
}

fn set_failed(&self) {
let mut flags = self.flags.get();
flags.insert(StreamFlags::FAILED);
self.flags.set(flags);
self.insert_flag(StreamFlags::FAILED);
self.send_cap.wake();
self.send_reset.wake();
}
Expand Down Expand Up @@ -216,6 +215,18 @@ impl StreamState {
self.review_state();
}

fn insert_flag(&self, f: StreamFlags) {
let mut flags = self.flags.get();
flags.insert(f);
self.flags.set(flags);
}

fn remove_flag(&self, f: StreamFlags) {
let mut flags = self.flags.get();
flags.remove(f);
self.flags.set(flags);
}

fn check_error(&self) -> Result<(), OperationError> {
if let Some(err) = self.error.take() {
self.error.set(Some(err.clone()));
Expand Down Expand Up @@ -250,7 +261,7 @@ impl StreamState {
}

/// added new capacity, update recevice window size
fn add_capacity(&self, size: u32) {
fn add_recv_capacity(&self, size: u32) {
let cap = self.recv_size.get();
self.recv_size.set(cap + size);
self.recv_window.set(self.recv_window.get().dec(size));
Expand All @@ -263,7 +274,7 @@ impl StreamState {
);

// connection level recv window
self.con.add_capacity(size);
self.con.add_recv_capacity(size);
}

/// check and update recevice window size
Expand Down Expand Up @@ -361,9 +372,7 @@ impl StreamRef {
}

pub(crate) fn disconnect_on_drop(&self) {
let mut flags = self.0.flags.get();
flags.insert(StreamFlags::DISCONNECT_ON_DROP);
self.0.flags.set(flags);
self.0.insert_flag(StreamFlags::DISCONNECT_ON_DROP);
}

pub(crate) fn is_disconnect_on_drop(&self) -> bool {
Expand Down Expand Up @@ -524,6 +533,14 @@ impl StreamRef {
self.0.remote_reset_stream(frm.reason())
}

pub(crate) fn recv_window_update_connection(&self) {
if self.0.flags.get().contains(StreamFlags::WAIT_FOR_CAPACITY)
&& self.0.send_window.get().window_size() > 0
{
self.0.send_cap.wake();
}
}

pub(crate) fn recv_window_update(&self, frm: WindowUpdate) -> Result<(), StreamError> {
if frm.size_increment() == 0 {
Err(StreamError::WindowZeroUpdateValue)
Expand Down Expand Up @@ -666,6 +683,10 @@ impl StreamRef {
self.0
.send_window
.set(self.0.send_window.get().dec(size as u32));

// update connection send window
self.0.con.consume_send_window(size as u32);

// write to io buffer
self.0.con.encode(data);
if res.is_empty() {
Expand Down Expand Up @@ -700,7 +721,10 @@ impl StreamRef {
}

pub fn available_send_capacity(&self) -> WindowSize {
self.0.send_window.get().window_size()
cmp::min(
self.0.send_window.get().window_size(),
self.0.con.send_window_size(),
)
}

pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
Expand All @@ -712,10 +736,12 @@ impl StreamRef {
self.0.check_error()?;
self.0.con.check_error()?;

let win = self.0.send_window.get().window_size();
let win = self.available_send_capacity();
if win > 0 {
self.0.remove_flag(StreamFlags::WAIT_FOR_CAPACITY);
Poll::Ready(Ok(win))
} else {
self.0.insert_flag(StreamFlags::WAIT_FOR_CAPACITY);
self.0.send_cap.register(cx.waker());
Poll::Pending
}
Expand Down

0 comments on commit 5575d92

Please sign in to comment.