Skip to content

Commit

Permalink
Migrate backsyncer to mononoke_app and clap 4
Browse files Browse the repository at this point in the history
Summary:
# This Stack
Migrate backsyncer CLI to use mononoke_app + clap 4, instead of old cmdlib + clap 2.

I might also make some improvements to other cross-repo CLIs, e.g. forward syncer, bookmarks validator.

## This diff
Similar to D68889082 for bookmarks_validator, this actually migrates backsyncer to use mononoke_app + clap 4.

As I mentioned in D68889082, as much as I hate to publish a big diff, it's very, very hard to split this change, since all existing clap 4 APIs are tied to mononoke_app, so I have to make both changes together...

As I work on these binaries, I'm seeing a lot of repetition between them, especially related to shard manager code, so I'll probably introduce a common setup to make it easier to maintain.

test backsyncer after D68957325

Commit:
    bonsai=e1763dad7b35c98d0f82d8f1332e3c7025f7535423fb403489ecabf8ac577408
    git=59ea4e2caca552b32a364f6d4e4fed35e214375c
Date: 2025-01-17 14:04:22 +00:00
Committer Date: 2025-01-17 14:04:22 +00:00
Author: Gustavo Galvao Avena <gustavoavena@fb.com>
test pushing to gustavo-test from large repo

Commit:
    bonsai=f6f0cfebb7e5e35e16c4e182f6e3c61080d89b421d8f36bf6625bc5937dc4f76
    git=c634d4ec2925375124fab874c9bba363eab2cd43
Date: 2025-01-17 13:59:32 +00:00
Committer Date: 2025-01-17 13:59:32 +00:00
Author: Gustavo Galvao Avena <gustavoavena@meta.com>
Test locked bookmark hook

┬─[gustavoavena@devvm25564:~/bin]─[11:26:08]
```

TW tasks logs: P1725959703

Reviewed By: RajivTS

Differential Revision: D68957325

fbshipit-source-id: 3cf42c0c1afa987f629379a8b258e38e6b184612
  • Loading branch information
gustavoavena authored and facebook-github-bot committed Feb 13, 2025
1 parent 06c6da6 commit 8faf0ef
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ async-trait = "0.1.71"
backsyncer = { version = "0.1.0", path = ".." }
blobrepo_hg = { version = "0.1.0", path = "../../../blobrepo/blobrepo_hg" }
bookmarks = { version = "0.1.0", path = "../../../bookmarks" }
clap = "2.34.0"
clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] }
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
cmdlib = { version = "0.1.0", path = "../../../cmdlib" }
cmdlib_x_repo = { version = "0.1.0", path = "../../../cmdlib/x_repo" }
cmdlib_cross_repo = { version = "0.1.0", path = "../../../cmdlib/cross_repo" }
cmdlib_logging = { version = "0.1.0", path = "../../../cmdlib/log" }
context = { version = "0.1.0", path = "../../../server/context" }
cross_repo_sync = { version = "0.1.0", path = "../../cross_repo_sync" }
executor_lib = { version = "0.1.0", path = "../../../cmdlib/sharding" }
Expand All @@ -30,10 +30,10 @@ live_commit_sync_config = { version = "0.1.0", path = "../../live_commit_sync_co
mercurial_derivation = { version = "0.1.0", path = "../../../derived_data/mercurial_derivation" }
mercurial_types = { version = "0.1.0", path = "../../../mercurial/types" }
metadata = { version = "0.1.0", path = "../../../server/metadata" }
mononoke_app = { version = "0.1.0", path = "../../../cmdlib/mononoke_app" }
mononoke_types = { version = "0.1.0", path = "../../../mononoke_types" }
repo_identity = { version = "0.1.0", path = "../../../repo_attributes/repo_identity" }
sapling-clientinfo = { version = "0.1.0", path = "../../../../scm/lib/clientinfo" }
scuba_ext = { version = "0.1.0", path = "../../../common/scuba_ext" }
sharding_ext = { version = "0.1.0", path = "../../../cmdlib/sharding_ext" }
slog = { package = "tracing_slog_compat", version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
Expand Down
57 changes: 57 additions & 0 deletions eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/

use clap::command;
use clap::Args;
use clap::Parser;
use clap::Subcommand;
use cmdlib_logging::ScribeLoggingArgs;
use executor_lib::args::ShardedExecutorArgs;
use mononoke_app::args::OptSourceAndTargetRepoArgs;

pub(crate) const SCUBA_TABLE: &str = "mononoke_xrepo_backsync";

#[derive(Clone, Debug, Args)]
#[clap(about = "Syncs all commits from the file")]
pub struct CommitsCommandArgs {
/// File containing the list of commits to sync
#[clap(long)]
pub input_file: String,

/// How many commits to backsync at once
#[clap(long)]
pub batch_size: Option<usize>,
}

#[derive(Debug, clap::Subcommand)]
pub enum BacksyncerCommand {
/// Backsyncs all new bookmark moves once
#[command(name = "backsync-all")]
Once,
/// Backsyncs all new bookmark moves
#[command(name = "backsync-forever")]
Forever,
/// Syncs all commits from the file
#[command(name = "backsync-commits")]
Commits(CommitsCommandArgs),
}

// Top-level CLI arguments for the backsyncer binary, parsed by
// `MononokeApp` (see `app.args::<BacksyncerArgs>()` in main.rs).
// Unified on the clap 4 `#[command(...)]` attribute spelling instead of
// the deprecated `#[clap(...)]` alias used inconsistently before.
#[derive(Debug, Parser)]
#[command(about = "CLI to sync commits from large repositories (i.e. mega repos) to small ones")]
pub struct BacksyncerArgs {
    /// Identifiers or names for the source and target repos
    #[command(flatten, next_help_heading = "CROSS REPO OPTIONS")]
    pub repo_args: OptSourceAndTargetRepoArgs,

