diff --git a/uplink/src/base/serializer.rs b/uplink/src/base/serializer.rs
index 086718e0..aa7fcd3d 100644
--- a/uplink/src/base/serializer.rs
+++ b/uplink/src/base/serializer.rs
@@ -193,47 +193,19 @@ impl Serializer {
     }
 
     /// Write all data received, from here-on, to disk only.
-    async fn crash(&mut self, mut publish: Publish) -> Result<Status, Error> {
+    async fn crash(&mut self, publish: Publish) -> Result<Status, Error> {
         let storage = match &mut self.storage {
             Some(s) => s,
             None => return Err(Error::MissingPersistence),
         };
-        // Write failed publish to disk first
-        publish.pkid = 1;
-
-        if let Err(e) = publish.write(storage.writer()) {
-            error!("Failed to fill write buffer during bad network. Error = {:?}", e);
-        }
-
-        if let Err(e) = storage.flush_on_overflow() {
-            error!("Failed to flush write buffer to disk during bad network. Error = {:?}", e);
-        }
+        // Write failed publish to disk first; metrics don't matter here
+        write_to_disk(publish, storage, &mut None);
 
         loop {
-            // Collect next data packet to write to disk
+            // Collect next data packet and write to disk
             let data = self.collector_rx.recv_async().await?;
-            let topic = data.topic();
-            let payload = data.serialize()?;
-            let stream = data.stream().as_ref().to_owned();
-            let point_count = data.len();
-            let batch_latency = data.batch_latency();
-            trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
-            if let Some(handler) = self.stream_metrics.as_mut() {
-                handler.update(stream, point_count, batch_latency);
-            }
-
-            let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
-            publish.pkid = 1;
-
-            if let Err(e) = publish.write(storage.writer()) {
-                error!("Failed to fill write buffer during bad network. Error = {:?}", e);
-                continue;
-            }
-
-            if let Err(e) = storage.flush_on_overflow() {
-                error!("Failed to flush write buffer to disk during bad network. Error = {:?}", e);
-                continue;
-            }
+            let publish = construct_publish(&mut None, data)?;
+            write_to_disk(publish, storage, &mut None);
         }
     }
 
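The `&mut None` arguments above are how the refactored helpers make metrics optional: `crash()` still persists data but skips all metrics accounting. A minimal, self-contained sketch of that `&mut Option<T>` pattern follows; the `Handler` type is a stand-in, not uplink's metrics handlers.

```rust
// Sketch of the `&mut Option<T>` toggle used by the new helpers: callers
// that don't care about metrics pass `&mut None` and the update is skipped.
struct Handler {
    count: usize,
}

impl Handler {
    fn update(&mut self) {
        self.count += 1;
    }
}

fn record(metrics: &mut Option<Handler>) {
    // A no-op when metrics are disabled
    if let Some(handler) = metrics {
        handler.update();
    }
}

fn main() {
    let mut enabled = Some(Handler { count: 0 });
    record(&mut enabled); // updates the handler
    record(&mut None); // metrics don't matter, e.g. in `crash()`
    assert_eq!(enabled.map(|h| h.count), Some(1));
}
```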
@@ -265,41 +237,8 @@ impl Serializer {
                         }
                     }
 
-                    let topic = data.topic();
-                    let payload = data.serialize()?;
-                    let stream = data.stream().as_ref().to_owned();
-                    let point_count = data.len();
-                    let batch_latency = data.batch_latency();
-                    trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
-                    if let Some(handler) = self.stream_metrics.as_mut() {
-                        handler.update(stream, point_count, batch_latency);
-                    }
-
-                    let payload_size = payload.len();
-                    let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
-                    publish.pkid = 1;
-
-                    match publish.write(storage.writer()) {
-                        Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut() {
-                            handler.add_total_disk_size(payload_size)
-                        },
-                        Err(e) => {
-                            error!("Failed to fill disk buffer. Error = {:?}", e);
-                            continue
-                        }
-                    }
-
-                    match storage.flush_on_overflow() {
-                        Ok(deleted) => if let Some(handler) = self.serializer_metrics.as_mut() {
-                            if deleted.is_some() {
-                                handler.increment_lost_segments();
-                            }
-                        },
-                        Err(e) => {
-                            error!("Failed to flush disk buffer. Error = {:?}", e);
-                            continue
-                        }
-                    }
+                    let publish = construct_publish(&mut self.stream_metrics, data)?;
+                    write_to_disk(publish, storage, &mut self.serializer_metrics);
                 }
                 o = &mut publish => match o {
                     Ok(_) => return Ok(Status::EventLoopReady),
@@ -354,41 +293,8 @@ impl Serializer {
                         }
                     }
 
-                    let topic = data.topic();
-                    let payload = data.serialize()?;
-                    let stream = data.stream().as_ref().to_owned();
-                    let point_count = data.len();
-                    let batch_latency = data.batch_latency();
-                    trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
-                    if let Some(handler) = self.stream_metrics.as_mut() {
-                        handler.update(stream, point_count, batch_latency);
-                    }
-
-                    let payload_size = payload.len();
-                    let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
-                    publish.pkid = 1;
-
-                    match publish.write(storage.writer()) {
-                        Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut() {
-                            handler.add_total_disk_size(payload_size)
-                        },
-                        Err(e) => {
-                            error!("Failed to fill disk buffer. Error = {:?}", e);
-                            continue
-                        }
-                    }
-
-                    match storage.flush_on_overflow() {
-                        Ok(deleted) => if let Some(handler) = self.serializer_metrics.as_mut() {
-                            if deleted.is_some() {
-                                handler.increment_lost_segments();
-                            }
-                        },
-                        Err(e) => {
-                            error!("Failed to flush write buffer to disk during catchup. Error = {:?}", e);
-                            continue
-                        }
-                    }
+                    let publish = construct_publish(&mut self.stream_metrics, data)?;
+                    write_to_disk(publish, storage, &mut self.serializer_metrics);
                 }
                 o = &mut send => {
                     // Send failure implies eventloop crash. Switch state to
@@ -448,18 +354,9 @@ impl Serializer {
                         }
                     }
 
-                    let topic = data.topic();
-                    let payload = data.serialize()?;
-                    let stream = data.stream().as_ref().to_owned();
-                    let point_count = data.len();
-                    let batch_latency = data.batch_latency();
-                    trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
-                    if let Some(handler) = self.stream_metrics.as_mut() {
-                        handler.update(stream, point_count, batch_latency);
-                    }
-
-                    let payload_size = payload.len();
-                    match self.client.try_publish(topic.as_ref(), QoS::AtLeastOnce, false, payload) {
+                    let publish = construct_publish(&mut self.stream_metrics, data)?;
+                    let payload_size = publish.payload.len();
+                    match self.client.try_publish(publish.topic, QoS::AtLeastOnce, false, publish.payload) {
                         Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut() {
                             handler.add_total_sent_size(payload_size);
                             continue;
                         }
@@ -469,6 +366,7 @@ impl Serializer {
                     }
                 }
 
+                // On a regular interval, forwards metrics information to the network
                 _ = interval.tick(), if metrics_enabled => {
                     if let Some(handler) = self.serializer_metrics.as_mut() {
                         let data = handler.update();
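One detail worth noting in the normal-mode hunk: `publish.payload.len()` is read before `publish.topic` and `publish.payload` are moved into `try_publish`, since the call consumes those values. A standalone sketch with stand-in types (not rumqttc's API) showing why the order matters:

```rust
// Stand-in for a client call that consumes its topic and payload by value.
fn try_publish(topic: String, payload: Vec<u8>) -> Result<(), ()> {
    let _ = (topic, payload); // pretend to enqueue the packet
    Ok(())
}

fn main() {
    let topic = String::from("hello/world");
    let payload = b"[{\"sequence\":1,\"timestamp\":0}]".to_vec();

    // Capture the size first; after the call, `payload` has been moved
    let payload_size = payload.len();
    if try_publish(topic, payload).is_ok() {
        println!("sent {payload_size} bytes");
    }
}
```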
@@ -538,6 +436,58 @@ async fn send_publish(
     }
     Ok(client)
 }
+// Constructs a [Publish] packet given a [Package] element. Updates stream metrics as necessary.
+fn construct_publish(
+    stream_metrics: &mut Option<StreamMetricsHandler>,
+    data: Box<dyn Package>,
+) -> Result<Publish, Error> {
+    let stream = data.stream().as_ref().to_owned();
+    let point_count = data.len();
+    let batch_latency = data.batch_latency();
+    trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
+    if let Some(handler) = stream_metrics {
+        handler.update(stream, point_count, batch_latency);
+    }
+
+    let topic = data.topic();
+    let payload = data.serialize()?;
+
+    Ok(Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload))
+}
+
+// Writes the provided publish packet to disk with [Storage], after setting its pkid to 1.
+// Updates serializer metrics with appropriate values on success, if asked to do so.
+fn write_to_disk(
+    mut publish: Publish,
+    storage: &mut Storage,
+    serializer_metrics: &mut Option<SerializerMetricsHandler>,
+) {
+    publish.pkid = 1;
+    let payload_size = publish.payload.len();
+    if let Err(e) = publish.write(storage.writer()) {
+        error!("Failed to fill disk buffer. Error = {:?}", e);
+        return;
+    }
+
+    if let Some(handler) = serializer_metrics {
+        handler.add_total_disk_size(payload_size)
+    }
+
+    let deleted = match storage.flush_on_overflow() {
+        Ok(d) => d,
+        Err(e) => {
+            error!("Failed to flush disk buffer. Error = {:?}", e);
+            return;
+        }
+    };
+
+    if let Some(handler) = serializer_metrics {
+        if deleted.is_some() {
+            handler.increment_lost_segments();
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use serde_json::Value;
@@ -609,16 +559,6 @@ mod test {
     }
 
-    fn write_to_storage(storage: &mut Storage, publish: &Publish) {
-        if let Err(e) = publish.write(storage.writer()) {
-            panic!("Failed to fill write buffer. Error = {:?}", e);
-        }
-
-        if let Err(e) = storage.flush_on_overflow() {
-            panic!("Failed to flush write buffer to disk. Error = {:?}", e);
-        }
-    }
-
     fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish {
         if storage.reload_on_eof().unwrap() {
             panic!("No publishes found in storage");
@@ -740,12 +680,12 @@ mod test {
             QoS::AtLeastOnce,
             "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
         );
-        publish.pkid = 1;
-
-        write_to_storage(&mut storage, &publish);
+        write_to_disk(publish.clone(), &mut storage, &mut None);
 
         let stored_publish =
             read_from_storage(&mut storage, serializer.config.max_packet_size);
 
+        // Ensure publish.pkid is 1, as written to disk
+        publish.pkid = 1;
         assert_eq!(publish, stored_publish);
     }
@@ -862,14 +802,12 @@ mod test {
         });
 
         // Force write a publish into storage
-        let mut publish = Publish::new(
+        let publish = Publish::new(
            "hello/world",
             QoS::AtLeastOnce,
             "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
         );
-        publish.pkid = 1;
-
-        write_to_storage(&mut storage, &publish);
+        write_to_disk(publish.clone(), &mut storage, &mut None);
 
         // Replace storage into serializer
         serializer.storage = Some(storage);
@@ -898,14 +836,12 @@ mod test {
         });
 
         // Force write a publish into storage
-        let mut publish = Publish::new(
+        let publish = Publish::new(
            "hello/world",
             QoS::AtLeastOnce,
             "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
         );
-        publish.pkid = 1;
-
-        write_to_storage(&mut storage, &publish);
+        write_to_disk(publish.clone(), &mut storage, &mut None);
 
         // Replace storage into serializer
         serializer.storage = Some(storage);
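The test changes above hinge on `write_to_disk` taking the packet by value and stamping `pkid = 1` on its own copy, which is why the caller's original must be patched before the equality assertion. A self-contained sketch of that behaviour, using a stand-in `Publish` struct rather than rumqttc's:

```rust
// Stand-in Publish type; only the fields needed for the sketch.
#[derive(Clone, Debug, PartialEq)]
struct Publish {
    pkid: u16,
    topic: String,
    payload: Vec<u8>,
}

// Mirrors the refactor: take the packet by value, set pkid, persist the copy.
fn write_to_disk(mut publish: Publish, disk: &mut Vec<Publish>) {
    publish.pkid = 1;
    disk.push(publish);
}

fn main() {
    let mut disk = Vec::new();
    let mut publish =
        Publish { pkid: 0, topic: "hello/world".into(), payload: b"{}".to_vec() };
    write_to_disk(publish.clone(), &mut disk);

    // Ensure publish.pkid is 1, as written to disk
    publish.pkid = 1;
    assert_eq!(disk[0], publish);
}
```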
diff --git a/uplink/src/base/serializer/metrics.rs b/uplink/src/base/serializer/metrics.rs
index f6317257..6559f76a 100644
--- a/uplink/src/base/serializer/metrics.rs
+++ b/uplink/src/base/serializer/metrics.rs
@@ -79,6 +79,7 @@ impl SerializerMetricsHandler {
         self.metrics.errors.push_str(" | ");
     }
 
+    // Retrieve metrics to be sent over the network
     pub fn update(&mut self) -> &SerializerMetrics {
         let timestamp =
             SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0));
@@ -128,6 +129,8 @@ impl StreamMetricsHandler {
         Some(Self { topic, map: Default::default() })
     }
 
+    /// Updates the metrics for a stream, taking the count of points in the batch and the
+    /// difference between the first and last elements' timestamps as the batch latency.
     pub fn update(&mut self, stream: String, point_count: usize, batch_latency: u64) {
         // Init stream metrics max/min values with opposite extreme values to ensure first latency value is accepted
         let metrics = self.map.entry(stream.clone()).or_insert(StreamMetrics {
@@ -139,6 +142,7 @@ impl StreamMetricsHandler {
         metrics.max_latency = metrics.max_latency.max(batch_latency);
         metrics.min_latency = metrics.min_latency.min(batch_latency);
 
+        // NOTE: Average latency is calculated in a slightly lossy fashion, using integer arithmetic
         let total_latency = (metrics.average_latency * metrics.batch_count) + batch_latency;
         metrics.batch_count += 1;
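To see what the NOTE in the last hunk is warning about, here is a runnable sketch of the running-average update. It assumes the line following the excerpt divides `total_latency` by the new `batch_count` (that line is truncated above), and uses a stand-in struct that borrows the same field names:

```rust
struct StreamMetrics {
    average_latency: u64,
    batch_count: u64,
}

// Running mean in integer arithmetic: rebuilding the total from an already
// truncated average means each step can drop a fraction of a unit.
fn update_average(metrics: &mut StreamMetrics, batch_latency: u64) {
    let total_latency = (metrics.average_latency * metrics.batch_count) + batch_latency;
    metrics.batch_count += 1;
    metrics.average_latency = total_latency / metrics.batch_count;
}

fn main() {
    let mut metrics = StreamMetrics { average_latency: 0, batch_count: 0 };
    for latency in [3, 4, 4] {
        update_average(&mut metrics, latency);
    }
    // The true mean is 11 / 3 ≈ 3.67; integer math reports 3
    assert_eq!(metrics.average_latency, 3);
}
```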