Allow embedded metadata store client to connect to different endpoints
This commit introduces the ability to specify multiple addresses for the
metadata store endpoint. On error, the GrpcMetadataStoreClient randomly
switches to another endpoint. Moreover, this commit makes the OmniPaxosMetadataStore
accept requests only if it is the leader. Additionally, the store fails all
pending callbacks when it loses leadership, so that requests whose proposals
were never decided do not hang.
tillrohrmann committed Dec 6, 2024
1 parent 0655822 commit 186de27
Showing 10 changed files with 158 additions and 57 deletions.
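Before the file-by-file diff, a hedged sketch of the caller-facing change: GrpcMetadataStoreClient::new now takes a Vec<AdvertisedAddress> instead of a single address, as the client.rs diff below shows. The socket paths, the `client_options` parameter, and the enclosing function are illustrative placeholders, not part of the commit:

    use bytestring::ByteString;
    use restate_core::metadata_store::{MetadataStore, ReadError};
    use restate_core::network::net_util::CommonClientConnectionOptions;
    use restate_types::net::AdvertisedAddress;

    // GrpcMetadataStoreClient lives in crates/metadata-store/src/grpc/client.rs;
    // its exact import path is elided here.
    async fn connect_and_read<T: CommonClientConnectionOptions>(
        client_options: &T,
    ) -> Result<(), ReadError> {
        // Any of the configured endpoints can serve requests; on an error the
        // client switches to a randomly chosen endpoint and retries.
        let addresses = vec![
            AdvertisedAddress::Uds("metadata-1.sock".into()),
            AdvertisedAddress::Uds("metadata-2.sock".into()),
        ];
        let client = GrpcMetadataStoreClient::new(addresses, client_options);

        let _value = client.get(ByteString::from_static("my-key")).await?;
        Ok(())
    }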
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 7 additions & 5 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -71,7 +71,7 @@ pub struct Node {
pub fn with_socket_metadata(self) {
let metadata_socket: PathBuf = "metadata.sock".into();
self.base_config.metadata_store.bind_address = BindAddress::Uds(metadata_socket.clone());
- self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { address: AdvertisedAddress::Uds(metadata_socket) }
+ self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { addresses: vec![AdvertisedAddress::Uds(metadata_socket)] }
}
pub fn with_random_ports(self) {
@@ -221,15 +221,17 @@ impl Node {
let base_dir = base_dir.into();

// ensure file paths are relative to the base dir
- if let MetadataStoreClient::Embedded {
- address: AdvertisedAddress::Uds(file),
- } = &mut self
+ if let MetadataStoreClient::Embedded { addresses } = &mut self
.base_config
.common
.metadata_store_client
.metadata_store_client
{
- *file = base_dir.join(&*file)
+ for advertised_address in addresses {
+ if let AdvertisedAddress::Uds(file) = advertised_address {
+ *file = base_dir.join(&*file)
+ }
+ }
}
if let BindAddress::Uds(file) = &mut self.base_config.metadata_store.bind_address {
*file = base_dir.join(&*file)
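A side note on the base-dir fix-up in Node above: PathBuf::join keeps absolute paths untouched (the joined path replaces the base), so only relative socket paths are relocated under the node's base dir. A quick standalone check of that standard-library behavior:

    use std::path::PathBuf;

    fn main() {
        let base_dir = PathBuf::from("/tmp/node-1");
        // A relative socket path lands under the base dir...
        assert_eq!(
            base_dir.join("metadata.sock"),
            PathBuf::from("/tmp/node-1/metadata.sock")
        );
        // ...while an absolute path replaces the base entirely.
        assert_eq!(base_dir.join("/var/run/md.sock"), PathBuf::from("/var/run/md.sock"));
    }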
2 changes: 2 additions & 0 deletions crates/metadata-store/Cargo.toml
@@ -18,6 +18,7 @@ restate-rocksdb = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
+ arc-swap = { workspace = true }
assert2 = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
@@ -35,6 +36,7 @@ prost = { workspace = true }
prost-types = { workspace = true }
protobuf = "2.28.0"
raft = { version = "0.7.0" }
+ rand = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
157 changes: 126 additions & 31 deletions crates/metadata-store/src/grpc/client.rs
@@ -8,8 +8,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

+ use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytestring::ByteString;
+ use rand::seq::SliceRandom;
+ use std::sync::Arc;
+ use std::time::Duration;
use tonic::transport::Channel;
use tonic::{Code, Status};

@@ -19,6 +23,7 @@ use restate_core::metadata_store::{
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::network::net_util::CommonClientConnectionOptions;
use restate_types::net::AdvertisedAddress;
+ use restate_types::retries::RetryPolicy;
use restate_types::Version;

use crate::grpc::pb_conversions::ConversionError;
@@ -28,31 +33,81 @@ use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest};
/// Client end to interact with the metadata store.
#[derive(Debug, Clone)]
pub struct GrpcMetadataStoreClient {
- svc_client: MetadataStoreSvcClient<Channel>,
+ channels: Arc<Vec<Channel>>,
+ svc_client: Arc<ArcSwap<MetadataStoreSvcClient<Channel>>>,
}

impl GrpcMetadataStoreClient {
pub fn new<T: CommonClientConnectionOptions>(
- metadata_store_address: AdvertisedAddress,
+ metadata_store_addresses: Vec<AdvertisedAddress>,
options: &T,
) -> Self {
- let channel = create_tonic_channel_from_advertised_address(metadata_store_address, options);
+ assert!(
+ !metadata_store_addresses.is_empty(),
+ "At least one metadata store needs to be configured"
+ );
+ let channels: Vec<_> = metadata_store_addresses
+ .into_iter()
+ .map(|address| create_tonic_channel_from_advertised_address(address, options))
+ .collect();
+ let svc_client = MetadataStoreSvcClient::new(
+ channels
+ .first()
+ .expect("at least one address must be configured")
+ .clone(),
+ );

Self {
- svc_client: MetadataStoreSvcClient::new(channel),
+ channels: Arc::new(channels),
+ svc_client: Arc::new(ArcSwap::from_pointee(svc_client)),
}
}

+ fn retry_policy() -> RetryPolicy {
+ RetryPolicy::exponential(
+ Duration::from_millis(100),
+ 2.0,
+ Some(20),
+ Some(Duration::from_secs(2)),
+ )
+ }
+
+ fn choose_different_endpoint(&self) {
+ // let's try another endpoint
+ let mut rng = rand::thread_rng();
+ let new_svc_client = MetadataStoreSvcClient::new(
+ self.channels
+ .choose(&mut rng)
+ .expect("at least one channel to be present")
+ .clone(),
+ );
+ self.svc_client.store(Arc::new(new_svc_client))
+ }
}

#[async_trait]
impl MetadataStore for GrpcMetadataStoreClient {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
- let response = self
- .svc_client
- .clone()
- .get(GetRequest { key: key.into() })
- .await
- .map_err(map_status_to_read_error)?;
+ let retry_policy = Self::retry_policy();
+
+ let response = retry_policy
+ .retry(|| async {
+ let mut client = self.svc_client.load().as_ref().clone();
+
+ let response = client
+ .get(GetRequest {
+ key: key.clone().into(),
+ })
+ .await
+ .map_err(map_status_to_read_error);
+
+ if response.is_err() {
+ self.choose_different_endpoint();
+ }
+
+ response
+ })
+ .await?;

response
.into_inner()
@@ -61,12 +116,26 @@ impl MetadataStore for GrpcMetadataStoreClient {
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
- let response = self
- .svc_client
- .clone()
- .get_version(GetRequest { key: key.into() })
- .await
- .map_err(map_status_to_read_error)?;
+ let retry_policy = Self::retry_policy();
+
+ let response = retry_policy
+ .retry(|| async {
+ let mut client = self.svc_client.load().as_ref().clone();
+
+ let response = client
+ .get_version(GetRequest {
+ key: key.clone().into(),
+ })
+ .await
+ .map_err(map_status_to_read_error);
+
+ if response.is_err() {
+ self.choose_different_endpoint();
+ }
+
+ response
+ })
+ .await?;

Ok(response.into_inner().into())
}
@@ -77,28 +146,54 @@
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
- self.svc_client
- .clone()
- .put(PutRequest {
- key: key.into(),
- value: Some(value.into()),
- precondition: Some(precondition.into()),
+ let retry_policy = Self::retry_policy();
+
+ retry_policy
+ .retry(|| async {
+ let mut client = self.svc_client.load().as_ref().clone();
+
+ let response = client
+ .put(PutRequest {
+ key: key.clone().into(),
+ value: Some(value.clone().into()),
+ precondition: Some(precondition.clone().into()),
+ })
+ .await
+ .map_err(map_status_to_write_error);
+
+ if response.is_err() {
+ self.choose_different_endpoint();
+ }
+
+ response
})
- .await
- .map_err(map_status_to_write_error)?;
+ .await?;

Ok(())
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
- self.svc_client
- .clone()
- .delete(DeleteRequest {
- key: key.into(),
- precondition: Some(precondition.into()),
+ let retry_policy = Self::retry_policy();
+
+ retry_policy
+ .retry(|| async {
+ let mut client = self.svc_client.load().as_ref().clone();
+
+ let response = client
+ .delete(DeleteRequest {
+ key: key.clone().into(),
+ precondition: Some(precondition.clone().into()),
+ })
+ .await
+ .map_err(map_status_to_write_error);
+
+ if response.is_err() {
+ self.choose_different_endpoint();
+ }
+
+ response
})
- .await
- .map_err(map_status_to_write_error)?;
+ .await?;

Ok(())
}
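Two remarks on the client logic above. First, choose_different_endpoint picks uniformly at random via SliceRandom::choose, so the next attempt may land on the endpoint that just failed; the retry loop simply tries again. Second, the hard-coded retry policy implies the following delay schedule, assuming RetryPolicy::exponential takes (initial interval, growth factor, max attempts, max interval) — a plain-Rust sketch, independent of the restate crates:

    use std::time::Duration;

    // Reproduces the presumed schedule behind
    // RetryPolicy::exponential(Duration::from_millis(100), 2.0, Some(20), Some(Duration::from_secs(2))).
    fn main() {
        let mut delay = Duration::from_millis(100);
        let cap = Duration::from_secs(2);
        for attempt in 1..=20 {
            println!("attempt {attempt}: wait {delay:?}");
            // Double the delay each round, but never beyond the 2s cap.
            delay = cap.min(delay.mul_f64(2.0));
        }
    }

Under these assumptions the delays run 100ms, 200ms, 400ms, 800ms, 1.6s, and then stay at 2s, i.e. roughly 31 seconds of total backoff across the 20 attempts before the error is surfaced to the caller.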
6 changes: 0 additions & 6 deletions crates/metadata-store/src/kv_memory_storage.rs
@@ -13,7 +13,6 @@ use bytestring::ByteString;
use restate_core::metadata_store::{Precondition, VersionedValue};
use restate_types::Version;
use std::collections::HashMap;
- use tracing::debug;
use ulid::Ulid;

#[derive(Default)]
@@ -25,7 +24,6 @@ pub struct KvMemoryStorage {
impl KvMemoryStorage {
pub fn register_callback(&mut self, callback: Callback) {
self.callbacks.insert(callback.request_id, callback);
debug!("Pending callbacks: {}", self.callbacks.len());
}

pub fn fail_callbacks<F: Fn() -> RequestError>(&mut self, cause: F) {
@@ -35,8 +33,6 @@ impl KvMemoryStorage {
}

pub fn handle_request(&mut self, request: Request) {
debug!("Handle request: {request:?}");
debug!("Pending callbacks before handling request: {}", self.callbacks.len());
match request.kind {
RequestKind::Get { key } => {
let result = self.get(key);
@@ -67,8 +63,6 @@
}
}
}

debug!("Pending callbacks after handling request: {}", self.callbacks.len());
}

pub fn get(&self, key: ByteString) -> Option<VersionedValue> {
4 changes: 2 additions & 2 deletions crates/metadata-store/src/local/mod.rs
@@ -34,9 +34,9 @@ pub async fn create_client(
);

let client = match metadata_store_client_options.metadata_store_client.clone() {
- MetadataStoreClientConfig::Embedded { address } => {
+ MetadataStoreClientConfig::Embedded { addresses } => {
let inner_client =
- GrpcMetadataStoreClient::new(address, &metadata_store_client_options);
+ GrpcMetadataStoreClient::new(addresses, &metadata_store_client_options);
MetadataStoreClient::new(inner_client, backoff_policy)
}
MetadataStoreClientConfig::Etcd { addresses } => {
8 changes: 4 additions & 4 deletions crates/metadata-store/src/local/tests.rs
@@ -243,7 +243,7 @@ async fn durable_storage() -> anyhow::Result<()> {
let bind_address = BindAddress::Uds(uds_path.clone());
let metadata_store_client_opts = MetadataStoreClientOptionsBuilder::default()
.metadata_store_client(restate_types::config::MetadataStoreClient::Embedded {
- address: AdvertisedAddress::Uds(uds_path),
+ addresses: vec![AdvertisedAddress::Uds(uds_path)],
})
.build()
.expect("valid metadata store client options");
@@ -290,7 +290,7 @@ async fn create_test_environment(
config.metadata_store.bind_address = bind_address;
config.common.metadata_store_client.metadata_store_client =
config::MetadataStoreClient::Embedded {
- address: advertised_address.clone(),
+ addresses: vec![advertised_address.clone()],
};

restate_types::config::set_current_config(config.clone());
@@ -333,15 +333,15 @@ async fn start_metadata_store(
)?;

assert2::let_assert!(
- config::MetadataStoreClient::Embedded { address } =
+ config::MetadataStoreClient::Embedded { addresses } =
metadata_store_client_options.metadata_store_client.clone()
);

health_status
.wait_for_value(MetadataServerStatus::Ready)
.await;

- let grpc_client = GrpcMetadataStoreClient::new(address, &metadata_store_client_options);
+ let grpc_client = GrpcMetadataStoreClient::new(addresses, &metadata_store_client_options);
let client = MetadataStoreClient::new(
grpc_client,
Some(metadata_store_client_options.metadata_store_client_backoff_policy),
12 changes: 9 additions & 3 deletions crates/metadata-store/src/omnipaxos/store.rs
@@ -90,7 +90,9 @@ impl OmnipaxosMetadataStore {

let omni_paxos = op_config.build(rocksdb_storage)?;

- let is_leader = omni_paxos.get_current_leader().is_some_and(|(node_id, _)| node_id == own_node_id);
+ let is_leader = omni_paxos
+ .get_current_leader()
+ .is_some_and(|(node_id, _)| node_id == own_node_id);

Ok(Self {
own_node_id,
@@ -143,11 +145,15 @@

fn check_leadership(&mut self) {
let previous_is_leader = self.is_leader;
- self.is_leader = self.omni_paxos.get_current_leader().is_some_and(|(node_id, _)| node_id == self.own_node_id);
+ self.is_leader = self
+ .omni_paxos
+ .get_current_leader()
+ .is_some_and(|(node_id, _)| node_id == self.own_node_id);

if previous_is_leader && !self.is_leader {
// we lost leadership :-(
- self.kv_storage.fail_callbacks(|| RequestError::Unavailable("lost leadership".into()));
+ self.kv_storage
+ .fail_callbacks(|| RequestError::Unavailable("lost leadership".into()));
}
}
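The leadership transition handling above pairs with KvMemoryStorage::fail_callbacks from the earlier hunk: once this node stops being leader, a request it proposed may never be decided, so every pending callback is failed with RequestError::Unavailable rather than being left to hang. A minimal, self-contained sketch of that drain-and-fail pattern; the types here are simplified stand-ins, not the real Callback and RequestError:

    use std::collections::HashMap;

    // Simplified stand-in for the real callback type.
    type Callback = Box<dyn FnOnce(Result<(), String>)>;

    #[derive(Default)]
    struct PendingCallbacks {
        callbacks: HashMap<u64, Callback>,
    }

    impl PendingCallbacks {
        // On leadership loss, drain every pending callback and fail it, so
        // callers see an error instead of waiting on a proposal that will
        // never be decided.
        fn fail_all<F: Fn() -> String>(&mut self, cause: F) {
            for (_, callback) in self.callbacks.drain() {
                callback(Err(cause()));
            }
        }
    }

    fn main() {
        let mut pending = PendingCallbacks::default();
        pending
            .callbacks
            .insert(1, Box::new(|result| println!("request 1: {result:?}")));
        // Pretend we just observed a leadership change.
        pending.fail_all(|| "lost leadership".to_owned());
    }

Failing with an "unavailable" error rather than silently dropping the callbacks gives callers such as the GrpcMetadataStoreClient a chance to retry against whichever node becomes leader next.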