    #[command(flatten)]
    pub scribe_logging_args: ScribeLoggingArgs,

    #[command(flatten)]
    pub sharded_executor_args: ShardedExecutorArgs,

    #[command(subcommand)]
    pub command: BacksyncerCommand,
}
144 changes: 75 additions & 69 deletions eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,88 +5,94 @@
* GNU General Public License version 2.
*/

mod cli;
mod run;
mod sharding;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::OnceLock;

use anyhow::Context;
use anyhow::Error;
use cmdlib::args;
use cmdlib::helpers;
use executor_lib::ShardedProcessExecutor;
use anyhow::Result;
use clientinfo::ClientEntryPoint;
use clientinfo::ClientInfo;
use context::CoreContext;
use context::SessionContainer;
use cross_repo_sync::Repo as CrossRepo;
use executor_lib::RepoShardedProcessExecutor;
use fbinit::FacebookInit;
use metadata::Metadata;
use mononoke_app::monitoring::AliveService;
use mononoke_app::monitoring::MonitoringAppExtension;
use mononoke_app::MononokeApp;
use mononoke_app::MononokeAppBuilder;
use slog::info;

use crate::run::run_backsyncer;
use crate::cli::BacksyncerArgs;
use crate::cli::SCUBA_TABLE;
use crate::sharding::BacksyncProcess;
use crate::sharding::BacksyncProcessExecutor;
use crate::sharding::APP_NAME;
use crate::sharding::DEFAULT_SHARDED_SCOPE_NAME;
use crate::sharding::SM_CLEANUP_TIMEOUT_SECS;

/// Dispatch the backsyncer according to the parsed CLI arguments.
///
/// If sharded-executor arguments were supplied, control is handed to the
/// shard-manager executor (with repo-level healing enabled); otherwise a
/// single backsync process runs over the explicitly named source/target
/// repo pair, which must then be provided on the command line.
async fn async_main(ctx: CoreContext, app: MononokeApp) -> Result<(), Error> {
    let args: BacksyncerArgs = app.args()?;
    let ctx = Arc::new(ctx);
    let app = Arc::new(app);
    let repo_args = args.repo_args.clone();
    let runtime = app.runtime().clone();

    let maybe_executor = args.sharded_executor_args.clone().build_executor(
        app.fb,
        runtime.clone(),
        ctx.logger(),
        || Arc::new(BacksyncProcess::new(ctx.clone(), app.clone())),
        true, // enable shard (repo) level healing
        SM_CLEANUP_TIMEOUT_SECS,
    )?;

    match maybe_executor {
        // Sharded mode: block until the shard manager terminates us.
        Some(mut executor) => {
            let terminate = Arc::new(AtomicBool::new(false));
            executor.block_and_execute(ctx.logger(), terminate).await
        }
        // Non-sharded mode: both repos must be named explicitly.
        None => {
            let repo_args = repo_args
                .into_source_and_target_args()
                .context("Source and Target repos must be provided when running in non-sharded mode")?;
            let process_executor = BacksyncProcessExecutor::new(ctx.clone(), app, repo_args).await?;
            process_executor.execute().await
        }
    }
}

