Avoid admin roles in local cluster runner #2026

Merged · 3 commits · Oct 7, 2024
@@ -25,7 +25,7 @@ async fn main() {
let nodes = Node::new_test_nodes_with_metadata(
base_config,
BinarySource::CargoTest,
- enum_set!(Role::Admin | Role::Worker),
+ enum_set!(Role::Worker),
Contributor:
Do you think we should have something in the cluster builder to specify which node is admin?

Contributor Author:
Currently my thinking is that it's easier to have one singleton node that holds all the singleton roles (metadata, admin), so the other N nodes can all be more similar. It's always possible to specify nodes in whatever setup you like, but the goal of new_test_nodes_with_metadata is to create a list of nodes with some sensible defaults.
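
For illustration, a minimal sketch of that layout, based on the signature shown in the diff above (the worker count of 3 is arbitrary; as the node/mod.rs change below shows, the function returns size + 1 nodes):

    // One singleton "metadata-node" (Admin + MetadataStore) plus N similar workers.
    let nodes = Node::new_test_nodes_with_metadata(
        base_config,
        BinarySource::CargoTest,
        enum_set!(Role::Worker), // roles for every non-singleton node
        3,                       // yields "node-1".."node-3" alongside "metadata-node"
    );
    assert_eq!(nodes.len(), 4); // size + 1, including the metadata node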

Contributor Author:
Hmm, but I can now see that non-bootstrap nodes will fail to start if there is no nodes config yet, which is a slightly unpleasant race condition. I wonder if the cluster construct does indeed need to know who the admin is, and make sure it is started and healthy before moving on.

Contributor:
Would a slightly longer timeout help prevent this situation? I could see someone deploying Restate running into the same race condition when some nodes start a bit earlier than others.

Contributor Author:
Only if your environment can restart the process. Currently the non-bootstrap nodes will shut down if they reach the metadata service and find no nodes config. That is fine under systemd or Kubernetes, which restart on failure, but my local cluster runner does not do this.
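
As a hedged illustration, this restart-on-failure supervision is roughly the following loop (binary name assumed to be on PATH; the back-off is arbitrary):

    // What systemd's Restart=on-failure effectively does: re-run the child
    // process until it exits cleanly.
    loop {
        let status = std::process::Command::new("restate-server")
            .status()
            .expect("failed to spawn restate-server");
        if status.success() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_secs(2));
    }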

Contributor:
We could avoid failing immediately and instead wait a bit, in order to mitigate the race condition at start-up. Alternatively, all nodes could be started with the bootstrap option, assuming they have identical configurations.

Contributor Author:
My current thinking for the runner is to wait for admins to be ready on port 9070 before progressing to the other nodes. But I agree, it would be good if non-admin nodes waited a bit instead of bailing.
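
A minimal sketch of that wait-instead-of-bail pattern, assuming a tokio runtime (the helper is illustrative, not the crate's code; the 250ms cadence mirrors the health checks in the diff below):

    use std::time::Duration;

    // Poll an async check every 250ms until it passes or `timeout` elapses.
    async fn wait_until<F, Fut>(timeout: Duration, mut check: F) -> bool
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = bool>,
    {
        tokio::time::timeout(timeout, async {
            while !check().await {
                tokio::time::sleep(Duration::from_millis(250)).await;
            }
        })
        .await
        .is_ok()
    }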

2,
);

42 changes: 25 additions & 17 deletions crates/local-cluster-runner/src/cluster/mod.rs
@@ -26,9 +26,9 @@ pub struct Cluster {
}

impl<C, N> ClusterBuilder<(C, N, ())> {
- // Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
- // You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
- // it.
+ /// Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
+ /// You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
+ /// it.
pub fn temp_base_dir(self) -> ClusterBuilder<(C, N, (MaybeTempDir,))> {
let maybe_temp_dir = tempfile::tempdir().expect("to create a tempdir").into();
let base_dir = (maybe_temp_dir,);
@@ -52,6 +52,8 @@ fn default_cluster_name() -> String {
pub enum ClusterStartError {
#[error("Failed to start node {0}: {1}")]
NodeStartError(usize, NodeStartError),
#[error("Admin node is not healthy after waiting 60 seconds")]
AdminUnhealthy,
#[error("Failed to create cluster base directory: {0}")]
CreateDirectory(io::Error),
#[error("Failed to create metadata client: {0}")]
@@ -86,11 +88,17 @@ impl Cluster {
);

for (i, node) in nodes.into_iter().enumerate() {
- started_nodes.push(
- node.start_clustered(base_dir.as_path(), &cluster_name)
- .await
- .map_err(|err| ClusterStartError::NodeStartError(i, err))?,
- )
+ let node = node
+ .start_clustered(base_dir.as_path(), &cluster_name)
+ .await
+ .map_err(|err| ClusterStartError::NodeStartError(i, err))?;
+ if node.admin_address().is_some() {
+ // Admin nodes are needed for later nodes to bootstrap; wait until they are serving.
+ if !node.wait_admin_healthy(Duration::from_secs(30)).await {
+ return Err(ClusterStartError::AdminUnhealthy);
+ }
+ }
+ started_nodes.push(node)
}

Ok(StartedCluster {
@@ -116,31 +124,31 @@ impl StartedCluster {
&self.cluster_name
}

- // Send a SIGKILL to every node in the cluster
+ /// Send a SIGKILL to every node in the cluster
pub async fn kill(&mut self) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.kill()))
.await
.map(drop)
}

- // Send a SIGTERM to every node in the cluster
+ /// Send a SIGTERM to every node in the cluster
pub fn terminate(&self) -> io::Result<()> {
for node in &self.nodes {
node.terminate()?
}
Ok(())
}

- // Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
- // otherwise send a SIGKILL to nodes that are still running.
+ /// Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
+ /// otherwise send a SIGKILL to nodes that are still running.
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.graceful_shutdown(dur)))
.await
.map(drop)
}

- // For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
- // to respond to health checks, otherwise return false.
+ /// For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
+ /// to respond to health checks, otherwise return false.
pub async fn wait_admins_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
@@ -153,8 +161,8 @@ impl StartedCluster {
.all(|b| b)
}

- // For every node in the cluster with an ingress role, wait for up to dur for the admin endpoint
- // to respond to health checks, otherwise return false.
+ /// For every node in the cluster with an ingress role, wait for up to dur for the ingress endpoint
+ /// to respond to health checks, otherwise return false.
pub async fn wait_ingresses_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
@@ -167,7 +175,7 @@ impl StartedCluster {
.all(|b| b)
}

- // Wait for all ingress and admin endpoints in the cluster to be healthy
+ /// Wait for all ingress and admin endpoints in the cluster to be healthy
pub async fn wait_healthy(&self, dur: Duration) -> bool {
future::join(
self.wait_admins_healthy(dur),
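Taken together, a hedged usage sketch of this cluster API (builder setters other than temp_base_dir are assumptions inferred from the typestate ClusterBuilder above):

    // Build, start, health-check, and stop a cluster in a test.
    let cluster = Cluster::builder()
        .cluster_name("my-cluster") // assumed setter; a default_cluster_name also exists
        .nodes(nodes)               // e.g. from Node::new_test_nodes_with_metadata(..)
        .temp_base_dir()
        .build();
    let mut started = cluster.start().await?;
    assert!(started.wait_healthy(Duration::from_secs(30)).await);
    started.graceful_shutdown(Duration::from_secs(5)).await?;
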
62 changes: 33 additions & 29 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -137,7 +137,7 @@ impl Node {
}

// Creates a group of Nodes with a single metadata node "metadata-node", and a given number
// of other nodes ["node-1", ..] each with the provided roles. Node name, roles,
// bind/advertise addresses, and the metadata address from the base_config will all be overwritten.
pub fn new_test_nodes_with_metadata(
base_config: Configuration,
@@ -148,18 +148,22 @@ impl Node {
let mut nodes = Vec::with_capacity((size + 1) as usize);

{
+ let mut base_config = base_config.clone();
+ base_config.common.allow_bootstrap = true;
nodes.push(Self::new_test_node(
"metadata-node",
- base_config.clone(),
+ base_config,
binary_source.clone(),
enum_set!(Role::Admin | Role::MetadataStore),
));
}

for node in 1..=size {
+ let mut base_config = base_config.clone();
+ base_config.common.allow_bootstrap = false;
nodes.push(Self::new_test_node(
format!("node-{node}"),
- base_config.clone(),
+ base_config,
binary_source.clone(),
roles,
));
Expand All @@ -168,10 +172,10 @@ impl Node {
nodes
}

- // Start this Node, providing the base_dir and the cluster_name of the cluster its
- // expected to attach to. All relative file paths addresses specified in the node config
- // (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
- // and cluster name present in config will be overwritten.
+ /// Start this Node, providing the base_dir and the cluster_name of the cluster it's
+ /// expected to attach to. All relative file paths and addresses specified in the node config
+ /// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
+ /// and cluster name present in config will be overwritten.
pub async fn start_clustered(
mut self,
base_dir: impl Into<PathBuf>,
@@ -206,8 +210,8 @@ impl Node {
self.start().await
}

- // Start this node with the current config. A subprocess will be created, and a tokio task
- // spawned to process output logs and watch for exit.
+ /// Start this node with the current config. A subprocess will be created, and a tokio task
+ /// spawned to process output logs and watch for exit.
pub async fn start(self) -> Result<StartedNode, NodeStartError> {
let Self {
base_config,
@@ -350,13 +354,13 @@
pub enum BinarySource {
Path(OsString),
EnvVar(String),
- // Suitable when called from a `cargo run` command, except examples.
- // This will attempt to find a `restate-server` binary in the same directory
- // as the current binary
+ /// Suitable when called from a `cargo run` command, except examples.
+ /// This will attempt to find a `restate-server` binary in the same directory
+ /// as the current binary
CargoRun,
- // Suitable when called from a `cargo test` or `cargo run --example` command;
- // this will attempt to find a `restate-server` binary in the parent directory of
- // the current binary.
+ /// Suitable when called from a `cargo test` or `cargo run --example` command;
+ /// this will attempt to find a `restate-server` binary in the parent directory of
+ /// the current binary.
CargoTest,
}

@@ -457,7 +461,7 @@ impl Future for StartedNodeStatus {
}

impl StartedNode {
- // Send a SIGKILL to the current process, if it is running, and await for its exit
+ /// Send a SIGKILL to the current process, if it is running, and await its exit
pub async fn kill(&mut self) -> io::Result<ExitStatus> {
match self.status {
StartedNodeStatus::Exited(status) => Ok(status),
@@ -479,7 +483,7 @@ impl StartedNode {
}
}

- // Send a SIGTERM to the current process, if it is running
+ /// Send a SIGTERM to the current process, if it is running
pub fn terminate(&self) -> io::Result<()> {
match self.status {
StartedNodeStatus::Exited(_) => Ok(()),
@@ -499,7 +503,7 @@ impl StartedNode {
}
}

- // Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
+ /// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<ExitStatus> {
match self.status {
StartedNodeStatus::Exited(status) => Ok(status),
@@ -526,15 +530,15 @@ impl StartedNode {
}
}

- // Get the pid of the subprocess. Returns none after it has exited.
+ /// Get the pid of the subprocess. Returns `None` after it has exited.
pub fn pid(&self) -> Option<u32> {
match self.status {
StartedNodeStatus::Exited { .. } | StartedNodeStatus::Failed { .. } => None,
StartedNodeStatus::Running { pid, .. } => Some(pid),
}
}

- // Wait for the node to exit and report its exist status
+ /// Wait for the node to exit and report its exit status
pub async fn status(&mut self) -> io::Result<ExitStatus> {
(&mut self.status).await
}
@@ -575,8 +579,8 @@ impl StartedNode {
}
}

- // Obtain a stream of loglines matching this pattern. The stream will end
- // when the stdout and stderr files on the process close.
+ /// Obtain a stream of loglines matching this pattern. The stream will end
+ /// when the stdout and stderr files on the process close.
pub fn lines(&self, pattern: Regex) -> impl Stream<Item = String> + '_ {
match self.status {
StartedNodeStatus::Exited { .. } => futures::stream::empty().left_stream(),
Expand All @@ -588,7 +592,7 @@ impl StartedNode {
}
}

- // Obtain a metadata client based on this nodes client config.
+ /// Obtain a metadata client based on this node's client config.
pub async fn metadata_client(
&self,
) -> Result<restate_metadata_store::MetadataStoreClient, GenericError> {
@@ -598,7 +602,7 @@ impl StartedNode {
.await
}

- // Check to see if the admin address is healthy. Returns false if this node has no admin role.
+ /// Check to see if the admin address is healthy. Returns false if this node has no admin role.
pub async fn admin_healthy(&self) -> bool {
if let Some(address) = self.admin_address() {
match reqwest::get(format!("http://{address}/health")).await {
@@ -610,8 +614,8 @@ impl StartedNode {
}
}

- // Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
- // Returns false if this node has no admin role.
+ /// Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
+ /// Returns false if this node has no admin role.
pub async fn wait_admin_healthy(&self, timeout: Duration) -> bool {
let mut attempts = 1;
if tokio::time::timeout(timeout, async {
@@ -637,7 +641,7 @@ impl StartedNode {
}
}

- // Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
+ /// Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
pub async fn ingress_healthy(&self) -> bool {
if let Some(address) = self.ingress_address() {
match reqwest::get(format!("http://{address}/restate/health")).await {
@@ -649,8 +653,8 @@ impl StartedNode {
}
}

- // Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
- // Returns false if this node has no ingress role.
+ /// Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
+ /// Returns false if this node has no ingress role.
pub async fn wait_ingress_healthy(&self, timeout: Duration) -> bool {
let mut attempts = 1;
if tokio::time::timeout(timeout, async {
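As a hedged usage sketch of the per-node API above (the log pattern is a placeholder, and futures::StreamExt is assumed in scope for .next()):

    // Start one node, wait for its admin endpoint, tail a log line, then stop it.
    let mut started = node.start().await?;
    assert!(started.wait_admin_healthy(Duration::from_secs(30)).await);
    {
        let mut lines = std::pin::pin!(started.lines(Regex::new("server started")?));
        if let Some(line) = lines.next().await {
            println!("observed: {line}");
        }
    } // drop the stream before borrowing the node mutably again
    started.graceful_shutdown(Duration::from_secs(5)).await?;
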
4 changes: 2 additions & 2 deletions server/tests/cluster.rs
@@ -16,7 +16,7 @@ async fn node_id_mismatch() {
let nodes = Node::new_test_nodes_with_metadata(
base_config.clone(),
BinarySource::CargoTest,
- enum_set!(Role::Admin | Role::Worker),
+ enum_set!(Role::Worker),
1,
);

@@ -64,7 +64,7 @@ async fn cluster_name_mismatch() {
let nodes = Node::new_test_nodes_with_metadata(
base_config.clone(),
BinarySource::CargoTest,
- enum_set!(Role::Admin | Role::Worker),
+ enum_set!(Role::Worker),
1,
);
