Skip to content

Commit

Permalink
Replace nested kind with Status variant and boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Dec 24, 2024
1 parent 7a738aa commit 092e256
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,17 @@ service NodeCtlSvc {

message ProvisionClusterRequest {
bool dry_run = 1;
// if unset then the configured cluster num partitions will be used
optional uint32 num_partitions = 2;
// if unset then the configured cluster placement strategy will be used
optional restate.cluster.ReplicationStrategy placement_strategy = 3;
// if unset then the configured cluster default log provider will be used
optional restate.cluster.DefaultProvider log_provider = 4;
}

enum ProvisionClusterResponseKind {
ProvisionClusterResponseType_UNKNOWN = 0;
DRY_RUN = 1;
NEWLY_PROVISIONED = 2;
ALREADY_PROVISIONED = 3;
}

message ProvisionClusterResponse {
ProvisionClusterResponseKind kind = 1;
// This field will be empty if the cluster is already provisioned
optional restate.cluster.ClusterConfiguration cluster_configuration = 3;
bool dry_run = 1;
restate.cluster.ClusterConfiguration cluster_configuration = 2;
}

message IdentResponse {
Expand Down
13 changes: 3 additions & 10 deletions crates/core/src/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@ pub mod node_ctl_svc {
impl ProvisionClusterResponse {
pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::DryRun.into(),
dry_run: true,
cluster_configuration: Some(cluster_configuration),
}
}

pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self {
pub fn provisioned(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::NewlyProvisioned.into(),
dry_run: false,
cluster_configuration: Some(cluster_configuration),
}
}

pub fn already_provisioned() -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(),
..Default::default()
}
}
}
}
1 change: 1 addition & 0 deletions crates/local-cluster-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["process", "fs"] }
tonic = { workspace = true }
toml = "0.8"
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
29 changes: 17 additions & 12 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use itertools::Itertools;
use regex::{Regex, RegexSet};
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::{
ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind,
};
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest;
use restate_types::logs::metadata::DefaultProvider;
use restate_types::partition_table::ReplicationStrategy;
use restate_types::retries::RetryPolicy;
Expand Down Expand Up @@ -54,6 +52,7 @@ use tokio::{
task::JoinHandle,
};
use tokio::{process::Command, sync::mpsc::Sender};
use tonic::Code;
use tracing::{error, info, warn};
use typed_builder::TypedBuilder;

Expand Down Expand Up @@ -789,17 +788,23 @@ impl StartedNode {
let request = request.clone();
async move { client.provision_cluster(request).await }
})
.await?
.into_inner();
.await;

Ok(match response.kind() {
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
panic!("unknown cluster response type")
match response {
Ok(response) => {
let response = response.into_inner();

assert!(!response.dry_run, "provision command was run w/o dry run");
Ok(true)
}
ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"),
ProvisionClusterResponseKind::NewlyProvisioned => true,
ProvisionClusterResponseKind::AlreadyProvisioned => false,
})
Err(status) => {
if status.code() == Code::AlreadyExists {
Ok(false)
} else {
Err(status.into())
}
}
}
}
}

Expand Down
24 changes: 9 additions & 15 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use restate_core::network::{
};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind};
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
use restate_core::{cancellation_watcher, Metadata, TaskKind};
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter};
#[cfg(feature = "replicated-loglet")]
Expand Down Expand Up @@ -387,22 +387,16 @@ impl Node {
.await;

match response {
Ok(response) => match response.into_inner().kind() {
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
panic!("unknown cluster response type")
}
ProvisionClusterResponseKind::DryRun => {
unreachable!("call w/o dry run")
}
ProvisionClusterResponseKind::NewlyProvisioned => {
debug!("Successfully auto provisioned the cluster")
}
ProvisionClusterResponseKind::AlreadyProvisioned => {
Ok(response) => {
let response = response.into_inner();
debug_assert!(!response.dry_run, "Provision w/o dry run");
}
Err(err) => {
if err.code() == Code::AlreadyExists {
debug!("The cluster is already provisioned.")
} else {
warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
}
},
Err(err) => {
warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
}
}

Expand Down
27 changes: 12 additions & 15 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
use restate_core::protobuf::node_ctl_svc::{
GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest,
ProvisionClusterResponse, ProvisionClusterResponseKind,
ProvisionClusterResponse,
};
use restate_core::task_center::TaskCenterMonitoring;
use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
Expand All @@ -39,7 +39,6 @@ use restate_types::{GenerationalNodeId, Version, Versioned};
use std::num::NonZeroU16;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};
use tracing::warn;

pub struct NodeCtlSvcHandler {
task_center: task_center::Handle,
Expand Down Expand Up @@ -276,27 +275,25 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
.map_err(|err| Status::invalid_argument(err.to_string()))?;

if dry_run {
return Ok(Response::new(ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::DryRun.into(),
cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)),
}));
return Ok(Response::new(ProvisionClusterResponse::dry_run(
ProtoClusterConfiguration::from(cluster_configuration),
)));
}

