From c6ada0153e9b2dd41afae895cfe16d68370905fe Mon Sep 17 00:00:00 2001 From: Luisa Vasquez Gomez Date: Tue, 11 Feb 2025 05:58:12 -0800 Subject: [PATCH] modern sync: only log on success Summary: Metrics were looking weird, turns out we were logging even when an error happened, fixing that Reviewed By: markbt Differential Revision: D69397420 fbshipit-source-id: 6052e832f7c873a4d6869e8339369b8eccb6bd26 --- .../modern_sync/src/sender/manager.rs | 92 +++++++++++++------ 1 file changed, 64 insertions(+), 28 deletions(-) diff --git a/eden/mononoke/modern_sync/src/sender/manager.rs b/eden/mononoke/modern_sync/src/sender/manager.rs index 1aef3741e3284..b5968e4e3bf80 100644 --- a/eden/mononoke/modern_sync/src/sender/manager.rs +++ b/eden/mononoke/modern_sync/src/sender/manager.rs @@ -137,8 +137,9 @@ impl SendManager { encountered_error.get_or_insert( e.context(format!("Failed to upload content: {:?}", ct_id)), ); + } else { + STATS::synced_contents.add_value(1, (reponame.clone(),)); } - STATS::synced_contents.add_value(1, (reponame.clone(),)); } ContentMessage::ContentDone(sender) => { if let Some(e) = encountered_error { @@ -194,8 +195,9 @@ impl SendManager { encountered_error.get_or_insert( e.context(format!("Failed to upload filenodes: {:?}", f)), ); + } else { + STATS::synced_filenodes.add_value(1, (reponame.clone(),)); } - STATS::synced_filenodes.add_value(1, (reponame.clone(),)); } FileOrTreeMessage::Tree(t) if encountered_error.is_none() => { // Upload the trees through sender @@ -203,8 +205,9 @@ impl SendManager { encountered_error.get_or_insert( e.context(format!("Failed to upload trees: {:?}", t)), ); + } else { + STATS::synced_trees.add_value(1, (reponame.clone(),)); } - STATS::synced_trees.add_value(1, (reponame.clone(),)); } FileOrTreeMessage::FilesAndTreesDone(sender) => { if let Some(e) = encountered_error { @@ -242,7 +245,7 @@ impl SendManager { loop { tokio::select! { - msg = changeset_recv.recv()=>{ + msg = changeset_recv.recv() => { match msg { Some(ChangesetMessage::WaitForFilesAndTrees(receiver)) => { // Read outcome from files and trees upload @@ -255,54 +258,85 @@ impl SendManager { } Err(e) => { encountered_error.get_or_insert(anyhow::anyhow!( - "Error waiting for files/trees error received {:#}", e) - ); + "Error waiting for files/trees error received {:#}", + e + )); + } + _ => { + let elapsed = start.elapsed().as_secs(); + STATS::trees_files_wait_time_s + .add_value(elapsed as i64, (reponame.clone(),)); } - _ => (), } - let elapsed = start.elapsed().as_secs(); - STATS::trees_files_wait_time_s - .add_value(elapsed as i64, (reponame.clone(),)); } - Some(ChangesetMessage::Changeset((hg_cs, bcs))) if encountered_error.is_none() => { + Some(ChangesetMessage::Changeset((hg_cs, bcs))) + if encountered_error.is_none() => + { current_batch.push((hg_cs, bcs)); } - Some(ChangesetMessage::ChangesetDone(sender)) if encountered_error.is_none() => { + Some(ChangesetMessage::ChangesetDone(sender)) + if encountered_error.is_none() => + { pending_messages.push_back(sender); } - Some(ChangesetMessage::Log((_, lag))) if encountered_error.is_none() => { + Some(ChangesetMessage::Log((_, lag))) + if encountered_error.is_none() => + { pending_log.push_back(lag); } - Some(ChangesetMessage::ChangesetDone(sender)) =>{ + Some(ChangesetMessage::ChangesetDone(sender)) => { let e = encountered_error.unwrap(); - sender.send(Err(anyhow::anyhow!("Error processing changesets: {:?}", e))).await?; + sender + .send(Err(anyhow::anyhow!( + "Error processing changesets: {:?}", + e + ))) + .await?; return Err(e); } - Some(ChangesetMessage::Log((_, _))) | Some(ChangesetMessage::Changeset(_)) => {} + Some(ChangesetMessage::Log((_, _))) + | Some(ChangesetMessage::Changeset(_)) => {} None => break, } if current_batch.len() >= MAX_CHANGESET_BATCH_SIZE { - if let Err(e) = flush_batch(&changeset_es, &mut current_batch, &mut pending_messages, &mut pending_log, &changeset_logger, reponame.clone()).await { - return Err(anyhow::anyhow!("Error processing changesets: {:?}", e)); + if let Err(e) = flush_batch( + &changeset_es, + &mut current_batch, + &mut pending_messages, + &mut pending_log, + &changeset_logger, + reponame.clone(), + ) + .await + { + return Err(anyhow::anyhow!( + "Error processing changesets: {:?}", + e + )); } - } } _ = flush_timer.tick() => { - if let Err(e) = flush_batch(&changeset_es, &mut current_batch, &mut pending_messages, &mut pending_log, &changeset_logger, reponame.clone()).await { - return Err(anyhow::anyhow!("Error processing changesets: {:?}", e)); - } - - + if let Err(e) = flush_batch( + &changeset_es, + &mut current_batch, + &mut pending_messages, + &mut pending_log, + &changeset_logger, + reponame.clone(), + ) + .await + { + return Err(anyhow::anyhow!("Error processing changesets: {:?}", e)); + } } - } } @@ -323,13 +357,15 @@ impl SendManager { { error!(changeset_logger, "Failed to upload changesets: {:?}", e); return Err(e); + } else { + let elapsed = start.elapsed().as_secs() / batch_size as u64; + STATS::changeset_upload_time_s + .add_value(elapsed as i64, (reponame.clone(),)); + STATS::synced_commits.add_value(batch_size as i64, (reponame.clone(),)); } - let elapsed = start.elapsed().as_secs() / batch_size as u64; - STATS::changeset_upload_time_s.add_value(elapsed as i64, (reponame.clone(),)); } while let Some(Some(lag)) = pending_log.pop_front() { - STATS::synced_commits.add_value(1, (reponame.clone(),)); STATS::sync_lag_seconds.add_value(lag, (reponame.clone(),)); }