Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unchunking more carefully. #81

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions transports/src/udp_chunking.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::error::Result;
use log::{error, warn};
use messages::Message;
use parity_scale_codec::{Decode, Encode};
use parity_scale_codec_derive::{Decode as ParityDecode, Encode as ParityEncode};
Expand All @@ -8,7 +9,7 @@ use crate::chunking::MessageContainer;

#[derive(Clone, Debug, ParityDecode, ParityEncode)]
pub struct SimpleChunk {
// Random 16-bit ID used to identify the message this chunk belongs to
// Sender-specific 16-bit ID used to identify the message this chunk belongs to
pub message_id: u16,
// Sequence number indicates the order of reassembly
pub sequence_number: u16,
Expand All @@ -31,6 +32,8 @@ pub struct SimpleChunker {
recv_buffer: BTreeMap<u16, BTreeMap<u16, SimpleChunk>>,
// Last received message_id to optimize reassembly searching
last_recv_msg_id: u16,
pending: Vec<u16>,
next_outgoing_msg_id: u16,
}

impl SimpleChunker {
Expand All @@ -39,6 +42,8 @@ impl SimpleChunker {
mtu,
recv_buffer: BTreeMap::<u16, BTreeMap<u16, SimpleChunk>>::new(),
last_recv_msg_id: 0,
pending: Vec::new(),
next_outgoing_msg_id: 0,
}
}

Expand All @@ -56,9 +61,27 @@ impl SimpleChunker {
}

fn attempt_msg_assembly(&mut self) -> Result<Option<Message>> {
// TODO: This needs to be expanded beyond just assembling off the last received message id
// TODO: Data needs to be removed from the map once the message is assembled correctly
// TODO: Stale data in the map needs to be cleaned up periodically
if let Some(result) = self.attempt_assemble(self.last_recv_msg_id)? {
return Ok(Some(result));
}
if self.pending.is_empty() {
warn!("This should not have occurred.");
return Ok(None);
}
let index = rand::random::<u16>() as usize % self.pending.len();
let id = self.pending[index];
if let Some(result) = self.attempt_assemble(id)? {
return Ok(Some(result));
}
if self.pending.len() > 0xFF {
let id = self.pending.remove(0);
error!("Giving up on receiving message {id}");
self.drop_pending(id);
}
Ok(None)
}
fn attempt_assemble(&mut self, msg_id: u16) -> Result<Option<Message>> {
let mut result = None;
if let Some(msg_map) = self.recv_buffer.get(&self.last_recv_msg_id) {
// The BTreeMap docs tell us that into_values will be an iter sorted by key
// In this case the key is the sequence_number, so in a complete set of chunks
Expand All @@ -68,19 +91,24 @@ impl SimpleChunker {
if let Some(last_chunk) = chunks.last() {
// Second, check if the last chunk has final_chunk set
if last_chunk.final_chunk
// Lastly, check if the final chunk's sequence number matches the number of chunks
&& (usize::from(last_chunk.sequence_number) == (chunks.len() - 1))
// Lastly, check if the final chunk's sequence number matches the number of chunks
&& (usize::from(last_chunk.sequence_number) == (chunks.len() - 1))
{
// If all those checks pass, then we *should* have all the chunks in order
// Now we attempt to assemble the message
return Ok(Some(SimpleChunker::msg_unchunk(&chunks)?));
result = Some(SimpleChunker::msg_unchunk(&chunks)?);
}
}
}

Ok(None)
if result.is_some() {
self.drop_pending(msg_id);
}
Ok(result)
}
fn drop_pending(&mut self, msg_id: u16) {
self.recv_buffer.remove(&msg_id);
self.pending.retain(|i| *i != msg_id);
}

pub fn msg_unchunk(data: &[&SimpleChunk]) -> Result<Message> {
let mut all_data = vec![];
data.iter().for_each(|c| all_data.extend(&c.data));
Expand All @@ -89,8 +117,13 @@ impl SimpleChunker {
Ok(container.message)
}

pub fn chunk(&self, message: Message) -> Result<Vec<Vec<u8>>> {
let msg_id = rand::random::<u16>();
pub fn chunk(&mut self, message: Message) -> Result<Vec<Vec<u8>>> {
let msg_id = self.next_outgoing_msg_id;
if let Some(nxt_id) = self.next_outgoing_msg_id.checked_add(1u16) {
self.next_outgoing_msg_id = nxt_id;
} else {
self.next_outgoing_msg_id = 0;
}
let mut seq_num = 0;
// Create container around message
let container = MessageContainer::new(message);
Expand Down Expand Up @@ -157,7 +190,7 @@ mod tests {
let mtu_list = [60, 250, 500, 1024];

for mtu in mtu_list {
let chunker = SimpleChunker::new(mtu);
let mut chunker = SimpleChunker::new(mtu);
let msg = Message::ApplicationAPI(ApplicationAPI::MissingDagBlocks {
cid: "notarealcid".to_string(),
blocks: vec!["data".to_string(); 10240],
Expand Down