let newly_provisioned = self
.provision_cluster_metadata(&config.common, &cluster_configuration)
.await
.map_err(|err| Status::internal(err.to_string()))?;

let kind = if newly_provisioned {
ProvisionClusterResponseKind::NewlyProvisioned
} else {
ProvisionClusterResponseKind::AlreadyProvisioned
};
if !newly_provisioned {
return Err(Status::already_exists(
"The cluster has already been provisioned",
));
}

Ok(Response::new(ProvisionClusterResponse {
kind: kind.into(),
cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)),
}))
Ok(Response::new(ProvisionClusterResponse::provisioned(
ProtoClusterConfiguration::from(cluster_configuration),
)))
}
}

Expand Down
51 changes: 17 additions & 34 deletions tools/restatectl/src/commands/cluster/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use cling::{Collect, Run};
use restate_cli_util::ui::console::confirm_or_exit;
use restate_cli_util::{c_error, c_println, c_warn};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind};
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
use restate_types::logs::metadata::{
DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig,
};
Expand All @@ -26,6 +26,7 @@ use restate_types::partition_table::ReplicationStrategy;
use restate_types::replicated_loglet::ReplicationProperty;
use std::num::NonZeroU16;
use tonic::codec::CompressionEncoding;
use tonic::Code;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "cluster_provision")]
Expand Down Expand Up @@ -91,21 +92,10 @@ async fn cluster_provision(
}
};

let cluster_configuration_to_provision = match response.kind() {
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
panic!("unknown cluster response type")
}
ProvisionClusterResponseKind::DryRun => response
.cluster_configuration
.expect("dry run response needs to carry a cluster configuration"),
ProvisionClusterResponseKind::NewlyProvisioned => {
unreachable!("provisioning a cluster with dry run should not have an effect")
}
ProvisionClusterResponseKind::AlreadyProvisioned => {
c_println!("🏃The cluster has already been provisioned.");
return Ok(());
}
};
debug_assert!(response.dry_run, "Provision with dry run");
let cluster_configuration_to_provision = response
.cluster_configuration
.expect("Provision response should carry a cluster configuration");

c_println!(
"{}",
Expand Down Expand Up @@ -134,28 +124,21 @@ async fn cluster_provision(
log_provider: cluster_configuration_to_provision.default_provider,
};

let response = match client.provision_cluster(request).await {
Ok(response) => response.into_inner(),
Err(err) => {
c_error!("Failed to provision cluster: {}", err.message());
return Ok(());
}
};
match client.provision_cluster(request).await {
Ok(response) => {
let response = response.into_inner();
debug_assert!(!response.dry_run, "Provision w/o dry run");

match response.kind() {
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
panic!("unknown provision cluster response kind")
}
ProvisionClusterResponseKind::DryRun => {
unreachable!("provisioning a cluster w/o dry run should have an effect")
}
ProvisionClusterResponseKind::NewlyProvisioned => {
c_println!("✅ Cluster has been successfully provisioned.");
}
ProvisionClusterResponseKind::AlreadyProvisioned => {
c_println!("🤷 Cluster has been provisioned by somebody else.");
Err(err) => {
if err.code() == Code::AlreadyExists {
c_println!("🤷 Cluster has been provisioned by somebody else.");
} else {
c_error!("Failed to provision cluster: {}", err.message());
}
}
}
};

Ok(())
}
Expand Down

0 comments on commit 092e256

Please sign in to comment.