diff --git a/Cargo.lock b/Cargo.lock
index d6221be2..cbfad377 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -636,9 +636,11 @@ name = "anchor"
 version = "0.1.0"
 dependencies = [
  "async-channel",
+ "bls",
  "clap",
  "client",
  "dirs 5.0.1",
+ "eth2_network_config",
  "futures",
  "regex",
  "sensitive_url",
@@ -656,8 +658,11 @@ version = "0.1.0"
 dependencies = [
  "beacon_node_fallback",
  "dashmap",
+ "database",
  "eth2",
  "futures",
+ "hex",
+ "openssl",
  "parking_lot",
  "qbft",
  "qbft_manager",
@@ -1543,15 +1548,19 @@ dependencies = [
  "anchor_validator_store",
  "beacon_node_fallback",
  "clap",
+ "database",
  "dirs 5.0.1",
+ "eth",
  "eth2",
  "eth2_config",
+ "eth2_network_config",
  "ethereum_hashing",
  "fdlimit",
  "http_api",
  "http_metrics",
  "hyper 1.5.2",
  "network",
+ "openssl",
  "parking_lot",
  "processor",
  "qbft_manager",
@@ -1570,6 +1579,7 @@ dependencies = [
  "validator_metrics",
  "validator_services",
  "version",
+ "zeroize",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index f2c89aba..1c8c2fca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,8 +38,10 @@ ssv_types = { path = "anchor/common/ssv_types" }
 version = { path = "anchor/common/version" }
 
 beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
+bls = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
 eth2 = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
 eth2_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
+eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
 health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
 lighthouse_network = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
 metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
@@ -74,7 +76,6 @@ dashmap = "6.1.0"
 derive_more = { version = "1.0.0", features = ["full"] }
 dirs = "5.0.1"
 discv5 = "0.9.0"
-either = "1.13.0"
 futures = "0.3.30"
 hex = "0.4.3"
 hyper = "1.4"
@@ -98,6 +99,7 @@ tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }
 tree_hash = "0.8"
 tree_hash_derive = "0.8"
+zeroize = "1.8.1"
 
 [profile.maxperf]
 inherits = "release"
diff --git a/anchor/Cargo.toml b/anchor/Cargo.toml
index 5a3c5538..775f1a5b 100644
--- a/anchor/Cargo.toml
+++ b/anchor/Cargo.toml
@@ -7,9 +7,11 @@ rust-version = "1.83.0"
 
 [dependencies]
 async-channel = { workspace = true }
+bls = { workspace = true }
 clap = { workspace = true }
 client = { workspace = true }
 dirs = { workspace = true }
+eth2_network_config = { workspace = true }
 futures = { workspace = true }
 sensitive_url = { workspace = true }
 serde = { workspace = true }
@@ -21,3 +23,11 @@ types = { workspace = true }
 
 [dev-dependencies]
 regex = "1.10.6"
+
+[features]
+# Compiles the BLS crypto code so that the binary is portable across machines.
+portable = ["bls/supranational-portable"]
+# Compiles BLST so that it always uses ADX instructions.
+modern = ["bls/supranational-force-adx"]
+# Support minimal spec (used for testing only).
+spec-minimal = []
diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml
index 84562eba..f73872cb 100644
--- a/anchor/client/Cargo.toml
+++ b/anchor/client/Cargo.toml
@@ -12,15 +12,19 @@ path = "src/lib.rs"
 anchor_validator_store = { workspace = true }
 beacon_node_fallback = { workspace = true }
 clap = { workspace = true }
+database = { workspace = true }
 dirs = { workspace = true }
+eth = { workspace = true }
 eth2 = { workspace = true }
 eth2_config = { workspace = true }
+eth2_network_config = { workspace = true }
 ethereum_hashing = "0.7.0"
 fdlimit = "0.3"
 http_api = { workspace = true }
 http_metrics = { workspace = true }
 hyper = { workspace = true }
 network = { workspace = true }
+openssl = { workspace = true }
 parking_lot = { workspace = true }
 processor = { workspace = true }
 qbft_manager = { workspace = true }
@@ -39,3 +43,4 @@ unused_port = { workspace = true }
 validator_metrics = { workspace = true }
 validator_services = { workspace = true }
 version = { workspace = true }
+zeroize = { workspace = true }
diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs
index 944f438a..b847b75d 100644
--- a/anchor/client/src/cli.rs
+++ b/anchor/client/src/cli.rs
@@ -95,15 +95,25 @@ pub struct Anchor {
 
     #[clap(
         long,
+        short = 't',
+        global = true,
         value_name = "DIR",
-        help = "The directory which contains the password to unlock the validator \
-                voting keypairs. Each password should be contained in a file where the \
-                name is the 0x-prefixed hex representation of the validators voting public \
-                key. Defaults to ~/.lighthouse/{network}/secrets.",
-        conflicts_with = "datadir",
+        help = "Path to directory containing eth2_testnet specs.",
        display_order = 0
     )]
-    pub secrets_dir: Option<PathBuf>,
+    pub testnet_dir: Option<PathBuf>,
+
+    #[clap(
+        long,
+        global = true,
+        value_name = "NETWORK",
+        value_parser = vec!["mainnet", "holesky"],
+        conflicts_with = "testnet_dir",
+        help = "Name of the chain Anchor will validate.",
+        display_order = 0,
+        default_value = crate::config::DEFAULT_HARDCODED_NETWORK,
+    )]
+    pub network: String,
 
     /* External APIs */
     #[clap(
diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs
index 235a5cce..b759f198 100644
--- a/anchor/client/src/config.rs
+++ b/anchor/client/src/config.rs
@@ -2,9 +2,9 @@
 // use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required};
 
 use crate::cli::Anchor;
+use eth2_network_config::Eth2NetworkConfig;
 use network::{ListenAddr, ListenAddress};
 use sensitive_url::SensitiveUrl;
-use serde::{Deserialize, Serialize};
 use std::fs;
 use std::net::IpAddr;
 use std::path::PathBuf;
@@ -12,20 +12,21 @@ use tracing::warn;
 
 pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";
 pub const DEFAULT_EXECUTION_NODE: &str = "http://localhost:8545/";
+pub const DEFAULT_EXECUTION_NODE_WS: &str = "ws://localhost:8545/";
 /// The default Data directory, relative to the users home directory
 pub const DEFAULT_ROOT_DIR: &str = ".anchor";
 /// Default network, used to partition the data storage
 pub const DEFAULT_HARDCODED_NETWORK: &str = "mainnet";
-/// Directory within the network directory where secrets are stored.
-pub const DEFAULT_SECRETS_DIR: &str = "secrets";
+/// Base directory name for unnamed testnets passed through the --testnet-dir flag
+pub const CUSTOM_TESTNET_DIR: &str = "custom";
 
 /// Stores the core configuration for this Anchor instance.
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone)]
 pub struct Config {
     /// The data directory, which stores all validator databases
     pub data_dir: PathBuf,
-    /// The directory containing the passwords to unlock validator keystores.
-    pub secrets_dir: PathBuf,
+    /// The Eth2 Network to use
+    pub eth2_network: Eth2NetworkConfig,
     /// The http endpoints of the beacon node APIs.
     ///
     /// Should be similar to `["http://localhost:8080"]`
@@ -54,24 +55,34 @@ pub struct Config {
     pub processor: processor::Config,
 }
 
-impl Default for Config {
+impl Config {
     /// Build a new configuration from defaults.
-    fn default() -> Self {
-        // WARNING: these directory defaults should be always overwritten with parameters from cli
-        // for specific networks.
+    ///
+    /// `eth2_network` is passed in as a parameter because eagerly constructing a default
+    /// network config would be needlessly expensive.
+    fn new(eth2_network: Eth2NetworkConfig) -> Self {
        let data_dir = dirs::home_dir()
             .unwrap_or_else(|| PathBuf::from("."))
             .join(DEFAULT_ROOT_DIR)
-            .join(DEFAULT_HARDCODED_NETWORK);
-        let secrets_dir = data_dir.join(DEFAULT_SECRETS_DIR);
+            .join(
+                eth2_network
+                    .config
+                    .config_name
+                    .as_deref()
+                    .unwrap_or("custom"),
+            );
 
         let beacon_nodes = vec![SensitiveUrl::parse(DEFAULT_BEACON_NODE)
             .expect("beacon_nodes must always be a valid url.")];
-        let execution_nodes = vec![SensitiveUrl::parse(DEFAULT_EXECUTION_NODE)
-            .expect("execution_nodes must always be a valid url.")];
+        let execution_nodes = vec![
+            SensitiveUrl::parse(DEFAULT_EXECUTION_NODE)
+                .expect("execution_nodes must always be a valid url."),
+            SensitiveUrl::parse(DEFAULT_EXECUTION_NODE_WS)
+                .expect("execution_nodes must always be a valid url."),
+        ];
+
         Self {
             data_dir,
-            secrets_dir,
+            eth2_network,
             beacon_nodes,
             proposer_nodes: vec![],
             execution_nodes,
@@ -90,27 +101,20 @@ impl Default for Config {
 /// Returns a `Default` implementation of `Self` with some parameters modified by the supplied
 /// `cli_args`.
 pub fn from_cli(cli_args: &Anchor) -> Result<Config, String> {
-    let mut config = Config::default();
-
-    let default_root_dir = dirs::home_dir()
-        .map(|home| home.join(DEFAULT_ROOT_DIR))
-        .unwrap_or_else(|| PathBuf::from("."));
+    let eth2_network = if let Some(_testnet_dir) = &cli_args.testnet_dir {
+        // todo
+        return Err("testnet dir not yet supported".into());
+    } else {
+        Eth2NetworkConfig::constant(&cli_args.network)
+            .and_then(|net| net.ok_or_else(|| format!("Unknown network {}", cli_args.network)))
+    }?;
 
-    let (mut data_dir, mut secrets_dir) = (None, None);
+    let mut config = Config::new(eth2_network);
 
     if let Some(datadir) = cli_args.datadir.clone() {
-        secrets_dir = Some(datadir.join(DEFAULT_SECRETS_DIR));
-        data_dir = Some(datadir);
+        config.data_dir = datadir;
     }
 
-    if cli_args.secrets_dir.is_some() {
-        secrets_dir = cli_args.secrets_dir.clone();
-    }
-
-    config.data_dir = data_dir.unwrap_or_else(|| default_root_dir.join(DEFAULT_ROOT_DIR));
-
-    config.secrets_dir = secrets_dir.unwrap_or_else(|| default_root_dir.join(DEFAULT_SECRETS_DIR));
-
     if !config.data_dir.exists() {
         fs::create_dir_all(&config.data_dir)
             .map_err(|e| format!("Failed to create {:?}: {:?}", config.data_dir, e))?;
@@ -390,6 +394,10 @@ mod tests {
 
     #[test]
     // Ensures the default config does not panic.
     fn default_config() {
-        Config::default();
+        Config::new(
+            Eth2NetworkConfig::constant(DEFAULT_HARDCODED_NETWORK)
+                .unwrap()
+                .unwrap(),
+        );
     }
 }
diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs
index 2aa4bb41..138859c8 100644
--- a/anchor/client/src/lib.rs
+++ b/anchor/client/src/lib.rs
@@ -10,10 +10,12 @@ use beacon_node_fallback::{
 };
 pub use cli::Anchor;
 use config::Config;
+use database::NetworkDatabase;
 use eth2::reqwest::{Certificate, ClientBuilder};
 use eth2::{BeaconNodeHttpClient, Timeouts};
-use eth2_config::Eth2Config;
 use network::Network;
+use openssl::pkey::Private;
+use openssl::rsa::Rsa;
 use parking_lot::RwLock;
 use qbft_manager::QbftManager;
 use sensitive_url::SensitiveUrl;
@@ -22,23 +24,26 @@ use slashing_protection::SlashingDatabase;
 use slot_clock::{SlotClock, SystemTimeSlotClock};
 use ssv_types::OperatorId;
 use std::fs::File;
-use std::io::Read;
+use std::io::{ErrorKind, Read, Write};
 use std::net::SocketAddr;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use task_executor::TaskExecutor;
 use tokio::net::TcpListener;
-use tokio::sync::mpsc;
+use tokio::select;
+use tokio::sync::oneshot::Receiver;
+use tokio::sync::{mpsc, oneshot};
 use tokio::time::sleep;
 use tracing::{debug, error, info, warn};
-use types::{EthSpec, Hash256};
+use types::{ChainSpec, EthSpec, Hash256};
 use validator_metrics::set_gauge;
 use validator_services::attestation_service::AttestationServiceBuilder;
 use validator_services::block_service::BlockServiceBuilder;
 use validator_services::duties_service;
 use validator_services::duties_service::DutiesServiceBuilder;
 use validator_services::preparation_service::PreparationServiceBuilder;
+use zeroize::Zeroizing;
 
 /// The filename within the `validators` directory that contains the slashing protection DB.
 const SLASHING_PROTECTION_FILENAME: &str = "slashing_protection.sqlite";
@@ -90,8 +95,18 @@ impl Client {
             "Starting the Anchor client"
         );
 
-        // TODO make configurable
-        let spec = Eth2Config::mainnet().spec;
+        let spec = Arc::new(config.eth2_network.chain_spec::<E>()?);
+
+        let key = read_or_generate_private_key(&config.data_dir.join("key.pem"))?;
+        let err = |e| format!("Unable to derive public key: {e:?}");
+        let pubkey = Rsa::from_public_components(
+            key.n().to_owned().map_err(err)?,
+            key.e().to_owned().map_err(err)?,
+        )
+        .map_err(err)?;
+
+        // Start the processor
+        let processor_senders = processor::spawn(config.processor, executor.clone());
 
         // Optionally start the metrics server.
         let http_metrics_shared_state = if config.http_metrics.enabled {
@@ -125,6 +140,12 @@ impl Client {
         // Spawn the network listening task
         executor.spawn(network.run(), "network");
 
+        // Open database
+        let database = Arc::new(
+            NetworkDatabase::new(config.data_dir.join("anchor_db.sqlite").as_path(), &pubkey)
+                .map_err(|e| format!("Unable to open Anchor database: {e}"))?,
+        );
+
         // Initialize slashing protection.
         let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME);
         let slashing_protection =
@@ -276,26 +297,72 @@ impl Client {
         let proposer_nodes = Arc::new(proposer_nodes);
         start_fallback_updater_service::<_, E>(executor.clone(), proposer_nodes.clone())?;
 
-        // Start the processor
-        let processor_senders = processor::spawn(config.processor, executor.clone());
+        // Wait until genesis has occurred.
+        wait_for_genesis(&beacon_nodes, genesis_time).await?;
+
+        // Start syncer
+        let (historic_finished_tx, historic_finished_rx) = oneshot::channel();
+        let mut syncer = eth::SsvEventSyncer::new(
+            database.clone(),
+            // TODO this is very hacky, but `eth::Config` has a TODO anyways
+            eth::Config {
+                http_url: config
+                    .execution_nodes
+                    .first()
+                    .ok_or("No execution node http url specified")?
+                    .full
+                    .to_string(),
+                ws_url: config
+                    .execution_nodes
+                    .get(1)
+                    .ok_or("No execution node wss url specified")?
+                    .full
+                    .to_string(),
+                beacon_url: "".to_string(), // this one is not actually needed :)
+                network: match spec.config_name.as_deref() {
+                    Some("mainnet") => eth::Network::Mainnet,
+                    Some("holesky") => eth::Network::Holesky,
+                    _ => return Err(format!("Unsupported network {:?}", spec.config_name)),
+                },
+                historic_finished_notify: Some(historic_finished_tx),
+            },
+        )
+        .await
+        .map_err(|e| format!("Unable to create syncer: {e}"))?;
+
+        executor.spawn(
+            async move {
+                if let Err(e) = syncer.sync().await {
+                    error!("Syncer failed: {e}");
+                }
+            },
+            "syncer",
+        );
+
+        // Wait until we have an operator id and historical sync is done
+        let operator_id =
+            wait_for_operator_id_and_sync(&database, historic_finished_rx, &spec).await;
 
-        // Create the processor-adjacent managers
+        // Create the signature collector
         let signature_collector =
             SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
                 .map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;
 
+        // Create the qbft manager
         let qbft_manager =
-            QbftManager::new(processor_senders.clone(), OperatorId(1), slot_clock.clone())
+            QbftManager::new(processor_senders.clone(), operator_id, slot_clock.clone())
                 .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;
 
         let validator_store = Arc::new(AnchorValidatorStore::<_, E>::new(
+            database,
             signature_collector,
             qbft_manager,
             slashing_protection,
             slot_clock.clone(),
             spec.clone(),
             genesis_validators_root,
-            OperatorId(123),
+            operator_id,
+            key,
         ));
 
         let duties_service = Arc::new(
@@ -365,9 +432,6 @@ impl Client {
         let channel_capacity = E::slots_per_epoch() as usize;
         let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity);
 
-        // Wait until genesis has occurred.
-        wait_for_genesis(&beacon_nodes, genesis_time).await?;
-
         duties_service::start_update_service(duties_service.clone(), block_service_tx);
 
         block_service
@@ -562,6 +626,28 @@ async fn poll_whilst_waiting_for_genesis(
     }
 }
 
+async fn wait_for_operator_id_and_sync(
+    database: &Arc<NetworkDatabase>,
+    mut sync_notification: Receiver<()>,
+    spec: &Arc<ChainSpec>,
+) -> OperatorId {
+    let sleep_duration = Duration::from_secs(spec.seconds_per_slot);
+    let id = loop {
+        if let Some(id) = database.get_own_id() {
+            break id;
+        }
+        info!("Waiting for operator id");
+        sleep(sleep_duration).await;
+    };
+    info!(id = *id, "Operator found on chain");
+    loop {
+        select! {
+            _ = &mut sync_notification => return id,
+            _ = sleep(sleep_duration) => info!("Waiting for historical sync to finish"),
+        }
+    }
+}
+
 pub fn load_pem_certificate<P: AsRef<Path>>(pem_path: P) -> Result<Certificate, String> {
     let mut buf = Vec::new();
     File::open(&pem_path)
@@ -570,3 +656,44 @@ pub fn load_pem_certificate<P: AsRef<Path>>(pem_path: P) -> Result<Certificate,
+
+fn read_or_generate_private_key(path: &Path) -> Result<Rsa<Private>, String> {
+    match File::open(path) {
+        Ok(mut file) => {
+            // There seems to be an existing file; try to read it as a key.
+            let mut key_string = Zeroizing::new(String::with_capacity(
+                // it's important for Zeroizing to properly work that we don't reallocate
+                file.metadata()
+                    .map(|m| m.len() as usize + 1)
+                    .unwrap_or(10_000),
+            ));
+            file.read_to_string(&mut key_string)
+                .map_err(|e| format!("Unable to read private key at {path:?}: {e:?}"))?;
+            // TODO support passphrase
+            Rsa::private_key_from_pem(key_string.as_ref())
+                .map_err(|e| format!("Unable to read private key: {e:?}"))
+        }
+        Err(err) => {
+            // only try to write a new one if we get a "not found" error
+            // to not accidentally overwrite something the user might be able to recover
+            if err.kind() != ErrorKind::NotFound {
+                return Err(format!("Unable to read private key at {path:?}: {err:?}"));
+            }
+
+            info!(path = %path.as_os_str().to_string_lossy(), "Creating private key");
+
+            let mut file = File::create(path)
+                .map_err(|e| format!("Unable to create private key file at {path:?}: {e:?}"))?;
+
+            let key = Rsa::generate(2048).map_err(|e| format!("Unable to generate key: {e:?}"))?;
+
+            file.write_all(&Zeroizing::new(
+                key.private_key_to_pem()
+                    .map_err(|e| format!("Unable to serialize private key: {e:?}"))?,
+            ))
+            .map_err(|e| format!("Unable to write private key: {e:?}"))?;
+
+            Ok(key)
+        }
+    }
+}
diff --git a/anchor/database/src/lib.rs b/anchor/database/src/lib.rs
index 859db7d4..a4a41ff6 100644
--- a/anchor/database/src/lib.rs
+++ b/anchor/database/src/lib.rs
@@ -28,6 +28,7 @@ mod tests;
 
 const POOL_SIZE: u32 = 1;
 const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
+const UNKNOWN_OWN_OPERATOR_ID: OperatorId = OperatorId(u64::MAX);
 
 type Pool = r2d2::Pool<SqliteConnectionManager>;
 type PoolConn = r2d2::PooledConnection<SqliteConnectionManager>;
diff --git a/anchor/database/src/operator_operations.rs b/anchor/database/src/operator_operations.rs
index 8cda5a8a..8740ca57 100644
--- a/anchor/database/src/operator_operations.rs
+++ b/anchor/database/src/operator_operations.rs
@@ -1,4 +1,4 @@
-use super::{DatabaseError, NetworkDatabase, SqlStatement, SQL};
+use super::{DatabaseError, NetworkDatabase, SqlStatement, SQL, UNKNOWN_OWN_OPERATOR_ID};
 use base64::prelude::*;
 use rusqlite::params;
 use ssv_types::{Operator, OperatorId};
@@ -34,7 +34,7 @@ impl NetworkDatabase {
 
         // Check to see if this operator is the current operator
         let own_id = self.state.single_state.id.load(Ordering::Relaxed);
-        if own_id == (u64::MAX / 2) {
+        if own_id == *UNKNOWN_OWN_OPERATOR_ID {
             // If the keys match, this is the current operator so we want to save the id
             let keys_match = pem_key == self.pubkey.public_key_to_pem().unwrap_or_default();
             if keys_match {
diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs
index 11feaba0..0c229bc0 100644
--- a/anchor/database/src/state.rs
+++ b/anchor/database/src/state.rs
@@ -1,4 +1,7 @@
-use crate::{ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap};
+use crate::{
+    ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap,
+    UNKNOWN_OWN_OPERATOR_ID,
+};
 use crate::{DatabaseError, NetworkDatabase, NetworkState, Pool, PoolConn};
 use crate::{MultiState, SingleState};
 use crate::{SqlStatement, SQL};
@@ -31,13 +34,7 @@ impl NetworkState {
     // Without an ID, we have no idea who we are. Check to see if an operator with our public key
     // is stored in the database. If it does not exist, that means the operator still has to be
     // registered with the network contract or that we have not seen the corresponding event yet
-        let id = if let Ok(Some(operator_id)) = Self::does_self_exist(&conn, pubkey) {
-            operator_id
-        } else {
-            // It does not exist, just default to some impossible operator
-            // SQL bounded by u32::max
-            OperatorId(u64::MAX / 2)
-        };
+        let id = Self::does_self_exist(&conn, pubkey)?;
 
         // First Phase: Fetch data from the database
         // 1) OperatorId -> Operator
@@ -47,7 +44,7 @@ impl NetworkState {
         // 3) ClusterId -> Vec<ValidatorMetadata>
         let validator_map = Self::fetch_validators(&conn)?;
         // 4) ClusterId -> Vec<Share>
-        let share_map = Self::fetch_shares(&conn, id)?;
+        let share_map = id.map(|id| Self::fetch_shares(&conn, id)).transpose()?;
         // 5) Owner -> Nonce (u16)
         let nonce_map = Self::fetch_nonces(&conn)?;
 
@@ -56,10 +53,13 @@ impl NetworkState {
         let metadata_multi: MetadataMultiIndexMap = MultiIndexMap::new();
         let cluster_multi: ClusterMultiIndexMap = MultiIndexMap::new();
         let single_state = SingleState {
-            id: AtomicU64::new(*id),
+            id: AtomicU64::new(*id.unwrap_or(UNKNOWN_OWN_OPERATOR_ID)),
             last_processed_block: AtomicU64::new(last_processed_block),
             operators: DashMap::from_iter(operators),
-            clusters: DashSet::from_iter(cluster_map.keys().copied()),
+            clusters: share_map
+                .as_ref()
+                .map(|m| m.keys().copied().collect())
+                .unwrap_or_default(),
             nonces: DashMap::from_iter(nonce_map),
         };
 
@@ -87,15 +87,17 @@ impl NetworkState {
             );
 
             // Process this validators shares
-            if let Some(shares) = share_map.get(cluster_id) {
-                for share in shares {
-                    if share.validator_pubkey == validator.public_key {
-                        shares_multi.insert(
-                            &validator.public_key,
-                            cluster_id,
-                            &cluster.owner,
-                            share.clone(),
-                        );
+            if let Some(share_map) = &share_map {
+                if let Some(shares) = share_map.get(cluster_id) {
+                    for share in shares {
+                        if share.validator_pubkey == validator.public_key {
+                            shares_multi.insert(
+                                &validator.public_key,
+                                cluster_id,
+                                &cluster.owner,
+                                share.clone(),
+                            );
+                        }
                     }
                 }
             }
@@ -261,7 +263,7 @@ impl NetworkDatabase {
     /// Get the ID of our Operator if it exists
     pub fn get_own_id(&self) -> Option<OperatorId> {
         let id = self.state.single_state.id.load(Ordering::Relaxed);
-        if id == u64::MAX {
+        if id == *UNKNOWN_OWN_OPERATOR_ID {
             None
         } else {
             Some(OperatorId(id))
        }
    }
@@ -283,6 +285,11 @@ impl NetworkDatabase {
         self.state.single_state.clusters.contains(id)
     }
 
+    /// Get the clusters we are a member of
+    pub fn get_own_clusters(&self) -> &DashSet<ClusterId> {
+        &self.state.single_state.clusters
+    }
+
     /// Get the last block that has been fully processed by the database
     pub fn get_last_processed_block(&self) -> u64 {
         self.state
diff --git a/anchor/eth/execution.rs b/anchor/eth/execution.rs
index 0fd78fa1..6ce6a8e4 100644
--- a/anchor/eth/execution.rs
+++ b/anchor/eth/execution.rs
@@ -28,6 +28,7 @@ async fn main() {
         ws_url: String::from(ws_endpoint),
         beacon_url: String::from(beacon_endpoint),
         network: Network::Mainnet,
+        historic_finished_notify: None,
     };
 
     // Setup mock operator data
diff --git a/anchor/eth/src/sync.rs b/anchor/eth/src/sync.rs
index eb0eef62..60c8b5b5 100644
--- a/anchor/eth/src/sync.rs
+++ b/anchor/eth/src/sync.rs
@@ -13,6 +13,7 @@ use futures::StreamExt;
 use rand::Rng;
 use std::collections::BTreeMap;
 use std::sync::{Arc, LazyLock};
+use tokio::sync::oneshot::Sender;
 use tokio::time::Duration;
 use tracing::{debug, error, info, instrument, warn};
@@ -89,6 +90,7 @@ pub struct Config {
     pub ws_url: String,
     pub beacon_url: String,
     pub network: Network,
+    pub historic_finished_notify: Option<Sender<()>>,
 }
 
 /// Client for interacting with the SSV contract on Ethereum L1
@@ -106,6 +108,8 @@ pub struct SsvEventSyncer {
     event_processor: EventProcessor,
     /// The network the node is connected to
     network: Network,
+    /// Notify a channel as soon as the historical sync is done
+    historic_finished_notify: Option<Sender<()>>,
 }
 
 impl SsvEventSyncer {
@@ -139,6 +143,7 @@ impl SsvEventSyncer {
             ws_url: config.ws_url,
             event_processor,
             network: config.network,
+            historic_finished_notify: config.historic_finished_notify,
         })
     }
 
@@ -163,6 +168,8 @@ impl SsvEventSyncer {
         self.historical_sync(contract_address, deployment_block)
             .await?;
 
+        self.historic_finished_notify.take().map(|x| x.send(()));
+
         info!("Starting live sync");
         self.live_sync(contract_address).await?;
diff --git a/anchor/src/environment.rs b/anchor/src/environment.rs
index 5c737dfb..5b7891d8 100644
--- a/anchor/src/environment.rs
+++ b/anchor/src/environment.rs
@@ -7,8 +7,8 @@ use std::sync::Arc;
 use task_executor::{ShutdownReason, TaskExecutor};
 use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
 use tracing::{error, info, warn};
+use tracing_subscriber::filter::LevelFilter;
 use tracing_subscriber::EnvFilter;
-
 use {
     futures::Future,
     std::{pin::Pin, task::Context, task::Poll},
@@ -40,7 +40,9 @@ impl Default for Environment {
     /// If a more fine-grained executor is required, a more general function should be built.
     fn default() -> Self {
         // Default logging to `debug` for the time being
-        let env_filter = EnvFilter::new("debug");
+        let env_filter = EnvFilter::builder()
+            .with_default_directive(LevelFilter::DEBUG.into())
+            .from_env_lossy();
         tracing_subscriber::fmt().with_env_filter(env_filter).init();
 
         // Create a multi-threaded task executor
diff --git a/anchor/src/main.rs b/anchor/src/main.rs
index 6b632ecf..11a497cc 100644
--- a/anchor/src/main.rs
+++ b/anchor/src/main.rs
@@ -5,7 +5,7 @@ mod environment;
 use client::{config, Anchor, Client};
 use environment::Environment;
 use task_executor::ShutdownReason;
-use types::MainnetEthSpec;
+use types::EthSpecId;
 
 fn main() {
     // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
@@ -37,10 +37,30 @@
     let anchor_executor = core_executor.clone();
     let shutdown_executor = core_executor.clone();
 
+    let eth_spec_id = match config.eth2_network.eth_spec_id() {
+        Ok(eth_spec_id) => eth_spec_id,
+        Err(e) => {
+            error!(e, "Unable to get eth spec id");
+            return;
+        }
+    };
+
     // Run the main task
     core_executor.spawn(
         async move {
-            if let Err(e) = Client::run::<MainnetEthSpec>(anchor_executor, config).await {
+            let result = match eth_spec_id {
+                EthSpecId::Mainnet => {
+                    Client::run::<types::MainnetEthSpec>(anchor_executor, config).await
+                }
+                #[cfg(feature = "spec-minimal")]
+                EthSpecId::Minimal => {
+                    Client::run::<types::MinimalEthSpec>(anchor_executor, config).await
+                }
+                other => Err(format!(
+                    "Eth spec `{other}` is not supported by this build of Anchor",
+                )),
+            };
+            if let Err(e) = result {
                 error!(reason = e, "Failed to start Anchor");
                 // Ignore the error since it always occurs during normal operation when
                 // shutting down.
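
Taken together, the config.rs and main.rs changes above replace the hardcoded `Eth2Config::mainnet()` with runtime network resolution: the `--network` name is looked up among the baked-in configs, its spec id is derived, and main.rs dispatches to the matching `EthSpec` monomorphization (the `EthSpecId::Minimal` branch is only compiled in under the `spec-minimal` feature). A minimal sketch of that flow, condensed from the patch for review purposes — it reuses only the lighthouse APIs the diff itself calls, simplifies error handling, and hardcodes `MainnetEthSpec` purely for illustration; it is not part of the patch:

```rust
use eth2_network_config::Eth2NetworkConfig;
use types::{ChainSpec, EthSpecId, MainnetEthSpec};

/// Resolve a `--network` name into a spec id and chain spec.
fn resolve_network(name: &str) -> Result<(EthSpecId, ChainSpec), String> {
    // `constant` returns Ok(None) when the name is not a baked-in network.
    let net = Eth2NetworkConfig::constant(name)?
        .ok_or_else(|| format!("Unknown network {name}"))?;
    let spec_id = net.eth_spec_id()?;
    // The type parameter must agree with `spec_id`; main.rs matches on the id
    // before choosing the monomorphization that calls `Client::run`.
    let spec = net.chain_spec::<MainnetEthSpec>()?;
    Ok((spec_id, spec))
}
```
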
diff --git a/anchor/validator_store/Cargo.toml b/anchor/validator_store/Cargo.toml
index e0ed9732..af78356d 100644
--- a/anchor/validator_store/Cargo.toml
+++ b/anchor/validator_store/Cargo.toml
@@ -8,8 +8,11 @@ rust-version = "1.81.0"
 [dependencies]
 beacon_node_fallback = { workspace = true }
 dashmap = { workspace = true }
+database = { workspace = true }
 eth2 = { workspace = true }
-futures = "0.3.31"
+futures = { workspace = true }
+hex = { workspace = true }
+openssl = { workspace = true }
 parking_lot = { workspace = true }
 qbft = { workspace = true }
 qbft_manager = { workspace = true }
diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs
index f57e1f14..bbc2e253 100644
--- a/anchor/validator_store/src/lib.rs
+++ b/anchor/validator_store/src/lib.rs
@@ -1,7 +1,10 @@
 pub mod sync_committee_service;
 
 use dashmap::DashMap;
+use database::{NetworkDatabase, NonUniqueIndex, UniqueIndex};
 use futures::future::join_all;
+use openssl::pkey::Private;
+use openssl::rsa::{Padding, Rsa};
 use parking_lot::Mutex;
 use qbft::Completed;
 use qbft_manager::{
@@ -17,8 +20,10 @@ use ssv_types::message::{
     DATA_VERSION_ALTAIR, DATA_VERSION_BELLATRIX, DATA_VERSION_CAPELLA, DATA_VERSION_DENEB,
     DATA_VERSION_PHASE0, DATA_VERSION_UNKNOWN,
 };
-use ssv_types::{Cluster, OperatorId, ValidatorMetadata};
+use ssv_types::{Cluster, OperatorId, ValidatorIndex, ValidatorMetadata};
+use std::collections::HashSet;
 use std::fmt::Debug;
+use std::str::from_utf8;
 use std::sync::Arc;
 use std::time::Duration;
 use tracing::{error, info, warn};
@@ -56,26 +61,31 @@ use validator_store::{
 const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512;
 
 #[derive(Clone)]
-struct InitializedCluster {
-    cluster: Cluster,
-    validator_metadata: ValidatorMetadata,
+struct InitializedValidator {
+    cluster: Arc<Cluster>,
+    metadata: ValidatorMetadata,
     decrypted_key_share: SecretKey,
 }
 
 pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
-    clusters: DashMap<PublicKeyBytes, InitializedCluster>,
+    validators: DashMap<PublicKeyBytes, InitializedValidator>,
+    database: Arc<NetworkDatabase>,
     signature_collector: Arc<SignatureCollectorManager<T>>,
     qbft_manager: Arc<QbftManager<T>>,
     slashing_protection: SlashingDatabase,
     slashing_protection_last_prune: Mutex<Epoch>,
+    last_validator_update: Mutex<Slot>,
     slot_clock: T,
     spec: Arc<ChainSpec>,
     genesis_validators_root: Hash256,
     operator_id: OperatorId,
+    private_key: Rsa<Private>,
 }
 
 impl<T: SlotClock + 'static, E: EthSpec> AnchorValidatorStore<T, E> {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
+        database: Arc<NetworkDatabase>,
         signature_collector: Arc<SignatureCollectorManager<T>>,
         qbft_manager: Arc<QbftManager<T>>,
         slashing_protection: SlashingDatabase,
@@ -83,42 +93,141 @@ impl<T: SlotClock + 'static, E: EthSpec> AnchorValidatorStore<T, E> {
         spec: Arc<ChainSpec>,
         genesis_validators_root: Hash256,
         operator_id: OperatorId,
+        private_key: Rsa<Private>,
     ) -> AnchorValidatorStore<T, E> {
         Self {
-            clusters: DashMap::new(),
+            validators: DashMap::new(),
+            database,
             signature_collector,
             qbft_manager,
             slashing_protection,
             slashing_protection_last_prune: Mutex::new(Epoch::new(0)),
+            last_validator_update: Mutex::new(Slot::new(0)),
             slot_clock,
             spec,
             genesis_validators_root,
             operator_id,
+            private_key,
         }
     }
 
-    pub fn add_cluster(
+    fn maybe_load_validators(&self) {
+        let mut last = self.last_validator_update.lock();
+        if let Some(now) = self.slot_clock.now() {
+            if now > *last {
+                *last = now;
+                drop(last);
+                self.load_validators();
+            }
+        }
+    }
+
+    fn load_validators(&self) {
+        let mut unseen_validators = self
+            .validators
+            .iter()
+            .map(|v| *v.key())
+            .collect::<HashSet<_>>();
+        let db_clusters = self.database.get_own_clusters().iter().collect::<Vec<_>>();
+
+        for (cluster, validator) in db_clusters
+            .into_iter()
+            .filter_map(|id| self.database.clusters().get_by(id.key()).map(Arc::new))
+            .flat_map(|cluster| {
+                self.database
+                    .metadata()
+                    .get_all_by(&cluster.cluster_id)
+                    .unwrap_or_default()
+                    .into_iter()
+                    .map(move |metadata| (cluster.clone(), metadata))
+            })
+        {
+            let pubkey_bytes = validator.public_key.compress();
+            // value was not present: add to store
+            if !unseen_validators.remove(&pubkey_bytes) {
+                if let Ok(secret_key) = self.get_share_from_db(&validator, pubkey_bytes) {
+                    let result = self.add_validator(pubkey_bytes, cluster, validator, secret_key);
+                    if let Err(err) = result {
+                        error!(?err, "Unable to initialize validator");
+                    }
+                }
+            }
+        }
+
+        for validator in unseen_validators {
+            self.validators.remove(&validator);
+            info!(%validator, "Validator disabled");
+        }
+    }
+
+    fn get_share_from_db(
+        &self,
+        validator: &ValidatorMetadata,
+        pubkey_bytes: PublicKeyBytes,
+    ) -> Result<SecretKey, ()> {
+        let share = self
+            .database
+            .shares()
+            .get_by(&validator.public_key)
+            .ok_or_else(|| warn!(validator = %pubkey_bytes, "Key share not found"))?;
+
+        // the buffer size must be larger than or equal to the modulus size
+        let mut key_hex = [0; 2048 / 8];
+        let length = self
+            .private_key
+            .private_decrypt(&share.encrypted_private_key, &mut key_hex, Padding::PKCS1)
+            .map_err(|e| error!(?e, validator = %pubkey_bytes, "Share decryption failed"))?;
+
+        let key_hex = from_utf8(&key_hex[..length]).map_err(|err| {
+            error!(
+                ?err,
+                validator = %pubkey_bytes,
+                "Share decryption yielded non-utf8 data"
+            )
+        })?;
+
+        let mut secret_key = [0; 32];
+        hex::decode_to_slice(
+            key_hex.strip_prefix("0x").unwrap_or(key_hex),
+            &mut secret_key,
+        )
+        .map_err(|err| {
+            error!(
+                ?err,
+                validator = %pubkey_bytes,
+                "Decrypted share is not a hex string of size 64"
+            )
+        })?;
+
+        SecretKey::deserialize(&secret_key)
+            .map_err(|err| error!(?err, validator = %pubkey_bytes, "Invalid secret key decrypted"))
+    }
+
+    fn add_validator(
         &self,
-        cluster: Cluster,
+        pubkey_bytes: PublicKeyBytes,
+        cluster: Arc<Cluster>,
         validator_metadata: ValidatorMetadata,
         decrypted_key_share: SecretKey,
     ) -> Result<(), Error> {
-        let pubkey_bytes = validator_metadata.public_key.compress();
-        self.clusters.insert(
+        self.validators.insert(
             pubkey_bytes,
-            InitializedCluster {
+            InitializedValidator {
                 cluster,
-                validator_metadata,
+                metadata: validator_metadata,
                 decrypted_key_share,
             },
        );
         self.slashing_protection
             .register_validator(pubkey_bytes)
-            .map_err(Error::Slashable)
+            .map_err(Error::Slashable)?;
+        info!(validator = %pubkey_bytes, "Validator enabled");
+        Ok(())
     }
 
-    fn cluster(&self, validator_pubkey: PublicKeyBytes) -> Result<InitializedCluster, Error> {
-        self.clusters
+    fn validator(&self, validator_pubkey: PublicKeyBytes) -> Result<InitializedValidator, Error> {
+        self.maybe_load_validators();
+        self.validators
             .get(&validator_pubkey)
             .map(|c| c.value().clone())
             .ok_or(Error::UnknownPubkey(validator_pubkey))
@@ -135,7 +244,7 @@
 
     async fn collect_signature(
         &self,
-        cluster: InitializedCluster,
+        cluster: InitializedValidator,
         signing_root: Hash256,
         for_slot: Slot,
     ) -> Result<Signature, Error> {
@@ -180,7 +289,7 @@
             });
         }
 
-        let cluster = self.cluster(validator_pubkey)?;
+        let validator = self.validator(validator_pubkey)?;
 
         // first, we have to get to consensus
         let completed = self
@@ -196,7 +305,7 @@
                     r#type: BEACON_ROLE_PROPOSER,
                     pub_key: validator_pubkey,
                     slot: block.slot().as_usize().into(),
-                    validator_index: cluster.validator_metadata.index,
+                    validator_index: validator.metadata.index,
                     committee_index: 0,
                     committee_length: 0,
                     committees_at_slot: 0,
@@ -213,7 +322,7 @@
                 },
                 data_ssz: Box::new(wrapper(block)),
             },
-            &cluster.cluster,
+            &validator.cluster,
         )
         .await
         .map_err(SpecificError::from)?;
@@ -244,7 +353,11 @@
         let signing_root = block.signing_root(domain_hash);
 
         let signature = self
-            .collect_signature(self.cluster(validator_pubkey)?, signing_root, block.slot())
+            .collect_signature(
+                self.validator(validator_pubkey)?,
+                signing_root,
+                block.slot(),
+            )
             .await?;
         Ok(SignedBeaconBlock::from_block(block, signature))
     }
@@ -257,18 +370,18 @@
         validator_pubkey: &PublicKeyBytes,
     ) -> Result<SyncCommitteeMessage, Error> {
         let epoch = slot.epoch(E::slots_per_epoch());
-        let cluster = self.cluster(*validator_pubkey)?;
+        let validator = self.validator(*validator_pubkey)?;
         let beacon_block_root = vote.block_root;
 
         let completed = self
             .qbft_manager
             .decide_instance(
                 CommitteeInstanceId {
-                    committee: cluster.cluster.cluster_id,
+                    committee: validator.cluster.cluster_id,
                     instance_height: slot.as_usize().into(),
                 },
                 vote,
-                &cluster.cluster,
+                &validator.cluster,
             )
             .await
             .map_err(SpecificError::from)?;
@@ -279,7 +392,9 @@
         let domain = self.get_domain(epoch, Domain::SyncCommittee);
         let signing_root = data.block_root.signing_root(domain);
 
-        let signature = self.collect_signature(cluster, signing_root, slot).await?;
+        let signature = self
+            .collect_signature(validator, signing_root, slot)
+            .await?;
 
         Ok(SyncCommitteeMessage {
             slot,
@@ -301,7 +416,7 @@
             return vec![];
         };
         let epoch = slot.epoch(E::slots_per_epoch());
-        let cluster = match self.cluster(aggregator_pubkey) {
+        let validator = match self.validator(aggregator_pubkey) {
             Ok(cluster) => cluster,
             Err(err) => return error(err),
         };
@@ -332,7 +447,7 @@
                     r#type: BEACON_ROLE_SYNC_COMMITTEE_CONTRIBUTION,
                     pub_key: aggregator_pubkey,
                     slot,
-                    validator_index: cluster.validator_metadata.index,
+                    validator_index: validator.metadata.index,
                     committee_index: 0,
                     committee_length: 0,
                     committees_at_slot: 0,
@@ -342,7 +457,7 @@
                 version: DATA_VERSION_PHASE0,
                 data_ssz: Box::new(DataSsz::Contributions(data)),
             },
-            &cluster.cluster,
+            &validator.cluster,
         )
         .await;
         let data = match completed {
@@ -359,7 +474,7 @@
         let signing_futures = data
             .into_iter()
             .map(|contribution| {
-                let cluster = cluster.clone();
+                let cluster = validator.clone();
                 async move {
                     let slot = contribution.contribution.slot;
                     let message = ContributionAndProof {
@@ -462,9 +577,13 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
     type E = E;
 
     fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64> {
-        self.clusters
-            .get(pubkey)
-            .map(|v| v.validator_metadata.index.0 as u64)
+        self.validator(*pubkey).ok().and_then(|v| {
+            if v.metadata.index.0 == 0 {
+                None
+            } else {
+                Some(v.metadata.index.0 as u64)
+            }
+        })
     }
 
     fn voting_pubkeys<I, F>(&self, _filter_func: F) -> I
     where
@@ -472,8 +591,9 @@
         I: FromIterator<PublicKeyBytes>,
         F: Fn(DoppelgangerStatus) -> Option<PublicKeyBytes>,
     {
+        self.maybe_load_validators();
         // we don't care about doppelgangers
-        self.clusters.iter().map(|v| *v.key()).collect()
+        self.validators.iter().map(|v| *v.key()).collect()
     }
 
     fn doppelganger_protection_allows_signing(&self, _validator_pubkey: PublicKeyBytes) -> bool {
@@ -482,18 +602,19 @@
     }
 
     fn num_voting_validators(&self) -> usize {
-        self.clusters.len()
+        self.maybe_load_validators();
+        self.validators.len()
     }
 
     fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> {
-        self.clusters
-            .get(validator_pubkey)
-            .map(|v| v.validator_metadata.graffiti)
+        self.validator(*validator_pubkey)
+            .ok()
+            .map(|v| v.metadata.graffiti)
     }
 
     fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address> {
-        self.clusters
-            .get(validator_pubkey)
+        self.validator(*validator_pubkey)
+            .ok()
             .map(|v| v.cluster.fee_recipient)
     }
 
@@ -509,7 +630,7 @@
         let domain_hash = self.get_domain(signing_epoch, Domain::Randao);
         let signing_root = signing_epoch.signing_root(domain_hash);
         self.collect_signature(
-            self.cluster(validator_pubkey)?,
+            self.validator(validator_pubkey)?,
             signing_root,
             signing_epoch.end_slot(E::slots_per_epoch()),
         )
@@ -517,21 +638,14 @@
     }
 
     fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) {
-        // we actually have the index already. we use the opportunity to do a sanity check
-        match self.clusters.get(validator_pubkey) {
+        self.maybe_load_validators();
+        match self.validators.get_mut(validator_pubkey) {
             None => warn!(
                 validator = validator_pubkey.as_hex_string(),
                 "Trying to set index for unknown validator"
             ),
-            Some(v) => {
-                if v.validator_metadata.index.0 as u64 != index {
-                    error!(
-                        validator = validator_pubkey.as_hex_string(),
-                        expected = v.validator_metadata.index.0,
-                        actual = index,
-                        "Mismatched validator index",
-                    )
-                }
+            Some(mut v) => {
+                v.metadata.index = ValidatorIndex(index as usize);
             }
         }
     }
@@ -592,13 +706,13 @@
             });
         }
 
-        let cluster = self.cluster(validator_pubkey)?;
+        let validator = self.validator(validator_pubkey)?;
 
         let completed = self
             .qbft_manager
             .decide_instance(
                 CommitteeInstanceId {
-                    committee: cluster.cluster.cluster_id,
+                    committee: validator.cluster.cluster_id,
                     instance_height: attestation.data().slot.as_usize().into(),
                 },
                 BeaconVote {
@@ -606,7 +720,7 @@
                     source: attestation.data().source,
                     target: attestation.data().target,
                 },
-                &cluster.cluster,
+                &validator.cluster,
             )
             .await
             .map_err(SpecificError::from)?;
@@ -633,7 +747,7 @@
         let signing_root = attestation.data().signing_root(domain_hash);
 
         let signature = self
-            .collect_signature(cluster, signing_root, attestation.data().slot)
+            .collect_signature(validator, signing_root, attestation.data().slot)
             .await?;
         attestation
             .add_signature(&signature, validator_committee_position)
@@ -672,7 +786,7 @@
 
         let signature = self
             .collect_signature(
-                self.cluster(validator_registration_data.pubkey)?,
+                self.validator(validator_registration_data.pubkey)?,
                 signing_root,
                 validity_slot,
             )
@@ -692,7 +806,7 @@
         selection_proof: SelectionProof,
     ) -> Result<SignedAggregateAndProof<E>, Error> {
         let signing_epoch = aggregate.data().target.epoch;
-        let cluster = self.cluster(validator_pubkey)?;
+        let validator = self.validator(validator_pubkey)?;
 
         let message =
             AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof);
@@ -711,7 +825,7 @@
                     r#type: BEACON_ROLE_AGGREGATOR,
                     pub_key: validator_pubkey,
                     slot: message.aggregate().data().slot,
-                    validator_index: cluster.validator_metadata.index,
+                    validator_index: validator.metadata.index,
                     committee_index: message.aggregate().data().index,
                     // todo it seems the below are not needed (anymore?)
                     committee_length: 0,
@@ -722,7 +836,7 @@
                 version: DATA_VERSION_PHASE0,
                 data_ssz: Box::new(DataSsz::AggregateAndProof(message)),
             },
-            &cluster.cluster,
+            &validator.cluster,
         )
         .await
         .map_err(SpecificError::from)?;
@@ -738,7 +852,7 @@
         let domain_hash = self.get_domain(signing_epoch, Domain::AggregateAndProof);
         let signing_root = message.signing_root(domain_hash);
         let signature = self
-            .collect_signature(cluster, signing_root, message.aggregate().get_slot())
+            .collect_signature(validator, signing_root, message.aggregate().get_slot())
             .await?;
 
         Ok(SignedAggregateAndProof::from_aggregate_and_proof(
@@ -755,7 +869,7 @@
         let domain_hash = self.get_domain(epoch, Domain::SelectionProof);
         let signing_root = slot.signing_root(domain_hash);
 
-        self.collect_signature(self.cluster(validator_pubkey)?, signing_root, slot)
+        self.collect_signature(self.validator(validator_pubkey)?, signing_root, slot)
             .await
             .map(SelectionProof::from)
     }
@@ -774,7 +888,7 @@
         }
         .signing_root(domain_hash);
 
-        self.collect_signature(self.cluster(*validator_pubkey)?, signing_root, slot)
+        self.collect_signature(self.validator(*validator_pubkey)?, signing_root, slot)
             .await
             .map(SyncSelectionProof::from)
     }
@@ -866,8 +980,8 @@
     }
 
     fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option<ProposalData> {
-        self.clusters.get(pubkey).map(|v| ProposalData {
-            validator_index: Some(v.validator_metadata.index.0 as u64),
+        self.validator(*pubkey).ok().map(|v| ProposalData {
+            validator_index: Some(v.metadata.index.0 as u64),
             fee_recipient: Some(v.cluster.fee_recipient),
             gas_limit: 29_999_998,    // TODO support scalooors
             builder_proposals: false, // TODO support MEVooors
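
The security-sensitive core of the validator-store changes is `get_share_from_db`: each key share stored on chain is RSA-decrypted with the operator key, and the plaintext is a hex-encoded 32-byte BLS secret key. A standalone sketch of those steps, using the same `openssl` and `hex` calls as the patch — it assumes a 2048-bit operator key (matching `Rsa::generate(2048)` in `read_or_generate_private_key`) and collapses the tracing-based error handling into `String` errors; it is not part of the patch:

```rust
use openssl::pkey::Private;
use openssl::rsa::{Padding, Rsa};
use std::str::from_utf8;

/// Decrypt an SSV key share into the raw 32-byte BLS secret key.
fn decrypt_share(key: &Rsa<Private>, encrypted: &[u8]) -> Result<[u8; 32], String> {
    // The output buffer must be at least as large as the RSA modulus (2048 bits).
    let mut plain = [0u8; 2048 / 8];
    let len = key
        .private_decrypt(encrypted, &mut plain, Padding::PKCS1)
        .map_err(|e| format!("share decryption failed: {e}"))?;

    // The plaintext is a (possibly 0x-prefixed) hex string of the secret key.
    let hex_str =
        from_utf8(&plain[..len]).map_err(|e| format!("decrypted share is not utf8: {e}"))?;

    let mut secret = [0u8; 32];
    hex::decode_to_slice(hex_str.strip_prefix("0x").unwrap_or(hex_str), &mut secret)
        .map_err(|e| format!("decrypted share is not a 64-char hex string: {e}"))?;
    Ok(secret)
}
```

On success the patch feeds the 32 bytes to `SecretKey::deserialize`, which rejects anything that is not a valid BLS secret key (hence the "Invalid secret key decrypted" log on the error path).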