Skip to content

Commit

Permalink
feat(keepalive): add keepalive options
Browse files Browse the repository at this point in the history
  • Loading branch information
fu050409 committed May 23, 2024
1 parent 874c418 commit 7ad83ac
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
18 changes: 7 additions & 11 deletions src/models/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
use std::{collections::VecDeque, sync::Arc};

use anyhow::{Error, Result};
use tokio::{net::TcpStream, sync::Mutex, task::JoinHandle};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use tokio::{net::TcpStream, sync::Mutex, task::JoinHandle};

use crate::exceptions::Exception;
#[cfg(feature = "python")]
Expand Down Expand Up @@ -124,7 +124,6 @@ pub struct Client {
pub entrance: String,
pub path: OblivionPath,
pub session: Arc<Session>,
pub responses: Arc<Mutex<VecDeque<Response>>>,
}

impl Client {
Expand All @@ -137,6 +136,8 @@ impl Client {
{
Ok(tcp) => {
tcp.set_ttl(20)?;
tcp.set_nodelay(true)?;
socket2::SockRef::from(&tcp).set_keepalive(true)?;
tcp
}
Err(_) => return Err(Error::from(Exception::ConnectionRefusedError)),
Expand All @@ -150,7 +151,6 @@ impl Client {
entrance: entrance.to_string(),
path,
session: Arc::new(session),
responses: Arc::new(Mutex::new(VecDeque::new())),
})
}

Expand All @@ -166,9 +166,11 @@ impl Client {
Ok(self.session.recv().await?)
}

pub async fn listen(&self) -> Result<JoinHandle<Result<()>>> {
pub async fn listen(
&self,
responses: Arc<Mutex<VecDeque<Response>>>,
) -> Result<JoinHandle<Result<()>>> {
let session = Arc::clone(&self.session);
let responses = Arc::clone(&self.responses);
Ok(tokio::spawn(async move {
loop {
let mut wres = responses.lock().await;
Expand All @@ -195,12 +197,6 @@ impl Client {
}))
}

pub async fn pop(&self) -> Option<Response> {
let responses = Arc::clone(&self.responses);
let mut wres = responses.lock().await;
wres.pop_front()
}

pub async fn close(&self) -> Result<()> {
self.session.close().await
}
Expand Down
3 changes: 3 additions & 0 deletions src/models/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ async fn _handle(router: &Router, stream: TcpStream, peer: SocketAddr) -> Result
#[cfg(feature = "perf")]
let now = std::time::Instant::now();
stream.set_ttl(20)?;
stream.set_nodelay(true)?;
stream.set_linger(Some(std::time::Duration::from_secs(0)))?;
socket2::SockRef::from(&stream).set_keepalive(true)?;
let mut session = Session::new(Socket::new(stream))?;

if let Err(error) = session.handshake(1).await {
Expand Down

0 comments on commit 7ad83ac

Please sign in to comment.