From b0c231a03f8cbb193b326dcf6a3175e25cb7a2ea Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Thu, 3 Oct 2024 18:04:23 +0100
Subject: [PATCH 1/3] Avoid admin roles in local cluster runner

For the time being we only want to run admin on the metadata node, which
will also be allowed to bootstrap.
---
 .../examples/two_nodes_and_metadata.rs       |  2 +-
 crates/local-cluster-runner/src/node/mod.rs  | 10 +++++++---
 server/tests/cluster.rs                      |  4 ++--
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/crates/local-cluster-runner/examples/two_nodes_and_metadata.rs b/crates/local-cluster-runner/examples/two_nodes_and_metadata.rs
index 02bb8ac1a..f5d462898 100644
--- a/crates/local-cluster-runner/examples/two_nodes_and_metadata.rs
+++ b/crates/local-cluster-runner/examples/two_nodes_and_metadata.rs
@@ -25,7 +25,7 @@ async fn main() {
     let nodes = Node::new_test_nodes_with_metadata(
         base_config,
         BinarySource::CargoTest,
-        enum_set!(Role::Admin | Role::Worker),
+        enum_set!(Role::Worker),
         2,
     );
 
diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs
index 03e4ff333..c0fe6c7b5 100644
--- a/crates/local-cluster-runner/src/node/mod.rs
+++ b/crates/local-cluster-runner/src/node/mod.rs
@@ -137,7 +137,7 @@ impl Node {
     }
 
     // Creates a group of Nodes with a single metadata node "metadata-node", and a given number
-    // of other nodes ["node-1", ..] each with the provided roles. Node name, roles,
+    // of other nodes ["node-1", ..] each with the provided roles. Node name, roles,
     // bind/advertise addresses, and the metadata address from the base_config will all be overwritten.
     pub fn new_test_nodes_with_metadata(
         base_config: Configuration,
@@ -148,18 +148,22 @@ impl Node {
         let mut nodes = Vec::with_capacity((size + 1) as usize);
 
         {
+            let mut base_config = base_config.clone();
+            base_config.common.allow_bootstrap = true;
             nodes.push(Self::new_test_node(
                 "metadata-node",
-                base_config.clone(),
+                base_config,
                 binary_source.clone(),
                 enum_set!(Role::Admin | Role::MetadataStore),
             ));
         }
 
         for node in 1..=size {
+            let mut base_config = base_config.clone();
+            base_config.common.allow_bootstrap = false;
             nodes.push(Self::new_test_node(
                 format!("node-{node}"),
-                base_config.clone(),
+                base_config,
                 binary_source.clone(),
                 roles,
             ));
diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs
index d82a1e77a..4b806de61 100644
--- a/server/tests/cluster.rs
+++ b/server/tests/cluster.rs
@@ -16,7 +16,7 @@ async fn node_id_mismatch() {
     let nodes = Node::new_test_nodes_with_metadata(
         base_config.clone(),
         BinarySource::CargoTest,
-        enum_set!(Role::Admin | Role::Worker),
+        enum_set!(Role::Worker),
         1,
     );
 
@@ -64,7 +64,7 @@ async fn cluster_name_mismatch() {
     let nodes = Node::new_test_nodes_with_metadata(
         base_config.clone(),
         BinarySource::CargoTest,
-        enum_set!(Role::Admin | Role::Worker),
+        enum_set!(Role::Worker),
         1,
     );
 

From 1dcf71202ba9079071f64e0081bc1ff5ee9c8730 Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Fri, 4 Oct 2024 10:34:02 +0100
Subject: [PATCH 2/3] Check admins are ready before bringing up more nodes

---
 crates/local-cluster-runner/src/cluster/mod.rs | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/crates/local-cluster-runner/src/cluster/mod.rs b/crates/local-cluster-runner/src/cluster/mod.rs
index d37ff7886..0a6955a32 100644
--- a/crates/local-cluster-runner/src/cluster/mod.rs
+++ b/crates/local-cluster-runner/src/cluster/mod.rs
@@ -52,6 +52,8 @@ fn default_cluster_name() -> String {
 pub enum ClusterStartError {
     #[error("Failed to start node {0}: {1}")]
     NodeStartError(usize, NodeStartError),
+    #[error("Admin node is not healthy after waiting 30 seconds")]
+    AdminUnhealthy,
     #[error("Failed to create cluster base directory: {0}")]
     CreateDirectory(io::Error),
     #[error("Failed to create metadata client: {0}")]
@@ -86,11 +88,17 @@ impl Cluster {
         );
 
         for (i, node) in nodes.into_iter().enumerate() {
-            started_nodes.push(
-                node.start_clustered(base_dir.as_path(), &cluster_name)
-                    .await
-                    .map_err(|err| ClusterStartError::NodeStartError(i, err))?,
-            )
+            let node = node
+                .start_clustered(base_dir.as_path(), &cluster_name)
+                .await
+                .map_err(|err| ClusterStartError::NodeStartError(i, err))?;
+            if node.admin_address().is_some() {
+                // admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
+                if !node.wait_admin_healthy(Duration::from_secs(30)).await {
+                    return Err(ClusterStartError::AdminUnhealthy);
+                }
+            }
+            started_nodes.push(node)
         }
 
         Ok(StartedCluster {

From c9ec4f07cd0fd44366640107ca5e201a765e085b Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Mon, 7 Oct 2024 09:40:15 +0100
Subject: [PATCH 3/3] Fix doc comments

---
 .../local-cluster-runner/src/cluster/mod.rs | 24 ++++-----
 crates/local-cluster-runner/src/node/mod.rs | 52 +++++++++----------
 2 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/crates/local-cluster-runner/src/cluster/mod.rs b/crates/local-cluster-runner/src/cluster/mod.rs
index 0a6955a32..2262e973f 100644
--- a/crates/local-cluster-runner/src/cluster/mod.rs
+++ b/crates/local-cluster-runner/src/cluster/mod.rs
@@ -26,9 +26,9 @@ pub struct Cluster {
 }
 
 impl ClusterBuilder<(C, N, ())> {
-    // Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
-    // You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
-    // it.
+    /// Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
+    /// You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
+    /// it.
     pub fn temp_base_dir(self) -> ClusterBuilder<(C, N, (MaybeTempDir,))> {
         let maybe_temp_dir = tempfile::tempdir().expect("to create a tempdir").into();
         let base_dir = (maybe_temp_dir,);
@@ -124,14 +124,14 @@ impl StartedCluster {
         &self.cluster_name
     }
 
-    // Send a SIGKILL to every node in the cluster
+    /// Send a SIGKILL to every node in the cluster
     pub async fn kill(&mut self) -> io::Result<()> {
         future::try_join_all(self.nodes.iter_mut().map(|n| n.kill()))
             .await
             .map(drop)
     }
 
-    // Send a SIGTERM to every node in the cluster
+    /// Send a SIGTERM to every node in the cluster
     pub fn terminate(&self) -> io::Result<()> {
         for node in &self.nodes {
             node.terminate()?
         }
         Ok(())
     }
 
-    // Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
-    // otherwise send a SIGKILL to nodes that are still running.
+    /// Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
+    /// otherwise send a SIGKILL to nodes that are still running.
     pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<()> {
         future::try_join_all(self.nodes.iter_mut().map(|n| n.graceful_shutdown(dur)))
             .await
             .map(drop)
     }
 
-    // For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
-    // to respond to health checks, otherwise return false.
+    /// For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
+    /// to respond to health checks, otherwise return false.
     pub async fn wait_admins_healthy(&self, dur: Duration) -> bool {
         future::join_all(
             self.nodes
                 .iter()
                 .filter(|n| n.admin_address().is_some())
                 .map(|n| n.wait_admin_healthy(dur)),
         )
         .await
         .into_iter()
         .all(|b| b)
     }
 
-    // For every node in the cluster with an ingress role, wait for up to dur for the admin endpoint
-    // to respond to health checks, otherwise return false.
+    /// For every node in the cluster with an ingress role, wait for up to dur for the ingress endpoint
+    /// to respond to health checks, otherwise return false.
     pub async fn wait_ingresses_healthy(&self, dur: Duration) -> bool {
         future::join_all(
             self.nodes
                 .iter()
                 .filter(|n| n.ingress_address().is_some())
                 .map(|n| n.wait_ingress_healthy(dur)),
         )
         .await
         .into_iter()
         .all(|b| b)
     }
 
-    // Wait for all ingress and admin endpoints in the cluster to be healthy
+    /// Wait for all ingress and admin endpoints in the cluster to be healthy
     pub async fn wait_healthy(&self, dur: Duration) -> bool {
         future::join(
             self.wait_admins_healthy(dur),
diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs
index c0fe6c7b5..296ca4078 100644
--- a/crates/local-cluster-runner/src/node/mod.rs
+++ b/crates/local-cluster-runner/src/node/mod.rs
@@ -172,10 +172,10 @@ impl Node {
         nodes
     }
 
-    // Start this Node, providing the base_dir and the cluster_name of the cluster its
-    // expected to attach to. All relative file paths addresses specified in the node config
-    // (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
-    // and cluster name present in config will be overwritten.
+    /// Start this Node, providing the base_dir and the cluster_name of the cluster it's
+    /// expected to attach to. All relative file path addresses specified in the node config
+    /// (e.g., nodename/node.sock) will be absolutized against the base path, and the base dir
+    /// and cluster name present in config will be overwritten.
     pub async fn start_clustered(
         mut self,
         base_dir: impl Into,
@@ -210,8 +210,8 @@ impl Node {
         self.start().await
     }
 
-    // Start this node with the current config. A subprocess will be created, and a tokio task
-    // spawned to process output logs and watch for exit.
+    /// Start this node with the current config. A subprocess will be created, and a tokio task
+    /// spawned to process output logs and watch for exit.
     pub async fn start(self) -> Result {
         let Self {
             base_config,
@@ -354,13 +354,13 @@ impl Node {
 pub enum BinarySource {
     Path(OsString),
     EnvVar(String),
-    // Suitable when called from a `cargo run` command, except examples.
-    // This will attempt to find a `restate-server` binary in the same directory
-    // as the current binary
+    /// Suitable when called from a `cargo run` command, except examples.
+    /// This will attempt to find a `restate-server` binary in the same directory
+    /// as the current binary
     CargoRun,
-    // Suitable when called from a `cargo test` or `cargo run --example` command;
-    // this will attempt to find a `restate-server` binary in the parent directory of
-    // the current binary.
+    /// Suitable when called from a `cargo test` or `cargo run --example` command;
+    /// this will attempt to find a `restate-server` binary in the parent directory of
+    /// the current binary.
     CargoTest,
 }
 
@@ -461,7 +461,7 @@ impl Future for StartedNodeStatus {
 }
 
 impl StartedNode {
-    // Send a SIGKILL to the current process, if it is running, and await for its exit
+    /// Send a SIGKILL to the current process, if it is running, and await its exit
     pub async fn kill(&mut self) -> io::Result {
         match self.status {
             StartedNodeStatus::Exited(status) => Ok(status),
@@ -483,7 +483,7 @@ impl StartedNode {
         }
     }
 
-    // Send a SIGTERM to the current process, if it is running
+    /// Send a SIGTERM to the current process, if it is running
     pub fn terminate(&self) -> io::Result<()> {
         match self.status {
             StartedNodeStatus::Exited(_) => Ok(()),
@@ -503,7 +503,7 @@ impl StartedNode {
         }
     }
 
-    // Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
+    /// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
    pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result {
         match self.status {
             StartedNodeStatus::Exited(status) => Ok(status),
@@ -530,7 +530,7 @@ impl StartedNode {
         }
     }
 
-    // Get the pid of the subprocess. Returns none after it has exited.
+    /// Get the pid of the subprocess. Returns `None` after it has exited.
     pub fn pid(&self) -> Option {
         match self.status {
             StartedNodeStatus::Exited { .. } | StartedNodeStatus::Failed { .. } => None,
@@ -538,7 +538,7 @@ impl StartedNode {
         }
     }
 
-    // Wait for the node to exit and report its exist status
+    /// Wait for the node to exit and report its exit status
     pub async fn status(&mut self) -> io::Result {
         (&mut self.status).await
     }
@@ -579,8 +579,8 @@ impl StartedNode {
         }
     }
 
-    // Obtain a stream of loglines matching this pattern. The stream will end
-    // when the stdout and stderr files on the process close.
+    /// Obtain a stream of loglines matching this pattern. The stream will end
+    /// when the stdout and stderr files on the process close.
     pub fn lines(&self, pattern: Regex) -> impl Stream + '_ {
         match self.status {
             StartedNodeStatus::Exited { .. } => futures::stream::empty().left_stream(),
@@ -592,7 +592,7 @@ impl StartedNode {
         }
     }
 
-    // Obtain a metadata client based on this nodes client config.
+    /// Obtain a metadata client based on this node's client config.
     pub async fn metadata_client(
         &self,
     ) -> Result {
@@ -602,7 +602,7 @@ impl StartedNode {
             .await
     }
 
-    // Check to see if the admin address is healthy. Returns false if this node has no admin role.
+    /// Check to see if the admin address is healthy. Returns false if this node has no admin role.
     pub async fn admin_healthy(&self) -> bool {
         if let Some(address) = self.admin_address() {
             match reqwest::get(format!("http://{address}/health")).await {
@@ -614,8 +614,8 @@ impl StartedNode {
         }
     }
 
-    // Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
-    // Returns false if this node has no admin role.
+    /// Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
+    /// Returns false if this node has no admin role.
     pub async fn wait_admin_healthy(&self, timeout: Duration) -> bool {
         let mut attempts = 1;
         if tokio::time::timeout(timeout, async {
@@ -641,7 +641,7 @@ impl StartedNode {
         }
     }
 
-    // Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
+    /// Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
     pub async fn ingress_healthy(&self) -> bool {
         if let Some(address) = self.ingress_address() {
             match reqwest::get(format!("http://{address}/restate/health")).await {
@@ -653,8 +653,8 @@ impl StartedNode {
         }
     }
 
-    // Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
-    // Returns false if this node has no ingress role.
+    /// Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
+    /// Returns false if this node has no ingress role.
     pub async fn wait_ingress_healthy(&self, timeout: Duration) -> bool {
         let mut attempts = 1;
         if tokio::time::timeout(timeout, async {
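
For reference, a minimal sketch of how the reworked helpers above might be driven from a test, based only on the call sites visible in patch 1 and the health-check methods touched in patches 2 and 3. The crate paths (restate_local_cluster_runner, restate_types, enumset), Configuration::default() as a starting config, the "test-cluster" name, and the Cluster::builder()/.cluster_name()/.nodes() setters are assumptions inferred from the ClusterBuilder type, not something these patches define, so the real API may differ:

    use enumset::enum_set;
    use restate_local_cluster_runner::{
        cluster::Cluster,
        node::{BinarySource, Node},
    };
    use restate_types::{config::Configuration, nodes_config::Role};
    use std::time::Duration;

    async fn start_worker_only_cluster() {
        // Assumed starting configuration; real tests may customise this first.
        let base_config = Configuration::default();

        // As in patch 1: only the auto-created "metadata-node" gets the
        // Admin/MetadataStore roles (with allow_bootstrap = true); the two
        // numbered nodes are plain workers with bootstrap disabled.
        let nodes = Node::new_test_nodes_with_metadata(
            base_config,
            BinarySource::CargoTest,
            enum_set!(Role::Worker),
            2,
        );

        // Assumed typed-builder surface, suggested by ClusterBuilder<(C, N, ...)>;
        // the exact setter names are not shown in the patches.
        let mut cluster = Cluster::builder()
            .cluster_name("test-cluster".to_owned())
            .nodes(nodes)
            .temp_base_dir()
            .build()
            .start()
            .await
            .expect("cluster to start");

        // Patch 2 already waits for each admin node while starting the cluster;
        // this extra check confirms admin and ingress endpoints are serving.
        assert!(cluster.wait_healthy(Duration::from_secs(30)).await);

        cluster
            .graceful_shutdown(Duration::from_secs(5))
            .await
            .expect("cluster to shut down");
    }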