From 092e256f50798d6d921ca4071fa87a88b407773e Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 24 Dec 2024 14:19:08 +0100 Subject: [PATCH] Replace nested kind with Status variant and boolean --- Cargo.lock | 1 + crates/core/protobuf/node_ctl_svc.proto | 15 ++---- crates/core/src/protobuf.rs | 13 ++--- crates/local-cluster-runner/Cargo.toml | 1 + crates/local-cluster-runner/src/node/mod.rs | 29 ++++++----- crates/node/src/lib.rs | 24 ++++----- .../src/network_server/grpc_svc_handler.rs | 27 +++++----- .../src/commands/cluster/provision.rs | 51 +++++++------------ 8 files changed, 65 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df241d1aad..457963807f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6473,6 +6473,7 @@ dependencies = [ "thiserror 2.0.6", "tokio", "toml", + "tonic", "tracing", "tracing-subscriber", "typed-builder", diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index fa45f4047b..bcaa8dbcb5 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -28,22 +28,17 @@ service NodeCtlSvc { message ProvisionClusterRequest { bool dry_run = 1; + // if unset then the configured cluster num partitions will be used optional uint32 num_partitions = 2; + // if unset then the configured cluster placement strategy will be used optional restate.cluster.ReplicationStrategy placement_strategy = 3; + // if unset then the configured cluster default log provider will be used optional restate.cluster.DefaultProvider log_provider = 4; } -enum ProvisionClusterResponseKind { - ProvisionClusterResponseType_UNKNOWN = 0; - DRY_RUN = 1; - NEWLY_PROVISIONED = 2; - ALREADY_PROVISIONED = 3; -} - message ProvisionClusterResponse { - ProvisionClusterResponseKind kind = 1; - // This field will be empty if the cluster is already provisioned - optional restate.cluster.ClusterConfiguration cluster_configuration = 3; + bool dry_run = 1; + restate.cluster.ClusterConfiguration cluster_configuration = 2; } message IdentResponse { diff --git a/crates/core/src/protobuf.rs b/crates/core/src/protobuf.rs index 31a4737751..cd47f7bffc 100644 --- a/crates/core/src/protobuf.rs +++ b/crates/core/src/protobuf.rs @@ -19,23 +19,16 @@ pub mod node_ctl_svc { impl ProvisionClusterResponse { pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), + dry_run: true, cluster_configuration: Some(cluster_configuration), } } - pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self { + pub fn provisioned(cluster_configuration: ClusterConfiguration) -> Self { ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::NewlyProvisioned.into(), + dry_run: false, cluster_configuration: Some(cluster_configuration), } } - - pub fn already_provisioned() -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(), - ..Default::default() - } - } } } diff --git a/crates/local-cluster-runner/Cargo.toml b/crates/local-cluster-runner/Cargo.toml index 708524c78f..c1db4e4c59 100644 --- a/crates/local-cluster-runner/Cargo.toml +++ b/crates/local-cluster-runner/Cargo.toml @@ -34,6 +34,7 @@ serde = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["process", "fs"] } +tonic = { workspace = true } toml = "0.8" tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 605e411e9e..fc996c2524 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -16,9 +16,7 @@ use itertools::Itertools; use regex::{Regex, RegexSet}; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ - ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind, -}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest; use restate_types::logs::metadata::DefaultProvider; use restate_types::partition_table::ReplicationStrategy; use restate_types::retries::RetryPolicy; @@ -54,6 +52,7 @@ use tokio::{ task::JoinHandle, }; use tokio::{process::Command, sync::mpsc::Sender}; +use tonic::Code; use tracing::{error, info, warn}; use typed_builder::TypedBuilder; @@ -789,17 +788,23 @@ impl StartedNode { let request = request.clone(); async move { client.provision_cluster(request).await } }) - .await? - .into_inner(); + .await; - Ok(match response.kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") + match response { + Ok(response) => { + let response = response.into_inner(); + + assert!(!response.dry_run, "provision command was run w/o dry run"); + Ok(true) } - ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"), - ProvisionClusterResponseKind::NewlyProvisioned => true, - ProvisionClusterResponseKind::AlreadyProvisioned => false, - }) + Err(status) => { + if status.code() == Code::AlreadyExists { + Ok(false) + } else { + Err(status.into()) + } + } + } } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index e13ca3bb91..3a014e7c88 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -30,7 +30,7 @@ use restate_core::network::{ }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_core::{cancellation_watcher, Metadata, TaskKind}; use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] @@ -387,22 +387,16 @@ impl Node { .await; match response { - Ok(response) => match response.into_inner().kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") - } - ProvisionClusterResponseKind::DryRun => { - unreachable!("call w/o dry run") - } - ProvisionClusterResponseKind::NewlyProvisioned => { - debug!("Successfully auto provisioned the cluster") - } - ProvisionClusterResponseKind::AlreadyProvisioned => { + Ok(response) => { + let response = response.into_inner(); + debug_assert!(!response.dry_run, "Provision w/o dry run"); + } + Err(err) => { + if err.code() == Code::AlreadyExists { debug!("The cluster is already provisioned.") + } else { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); } - }, - Err(err) => { - warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); } } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 1fd84d2db3..28d5cf3d8c 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -22,7 +22,7 @@ use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, - ProvisionClusterResponse, ProvisionClusterResponseKind, + ProvisionClusterResponse, }; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; @@ -39,7 +39,6 @@ use restate_types::{GenerationalNodeId, Version, Versioned}; use std::num::NonZeroU16; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; -use tracing::warn; pub struct NodeCtlSvcHandler { task_center: task_center::Handle, @@ -276,10 +275,9 @@ impl NodeCtlSvc for NodeCtlSvcHandler { .map_err(|err| Status::invalid_argument(err.to_string()))?; if dry_run { - return Ok(Response::new(ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), - cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), - })); + return Ok(Response::new(ProvisionClusterResponse::dry_run( + ProtoClusterConfiguration::from(cluster_configuration), + ))); } let newly_provisioned = self @@ -287,16 +285,15 @@ impl NodeCtlSvc for NodeCtlSvcHandler { .await .map_err(|err| Status::internal(err.to_string()))?; - let kind = if newly_provisioned { - ProvisionClusterResponseKind::NewlyProvisioned - } else { - ProvisionClusterResponseKind::AlreadyProvisioned - }; + if !newly_provisioned { + return Err(Status::already_exists( + "The cluster has already been provisioned", + )); + } - Ok(Response::new(ProvisionClusterResponse { - kind: kind.into(), - cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), - })) + Ok(Response::new(ProvisionClusterResponse::provisioned( + ProtoClusterConfiguration::from(cluster_configuration), + ))) } } diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs index 1df2cac1f6..9b2ef2f168 100644 --- a/tools/restatectl/src/commands/cluster/provision.rs +++ b/tools/restatectl/src/commands/cluster/provision.rs @@ -17,7 +17,7 @@ use cling::{Collect, Run}; use restate_cli_util::ui::console::confirm_or_exit; use restate_cli_util::{c_error, c_println, c_warn}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_types::logs::metadata::{ DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, }; @@ -26,6 +26,7 @@ use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; use std::num::NonZeroU16; use tonic::codec::CompressionEncoding; +use tonic::Code; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "cluster_provision")] @@ -91,21 +92,10 @@ async fn cluster_provision( } }; - let cluster_configuration_to_provision = match response.kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") - } - ProvisionClusterResponseKind::DryRun => response - .cluster_configuration - .expect("dry run response needs to carry a cluster configuration"), - ProvisionClusterResponseKind::NewlyProvisioned => { - unreachable!("provisioning a cluster with dry run should not have an effect") - } - ProvisionClusterResponseKind::AlreadyProvisioned => { - c_println!("🏃The cluster has already been provisioned."); - return Ok(()); - } - }; + debug_assert!(response.dry_run, "Provision with dry run"); + let cluster_configuration_to_provision = response + .cluster_configuration + .expect("Provision response should carry a cluster configuration"); c_println!( "{}", @@ -134,28 +124,21 @@ async fn cluster_provision( log_provider: cluster_configuration_to_provision.default_provider, }; - let response = match client.provision_cluster(request).await { - Ok(response) => response.into_inner(), - Err(err) => { - c_error!("Failed to provision cluster: {}", err.message()); - return Ok(()); - } - }; + match client.provision_cluster(request).await { + Ok(response) => { + let response = response.into_inner(); + debug_assert!(!response.dry_run, "Provision w/o dry run"); - match response.kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown provision cluster response kind") - } - ProvisionClusterResponseKind::DryRun => { - unreachable!("provisioning a cluster w/o dry run should have an effect") - } - ProvisionClusterResponseKind::NewlyProvisioned => { c_println!("✅ Cluster has been successfully provisioned."); } - ProvisionClusterResponseKind::AlreadyProvisioned => { - c_println!("🤷 Cluster has been provisioned by somebody else."); + Err(err) => { + if err.code() == Code::AlreadyExists { + c_println!("🤷 Cluster has been provisioned by somebody else."); + } else { + c_error!("Failed to provision cluster: {}", err.message()); + } } - } + }; Ok(()) }