Skip to content

Commit

Permalink
add read timeout for inbound connection
Browse files Browse the repository at this point in the history
  • Loading branch information
neevek committed Apr 5, 2024
1 parent f9b9bdc commit a9dfe9a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "omnip"
version = "0.4.18"
version = "0.4.19"
edition = "2021"

[lib]
Expand Down
32 changes: 18 additions & 14 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::net::SocketAddr;
use std::path::Path;
use std::str::{self, FromStr};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -384,7 +384,7 @@ impl Server {
.await
.map_err(|e| match e {
ProxyError::BadRequest | ProxyError::BadGateway(_) => {
error!("{:?}", e);
error!("err: {e:?}");
}
_ => {}
})
Expand All @@ -395,7 +395,8 @@ impl Server {
}

Err(e) => {
error!("access server failed, err: {e}");
error!("access server will wait due to err: {e}");
tokio::time::sleep(Duration::from_secs(5)).await
}
}
}
Expand Down Expand Up @@ -459,11 +460,11 @@ impl Server {
let mut buffer = [0u8; 512];
let mut proxy_handler = None;
loop {
let nread = inbound_stream
.read(&mut buffer)
.await
.context("failed to read from inbound_stream")
.map_err(|_| ProxyError::BadRequest)?;
let nread =
tokio::time::timeout(Duration::from_secs(2), inbound_stream.read(&mut buffer))
.await
.map_err(|_| ProxyError::BadRequest)?
.map_err(|_| ProxyError::BadRequest)?;

if proxy_handler.is_none() {
proxy_handler = Some(Self::create_proxy_handler(
Expand Down Expand Up @@ -545,7 +546,10 @@ impl Server {
}

if outbound_type == OutboundType::Direct {
debug!("serve request directly: {addr}");
debug!(
"serve request directly: {addr} from {}",
inbound_stream.peer_addr().unwrap()
);
outbound_stream = match &addr.host {
Host::Domain(domain) => {
let resolver = if addr.is_internal_domain() {
Expand Down Expand Up @@ -628,14 +632,14 @@ impl Server {
match dashboard_addr {
Some(addr) => {
debug!("dashboard request: {}", inbound_stream.peer_addr().unwrap());
Ok(Self::create_tcp_stream(addr.clone()).await)
Ok(Self::create_tcp_stream(addr).await)
}
None => {
log::warn!(
"request routing to the proxy server itself is rejected: {}",
inbound_stream.peer_addr().unwrap()
);
return Err(ProxyError::BadRequest);
Err(ProxyError::BadRequest)
}
}
}
Expand Down Expand Up @@ -671,9 +675,9 @@ impl Server {
let result = match tokio::io::copy_bidirectional(&mut a_stream, &mut b_stream).await {
Ok((tx_bytes, rx_bytes)) => {
debug!(
"transfer, out:{tx_bytes}, in:{rx_bytes}, {:?} <-> {:?}",
a_stream.local_addr(),
b_stream.local_addr()
"transfer, out:{tx_bytes}, in:{rx_bytes}, {} <-> {:?}",
a_stream.local_addr().unwrap(),
b_stream.local_addr().unwrap(),
);

stats_sender
Expand Down

0 comments on commit a9dfe9a

Please sign in to comment.