Skip to content

Commit

Permalink
Merge pull request #13 from editso/dev
Browse files Browse the repository at this point in the history
修复加密导致数据传输不完整问题
  • Loading branch information
editso authored Dec 9, 2021
2 parents adfd5b3 + a07af23 commit 45f238d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 39 deletions.
35 changes: 29 additions & 6 deletions fuso-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,29 @@ pub fn now_mills() -> u64 {
.as_secs()
}

#[inline]
pub async fn copy<R, W>(mut reader: R, mut writer: W) -> std::io::Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
loop {
let mut buf = Vec::new();
buf.resize(0x2000, 0);

let n = reader.read(&mut buf).await?;

if n == 0 {
let _ = writer.close().await;
break Ok(());
}

buf.truncate(n);

writer.write_all(&mut buf).await?;
}
}

#[derive(Debug, Clone)]
pub struct Packet {
magic: u32,
Expand Down Expand Up @@ -317,12 +340,12 @@ where
let (reader_s, writer_s) = self.split();
let (reader_t, writer_t) = to.split();

smol::future::race(
smol::io::copy(reader_t, writer_s),
smol::io::copy(reader_s, writer_t),
)
.await
.map_err(|e| error::Error::with_io(e))?;
smol::future::race(copy(reader_t, writer_s), copy(reader_s, writer_t))
.await
.map_err(|e| {
log::warn!("{}", e);
error::Error::with_io(e)
})?;

Ok(())
}
Expand Down
55 changes: 22 additions & 33 deletions fuso-core/src/ciphe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait Cipher {
pub struct Crypt<T, C> {
buf: Arc<Mutex<Buffer<u8>>>,
target: T,
cipher: Arc<Mutex<C>>,
cipher: C,
}

#[derive(Clone)]
Expand Down Expand Up @@ -74,7 +74,7 @@ where
Crypt {
target: self,
buf: Arc::new(Mutex::new(Buffer::new())),
cipher: Arc::new(Mutex::new(c)),
cipher: c,
}
}
}
Expand All @@ -95,36 +95,31 @@ where
let mut io_buf = io_buf.lock().unwrap();

if !io_buf.is_empty() {
log::info!("read buffer");
Pin::new(&mut *io_buf).poll_read(cx, buf)
} else {
match Pin::new(&mut self.target).poll_read(cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(0)) => Poll::Ready(Ok(0)),
Poll::Ready(Ok(n)) => {
let mut cipher = self.cipher.lock().unwrap();

match Pin::new(&mut *cipher).poll_decode(cx, &buf[..n]) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(data)) => {
let total = buf.len();
let mut cur = Cursor::new(buf);

let write_len = if total >= data.len() {
cur.write_all(&data).unwrap();
data.len()
} else {
cur.write_all(&data[..total]).unwrap();
io_buf.push_back(&data[total..]);
total
};

Poll::Ready(Ok(write_len))
}
Poll::Ready(Ok(n)) => match Pin::new(&mut self.cipher).poll_decode(cx, &buf[..n]) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(data)) => {
let total = buf.len();
let mut cur = Cursor::new(buf);

let write_len = if total >= data.len() {
cur.write_all(&data).unwrap();
data.len()
} else {
cur.write_all(&data[..total]).unwrap();
io_buf.push_back(&data[total..]);
total
};

Poll::Ready(Ok(write_len))
}
}
},
}
}
}
Expand All @@ -142,16 +137,10 @@ where
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let cipher = self.cipher.clone();
let mut cipher = cipher.lock().unwrap();

match Pin::new(&mut *cipher).poll_encode(cx, buf) {
match Pin::new(&mut self.cipher).poll_encode(cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(data)) => {
let _ = Pin::new(&mut self.target).poll_write(cx, &data)?;
Poll::Ready(Ok(buf.len()))
}
Poll::Ready(Ok(data)) => Pin::new(&mut self.target).poll_write(cx, &data),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ fn main() {
if let Err(e) = from.forward(to).await {
log::debug!("[fuc] Forwarding failed {}", e);
}

}
.detach()
})
Expand Down

0 comments on commit 45f238d

Please sign in to comment.