// NOTE(review): the span below is a rendered diff hunk whose +/- markers were
// stripped by the page scrape: it interleaves the OLD clap-2/cmdlib
// implementation of `main` (the `BacksyncProcess::new(fb)` / `process.matches`
// path) with the NEW mononoke_app implementation (from
// `let app: MononokeApp = ...` onward). As written it is two function bodies
// concatenated and does not compile; recover the intended version from the
// commit itself before editing.
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> {
    // ---- OLD (removed) body: cmdlib + clap 2 ----
    let process = BacksyncProcess::new(fb)?;
    match process.matches.value_of(cmdlib::args::SHARDED_SERVICE_NAME) {
        Some(service_name) => {
            // Don't fail if the scope name is missing, but use global. This allows us to be
            // backward compatible with the old tw setup.
            // TODO (Pierre): Once the tw jobs have been updated, we can be less lenient here.
            let scope_name = process
                .matches
                .value_of(cmdlib::args::SHARDED_SCOPE_NAME)
                .unwrap_or(DEFAULT_SHARDED_SCOPE_NAME);
            // The service name needs to be 'static to satisfy SM contract
            static SM_SERVICE_NAME: OnceLock<String> = OnceLock::new();
            static SM_SERVICE_SCOPE_NAME: OnceLock<String> = OnceLock::new();
            let logger = process.matches.logger().clone();
            let matches = Arc::clone(&process.matches);
            let mut executor = ShardedProcessExecutor::new(
                process.fb,
                process.matches.runtime().clone(),
                &logger,
                SM_SERVICE_NAME.get_or_init(|| service_name.to_string()),
                SM_SERVICE_SCOPE_NAME.get_or_init(|| scope_name.to_string()),
                SM_CLEANUP_TIMEOUT_SECS,
                Arc::new(process),
                true, // enable shard (repo) level healing
            )?;
            helpers::block_execute(
                executor.block_and_execute(&logger, Arc::new(AtomicBool::new(false))),
                fb,
                &std::env::var("TW_JOB_NAME").unwrap_or_else(|_| APP_NAME.to_string()),
                matches.logger(),
                &matches,
                cmdlib::monitoring::AliveService,
            )
        }
        None => {
            let logger = process.matches.logger().clone();
            let matches = process.matches.clone();
            let config_store = matches.config_store();
            let source_repo_id =
                args::not_shardmanager_compatible::get_source_repo_id(config_store, &matches)?;
            let target_repo_id =
                args::not_shardmanager_compatible::get_target_repo_id(config_store, &matches)?;
            let (source_repo_name, _) =
                args::get_config_by_repoid(config_store, &matches, source_repo_id)?;
            let (target_repo_name, _) =
                args::get_config_by_repoid(config_store, &matches, target_repo_id)?;
            let fut = run_backsyncer(
                fb,
                matches.clone(),
                source_repo_name,
                target_repo_name,
                Arc::new(AtomicBool::new(false)),
            );
            helpers::block_execute(
                fut,
                fb,
                APP_NAME,
                &logger,
                &matches,
                cmdlib::monitoring::AliveService,
            )
        }
    }
    // ---- NEW (added) body: mononoke_app + clap 4 ----
    // Build the app with monitoring and the backsyncer scuba dataset.
    let app: MononokeApp = MononokeAppBuilder::new(fb)
        .with_app_extension(MonitoringAppExtension {})
        .with_default_scuba_dataset(SCUBA_TABLE)
        .build::<BacksyncerArgs>()?;
    let args: BacksyncerArgs = app.args()?;

    // Tag all requests from this process as coming from the megarepo backsyncer.
    let mut metadata = Metadata::default();
    metadata.add_client_info(ClientInfo::default_with_entry_point(
        ClientEntryPoint::MegarepoBacksyncer,
    ));

    let scribe = args.scribe_logging_args.get_scribe(fb)?;

    // Clone the app-level scuba builder and enrich it with the metadata above.
    let mut scuba = app.environment().scuba_sample_builder.clone();
    scuba.add_metadata(&metadata);
    scuba.add_common_server_data();

    let session_container = SessionContainer::builder(fb)
        .metadata(Arc::new(metadata))
        .build();

    let ctx = session_container.new_context_with_scribe(app.logger().clone(), scuba, scribe);

    info!(
        ctx.logger(),
        "Starting session with id {}",
        ctx.metadata().session_id(),
    );

    app.run_with_monitoring_and_logging(|app| async_main(ctx.clone(), app), APP_NAME, AliveService)
}
Loading

0 comments on commit 8faf0ef

Please sign in to comment.