Skip to content

Commit

Permalink
log: describe events better
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 2, 2024
1 parent e656091 commit de012c6
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions uplink/src/collector/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fs::metadata, sync::Arc, time::Duration};

use axum::{extract::State, routing::post, Json, Router};
use log::{debug, error, info};
use log::{debug, error, info, trace};
use reqwest::StatusCode;
use rumqttc::{AsyncClient, QoS};
use sqlx::{migrate::MigrateDatabase, Connection, Sqlite, SqliteConnection};
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Queue {

/// Write into payloads table
pub async fn push(&mut self, payload: &Payload) -> Result<(), Error> {
let raw = serde_json::to_string(payload)?;
let raw = dbg!(serde_json::to_string(payload))?;
sqlx::query!(
r#"INSERT INTO payloads ( stream, raw )
VALUES ( ?1, ?2 );"#,
Expand Down Expand Up @@ -110,10 +110,11 @@ async fn event(State(queue): State<Arc<Mutex<Queue>>>, Json(payload): Json<Paylo

let mut queue = queue.lock().await;
if let Err(e) = queue.push(&payload).await {
error!("{e}");
error!("Failed write to disk on stream: {}; error={e}", payload.stream);
return StatusCode::INTERNAL_SERVER_ERROR;
}

debug!("Event written to disk on stream: {}", payload.stream);
StatusCode::OK
}

Expand All @@ -126,7 +127,9 @@ async fn push_to_broker_on_ack(
let (id, stream, text) = match queue.lock().await.peek().await {
Ok(q) => q,
Err(Error::Sql(sqlx::Error::RowNotFound)) => {
debug!("Looks like event queue is handled for the time being, check again in 5s");
trace!(
"Looks like event queue is handled for the time being, checking again in 5s"
);
// Wait 5 seconds before asking for next
sleep(Duration::from_secs(5)).await;
continue;
Expand All @@ -152,9 +155,10 @@ async fn push_to_broker_on_ack(
if let Err(e) = queue.lock().await.pop(id).await {
error!("{e}");
}
debug!("Request has been acked by the broker on the topic={topic}");
}
Err(e) => {
error!("Eventloop dropped: {e}");
error!("Eventloop dropped: {e}; uplink restart");
break;
}
}
Expand Down

0 comments on commit de012c6

Please sign in to comment.