Skip to content

Commit

Permalink
modern sync: only log on success
Browse files Browse the repository at this point in the history
Summary: Metrics were looking weird, turns out we were logging even when an error happened, fixing that

Reviewed By: markbt

Differential Revision: D69397420

fbshipit-source-id: 6052e832f7c873a4d6869e8339369b8eccb6bd26
  • Loading branch information
lmvasquezg authored and facebook-github-bot committed Feb 11, 2025
1 parent b7de5ec commit c6ada01
Showing 1 changed file with 64 additions and 28 deletions.
92 changes: 64 additions & 28 deletions eden/mononoke/modern_sync/src/sender/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ impl SendManager {
encountered_error.get_or_insert(
e.context(format!("Failed to upload content: {:?}", ct_id)),
);
} else {
STATS::synced_contents.add_value(1, (reponame.clone(),));
}
STATS::synced_contents.add_value(1, (reponame.clone(),));
}
ContentMessage::ContentDone(sender) => {
if let Some(e) = encountered_error {
Expand Down Expand Up @@ -194,17 +195,19 @@ impl SendManager {
encountered_error.get_or_insert(
e.context(format!("Failed to upload filenodes: {:?}", f)),
);
} else {
STATS::synced_filenodes.add_value(1, (reponame.clone(),));
}
STATS::synced_filenodes.add_value(1, (reponame.clone(),));
}
FileOrTreeMessage::Tree(t) if encountered_error.is_none() => {
// Upload the trees through sender
if let Err(e) = files_trees_es.upload_trees(vec![t]).await {
encountered_error.get_or_insert(
e.context(format!("Failed to upload trees: {:?}", t)),
);
} else {
STATS::synced_trees.add_value(1, (reponame.clone(),));
}
STATS::synced_trees.add_value(1, (reponame.clone(),));
}
FileOrTreeMessage::FilesAndTreesDone(sender) => {
if let Some(e) = encountered_error {
Expand Down Expand Up @@ -242,7 +245,7 @@ impl SendManager {

loop {
tokio::select! {
msg = changeset_recv.recv()=>{
msg = changeset_recv.recv() => {
match msg {
Some(ChangesetMessage::WaitForFilesAndTrees(receiver)) => {
// Read outcome from files and trees upload
Expand All @@ -255,54 +258,85 @@ impl SendManager {
}
Err(e) => {
encountered_error.get_or_insert(anyhow::anyhow!(
"Error waiting for files/trees error received {:#}", e)
);
"Error waiting for files/trees error received {:#}",
e
));
}
_ => {
let elapsed = start.elapsed().as_secs();
STATS::trees_files_wait_time_s
.add_value(elapsed as i64, (reponame.clone(),));
}
_ => (),
}
let elapsed = start.elapsed().as_secs();
STATS::trees_files_wait_time_s
.add_value(elapsed as i64, (reponame.clone(),));
}

Some(ChangesetMessage::Changeset((hg_cs, bcs))) if encountered_error.is_none() => {
Some(ChangesetMessage::Changeset((hg_cs, bcs)))
if encountered_error.is_none() =>
{
current_batch.push((hg_cs, bcs));
}

Some(ChangesetMessage::ChangesetDone(sender)) if encountered_error.is_none() => {
Some(ChangesetMessage::ChangesetDone(sender))
if encountered_error.is_none() =>
{
pending_messages.push_back(sender);
}

Some(ChangesetMessage::Log((_, lag))) if encountered_error.is_none() => {
Some(ChangesetMessage::Log((_, lag)))
if encountered_error.is_none() =>
{
pending_log.push_back(lag);
}

Some(ChangesetMessage::ChangesetDone(sender)) =>{
Some(ChangesetMessage::ChangesetDone(sender)) => {
let e = encountered_error.unwrap();
sender.send(Err(anyhow::anyhow!("Error processing changesets: {:?}", e))).await?;
sender
.send(Err(anyhow::anyhow!(
"Error processing changesets: {:?}",
e
)))
.await?;
return Err(e);
}

Some(ChangesetMessage::Log((_, _))) | Some(ChangesetMessage::Changeset(_)) => {}
Some(ChangesetMessage::Log((_, _)))
| Some(ChangesetMessage::Changeset(_)) => {}

None => break,
}

if current_batch.len() >= MAX_CHANGESET_BATCH_SIZE {
if let Err(e) = flush_batch(&changeset_es, &mut current_batch, &mut pending_messages, &mut pending_log, &changeset_logger, reponame.clone()).await {
return Err(anyhow::anyhow!("Error processing changesets: {:?}", e));
if let Err(e) = flush_batch(
&changeset_es,
&mut current_batch,
&mut pending_messages,
&mut pending_log,
&changeset_logger,
reponame.clone(),
)
.await
{
return Err(anyhow::anyhow!(
"Error processing changesets: {:?}",
e
));
}

}
}
_ = flush_timer.tick() => {
if let Err(e) = flush_batch(&changeset_es, &mut current_batch, &mut pending_messages, &mut pending_log, &changeset_logger, reponame.clone()).await {
return Err(anyhow::anyhow!("Error processing changesets: {:?}", e));
}


if let Err(e) = flush_batch(
&changeset_es,
&mut current_batch,
&mut pending_messages,
&mut pending_log,
&changeset_logger,
reponame.clone(),
)
.await
{
return Err(anyhow::anyhow!("Error processing changesets: {:?}", e));
}
}

}
}

Expand All @@ -323,13 +357,15 @@ impl SendManager {
{
error!(changeset_logger, "Failed to upload changesets: {:?}", e);
return Err(e);
} else {
let elapsed = start.elapsed().as_secs() / batch_size as u64;
STATS::changeset_upload_time_s
.add_value(elapsed as i64, (reponame.clone(),));
STATS::synced_commits.add_value(batch_size as i64, (reponame.clone(),));
}
let elapsed = start.elapsed().as_secs() / batch_size as u64;
STATS::changeset_upload_time_s.add_value(elapsed as i64, (reponame.clone(),));
}

while let Some(Some(lag)) = pending_log.pop_front() {
STATS::synced_commits.add_value(1, (reponame.clone(),));
STATS::sync_lag_seconds.add_value(lag, (reponame.clone(),));
}

Expand Down

0 comments on commit c6ada01

Please sign in to comment.