From 52a62f105f9bb52f4fa2f4d4cff21b27550b13ed Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 7 Jan 2025 13:50:07 +0000 Subject: [PATCH] [Core] MetadataWriter access to MetadataStore and Bifrost access to MetadataWriter This PR introduces two key changes: 1. Bifrost now can provide `bifrost.admin()` to get easy access to bifrost's admin interface withouto passing metadata writer and metadata store client explicitly. 2. MetadataWriter now owns MetadataStoreClient. This means that you can always access metadata store client directly if you have metadata_writer which used to be a pair of types we _always_ pass together. This also opens a path for a future where MetadataWriter wraps the metadata store client and automatically update metadata manager on successful writes but that's beyond the scope of this PR. What this PR provides is a direct access to the underlying metadata_store_client via an accessor function. --- .../cluster_controller/grpc_svc_handler.rs | 25 ++--- .../src/cluster_controller/logs_controller.rs | 34 ++---- .../admin/src/cluster_controller/service.rs | 61 ++++------- .../src/cluster_controller/service/state.rs | 22 +--- crates/admin/src/schema_registry/mod.rs | 23 ++-- crates/admin/src/service.rs | 3 - crates/bifrost/src/bifrost.rs | 70 ++++++------ crates/bifrost/src/bifrost_admin.rs | 101 +++++++++--------- crates/bifrost/src/loglet/provider.rs | 3 +- crates/bifrost/src/read_stream.rs | 32 +++--- crates/bifrost/src/service.rs | 14 +-- crates/core/src/metadata.rs | 19 +++- crates/core/src/metadata/manager.rs | 6 +- crates/core/src/metadata_store.rs | 1 - crates/node/src/lib.rs | 4 +- crates/node/src/roles/admin.rs | 4 - crates/worker/src/partition/cleaner.rs | 4 +- crates/worker/src/partition/leadership/mod.rs | 4 +- crates/worker/src/partition/shuffle.rs | 2 +- .../worker/src/partition_processor_manager.rs | 3 +- server/tests/common/replicated_loglet.rs | 12 +-- server/tests/replicated_loglet.rs | 15 +-- tools/bifrost-benchpress/src/main.rs | 2 +- tools/restatectl/src/commands/log/dump_log.rs | 4 +- tools/xtask/src/main.rs | 3 +- 25 files changed, 200 insertions(+), 271 deletions(-) diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c820239..c685a102b 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError}; +use restate_bifrost::{Bifrost, Error as BiforstError}; use restate_core::{Metadata, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::{Logs, SegmentIndex}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; @@ -44,7 +43,6 @@ use super::service::ChainExtension; use super::ClusterControllerHandle; pub(crate) struct ClusterCtrlSvcHandler { - metadata_store_client: MetadataStoreClient, controller_handle: ClusterControllerHandle, bifrost: Bifrost, metadata_writer: MetadataWriter, @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler { impl ClusterCtrlSvcHandler { pub fn new( controller_handle: ClusterControllerHandle, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, metadata_writer: MetadataWriter, ) -> Self { Self { controller_handle, - metadata_store_client, bifrost, metadata_writer, } } async fn get_logs(&self) -> Result { - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await .map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))? @@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let (trim_point, nodes_config) = tokio::join!( self.bifrost.get_trim_point(log_id), - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()), ); @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { _request: Request, ) -> Result, Status> { let nodes_config = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()) .await .map_err(|error| { @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let request = request.into_inner(); let log_id: LogId = request.log_id.into(); - let admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - - let writable_loglet = admin + let writable_loglet = self + .bifrost + .admin() .writeable_loglet(log_id) .await .map_err(|err| match err { diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 4f1f7db6e..5935181af 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -25,9 +25,9 @@ use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; +use restate_bifrost::{Bifrost, Error as BifrostError}; use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, + retry_on_network_error, Precondition, ReadWriteError, WriteError, }; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; use restate_types::errors::GenericError; @@ -924,7 +924,6 @@ pub struct LogsController { effects: Option>, inner: LogsControllerInner, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, async_operations: JoinSet, find_logs_tail_semaphore: Arc, @@ -934,16 +933,17 @@ impl LogsController { pub async fn init( configuration: &Configuration, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, ) -> Result { // obtain the latest logs or init it with an empty logs variant let logs = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) + metadata_writer + .metadata_store_client() + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + Logs::from_configuration(configuration) + }) }, ) .await?; @@ -965,7 +965,6 @@ impl LogsController { retry_policy, ), bifrost, - metadata_store_client, metadata_writer, async_operations: JoinSet::default(), find_logs_tail_semaphore: Arc::new(Semaphore::new(1)), @@ -984,17 +983,12 @@ impl LogsController { let logs = Arc::clone(&self.inner.current_logs); let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); let find_tail = async move { - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - let mut updates = LogsTailUpdates::default(); for (log_id, chain) in logs.iter() { let tail_segment = chain.tail(); - let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await { + let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await { Ok(loglet) => loglet, Err(BifrostError::Shutdown(_)) => break, Err(err) => { @@ -1098,7 +1092,6 @@ impl LogsController { logs: Arc, mut debounce: Option>, ) { - let metadata_store_client = self.metadata_store_client.clone(); let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn(async move { @@ -1108,7 +1101,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - if let Err(err) = metadata_store_client + if let Err(err) = metadata_writer.metadata_store_client() .put( BIFROST_CONFIG_KEY.clone(), logs.deref(), @@ -1120,7 +1113,7 @@ impl LogsController { WriteError::FailedPrecondition(_) => { debug!("Detected a concurrent modification of logs. Fetching the latest logs now."); // There was a concurrent modification of the logs. Fetch the latest version. - match metadata_store_client + match metadata_writer.metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await { @@ -1166,8 +1159,6 @@ impl LogsController { mut debounce: Option>, ) { let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn( async move { @@ -1177,10 +1168,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - - match bifrost_admin.seal(log_id, segment_index).await { + match bifrost.admin().seal(log_id, segment_index).await { Ok(sealed_segment) => { if sealed_segment.tail.is_sealed() { Event::SealSucceeded { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 3acbc22f5..9c68aab81 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -36,8 +36,8 @@ use restate_types::partition_table::{ }; use restate_types::replicated_loglet::ReplicatedLogletParams; -use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_bifrost::{Bifrost, SealedSegment}; +use restate_core::metadata_store::retry_on_network_error; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -79,7 +79,6 @@ pub struct Service { cluster_state_refresher: ClusterStateRefresher, configuration: Live, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, processor_manager_client: PartitionProcessorManagerClient>, command_tx: mpsc::Sender, @@ -102,7 +101,6 @@ where router_builder: &mut MessageRouterBuilder, server_builder: &mut NetworkServerBuilder, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, ) -> Self { let (command_tx, command_rx) = mpsc::channel(2); @@ -122,7 +120,6 @@ where ClusterControllerHandle { tx: command_tx.clone(), }, - metadata_store_client.clone(), bifrost.clone(), metadata_writer.clone(), )) @@ -140,7 +137,6 @@ where bifrost, cluster_state_refresher, metadata_writer, - metadata_store_client, processor_manager_client, command_tx, command_rx, @@ -309,12 +305,6 @@ impl Service { let mut shutdown = std::pin::pin!(cancellation_watcher()); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let mut state: ClusterControllerState = ClusterControllerState::Follower; self.health_status.update(AdminStatus::Ready); @@ -333,7 +323,7 @@ impl Service { } Some(cmd) = self.command_rx.recv() => { // it is still safe to handle cluster commands as a follower - self.on_cluster_cmd(cmd, bifrost_admin).await; + self.on_cluster_cmd(cmd).await; } _ = config_watcher.changed() => { debug!("Updating the cluster controller settings."); @@ -360,8 +350,9 @@ impl Service { let partition_table = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { + self.metadata_writer.metadata_store_client().get_or_insert( + PARTITION_TABLE_KEY.clone(), + || { let partition_table = PartitionTable::with_equally_sized_partitions( Version::MIN, configuration.common.bootstrap_num_partitions.get(), @@ -370,7 +361,8 @@ impl Service { debug!("Initializing the partition table with '{partition_table:?}'"); partition_table - }) + }, + ) }, ) .await?; @@ -442,7 +434,8 @@ impl Service { default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option| { let logs = match current { Some(logs) => logs, @@ -493,7 +486,8 @@ impl Service { }; let partition_table = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( PARTITION_TABLE_KEY.clone(), |current: Option| { @@ -557,7 +551,6 @@ impl Service { extension, min_version, bifrost: self.bifrost.clone(), - metadata_store_client: self.metadata_store_client.clone(), metadata_writer: self.metadata_writer.clone(), observed_cluster_state: self.observed_cluster_state.clone(), }; @@ -569,11 +562,7 @@ impl Service { }); } - async fn on_cluster_cmd( - &self, - command: ClusterControllerCommand, - bifrost_admin: BifrostAdmin<'_>, - ) { + async fn on_cluster_cmd(&self, command: ClusterControllerCommand) { match command { ClusterControllerCommand::GetClusterState(tx) => { let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); @@ -587,7 +576,7 @@ impl Service { ?log_id, trim_point_inclusive = ?trim_point, "Manual trim log command received"); - let result = bifrost_admin.trim(log_id, trim_point).await; + let result = self.bifrost.admin().trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } ClusterControllerCommand::CreateSnapshot { @@ -715,7 +704,6 @@ struct SealAndExtendTask { extension: Option, bifrost: Bifrost, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, observed_cluster_state: ObservedClusterState, } @@ -726,18 +714,14 @@ impl SealAndExtendTask { .as_ref() .and_then(|ext| ext.segment_index_to_seal); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let (provider, params) = match self.extension.take() { Some(extension) => (extension.provider_kind, extension.params), None => self.next_segment().await?, }; - let sealed_segment = bifrost_admin + let sealed_segment = self + .bifrost + .admin() .seal_and_extend_chain( self.log_id, last_segment_index, @@ -796,7 +780,8 @@ impl SealAndExtendTask { #[cfg(feature = "replicated-loglet")] ProviderConfiguration::Replicated(config) => { let schedule_plan = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(SCHEDULING_PLAN_KEY.clone()) .await?; @@ -858,7 +843,8 @@ mod tests { async fn manual_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let svc = Service::new( @@ -869,7 +855,6 @@ mod tests { &mut builder.router_builder, &mut NetworkServerBuilder::default(), builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let svc_handle = svc.handle(); @@ -1141,7 +1126,8 @@ mod tests { { restate_types::config::set_current_config(config); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let mut server_builder = NetworkServerBuilder::default(); @@ -1154,7 +1140,6 @@ mod tests { &mut builder.router_builder, &mut server_builder, builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..430ff8124 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -17,10 +17,9 @@ use tokio::time; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{debug, info, warn}; -use restate_bifrost::{Bifrost, BifrostAdmin}; -use restate_core::metadata_store::MetadataStoreClient; +use restate_bifrost::Bifrost; use restate_core::network::TransportConnect; -use restate_core::{my_node_id, Metadata, MetadataWriter}; +use restate_core::{my_node_id, Metadata}; use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; @@ -135,8 +134,6 @@ pub enum LeaderEvent { pub struct Leader { bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, - metadata_writer: MetadataWriter, logs_watcher: watch::Receiver, partition_table_watcher: watch::Receiver, find_logs_tail_interval: Interval, @@ -156,7 +153,7 @@ where let scheduler = Scheduler::init( &configuration, - service.metadata_store_client.clone(), + service.metadata_writer.metadata_store_client().clone(), service.networking.clone(), ) .await?; @@ -164,7 +161,6 @@ where let logs_controller = LogsController::init( &configuration, service.bifrost.clone(), - service.metadata_store_client.clone(), service.metadata_writer.clone(), ) .await?; @@ -179,8 +175,6 @@ where let metadata = Metadata::current(); let mut leader = Self { bifrost: service.bifrost.clone(), - metadata_store_client: service.metadata_store_client.clone(), - metadata_writer: service.metadata_writer.clone(), logs_watcher: metadata.watch(MetadataKind::Logs), partition_table_watcher: metadata.watch(MetadataKind::PartitionTable), cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(), @@ -296,12 +290,6 @@ where } async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> { - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let cluster_state = self.cluster_state_watcher.current(); let mut persisted_lsns_per_partition: BTreeMap< @@ -342,13 +330,13 @@ where if persisted_lsns.len() >= cluster_state.nodes.len() { let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID); // trim point is before the oldest record - let current_trim_point = bifrost_admin.get_trim_point(log_id).await?; + let current_trim_point = self.bifrost.get_trim_point(log_id).await?; if min_persisted_lsn >= current_trim_point + self.log_trim_threshold { debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost_admin.trim(log_id, min_persisted_lsn).await? + self.bifrost.admin().trim(log_id, min_persisted_lsn).await? } } else { warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log."); diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index cd043be8d..39481e6d2 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -11,16 +11,15 @@ pub mod error; mod updater; -use http::Uri; - use std::borrow::Borrow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use http::Uri; use tracing::subscriber::NoSubscriber; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::{Metadata, MetadataWriter}; use restate_service_protocol::discovery::{DiscoverEndpoint, DiscoveredEndpoint, ServiceDiscovery}; use restate_types::identifiers::{DeploymentId, ServiceRevision, SubscriptionId}; @@ -77,7 +76,6 @@ pub enum ModifyServiceChange { /// new deployments. #[derive(Clone)] pub struct SchemaRegistry { - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -87,7 +85,6 @@ pub struct SchemaRegistry { impl SchemaRegistry { pub fn new( - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -95,7 +92,6 @@ impl SchemaRegistry { ) -> Self { Self { metadata_writer, - metadata_store_client, service_discovery, subscription_validator, experimental_feature_kafka_ingress_next, @@ -155,7 +151,8 @@ impl SchemaRegistry { } else { let mut new_deployment_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -195,7 +192,8 @@ impl SchemaRegistry { deployment_id: DeploymentId, ) -> Result<(), SchemaRegistryError> { let schema_registry = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_registry: Option| { @@ -229,7 +227,8 @@ impl SchemaRegistry { changes: Vec, ) -> Result { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -270,7 +269,8 @@ impl SchemaRegistry { subscription_id: SubscriptionId, ) -> Result<(), SchemaRegistryError> { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -370,7 +370,8 @@ where let mut subscription_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index b3c5ecef8..3f3020a62 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,7 +17,6 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tower::ServiceBuilder; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::net_util; use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; @@ -44,7 +43,6 @@ where { pub fn new( metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, subscription_validator: V, service_discovery: ServiceDiscovery, @@ -54,7 +52,6 @@ where Self { bifrost, schema_registry: SchemaRegistry::new( - metadata_store_client, metadata_writer, service_discovery, subscription_validator, diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 06b86ff76..b0da0b240 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -15,7 +15,7 @@ use std::sync::OnceLock; use enum_map::EnumMap; use tracing::instrument; -use restate_core::{Metadata, MetadataKind, TargetVersion}; +use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState}; use restate_types::storage::StorageEncode; @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, InputRecord, LogReadStream, Result}; +use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result}; /// The strategy to use when bifrost fails to append or when it observes /// a sealed loglet while it's tailing a log. @@ -77,20 +77,20 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_in_memory() -> Self { + pub async fn init_in_memory(metadata_writer: MetadataWriter) -> Self { use crate::providers::memory_loglet; - Self::init_with_factory(memory_loglet::Factory::default()).await + Self::init_with_factory(metadata_writer, memory_loglet::Factory::default()).await } #[cfg(any(test, feature = "test-util"))] - pub async fn init_local() -> Self { + pub async fn init_local(metadata_writer: MetadataWriter) -> Self { use restate_types::config::Configuration; use crate::BifrostService; let config = Configuration::updateable(); - let bifrost_svc = BifrostService::new().enable_local_loglet(&config); + let bifrost_svc = BifrostService::new(metadata_writer).enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -102,10 +102,13 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_with_factory(factory: impl crate::loglet::LogletProviderFactory) -> Self { + pub async fn init_with_factory( + metadata_writer: MetadataWriter, + factory: impl crate::loglet::LogletProviderFactory, + ) -> Self { use crate::BifrostService; - let bifrost_svc = BifrostService::new().with_factory(factory); + let bifrost_svc = BifrostService::new(metadata_writer).with_factory(factory); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -116,6 +119,11 @@ impl Bifrost { bifrost } + /// Admin operations of bifrost + pub fn admin(&self) -> BifrostAdmin<'_> { + BifrostAdmin::new(self) + } + /// Appends a single record to a log. The log id must exist, otherwise the /// operation fails with [`Error::UnknownLogId`] /// @@ -302,15 +310,17 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone); pub struct BifrostInner { #[allow(unused)] watchdog: WatchdogSender, + pub(crate) metadata_writer: MetadataWriter, // Initialized after BifrostService::start completes. pub(crate) providers: OnceLock>>>, shutting_down: AtomicBool, } impl BifrostInner { - pub fn new(watchdog: WatchdogSender) -> Self { + pub fn new(watchdog: WatchdogSender, metadata_writer: MetadataWriter) -> Self { Self { watchdog, + metadata_writer, providers: Default::default(), shutting_down: AtomicBool::new(false), } @@ -558,13 +568,12 @@ mod tests { use restate_types::{Version, Versioned}; use crate::providers::memory_loglet::{self}; - use crate::BifrostAdmin; #[restate_core::test] #[traced_test] async fn test_append_smoke() -> googletest::Result<()> { let num_partitions = 5; - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, num_partitions, @@ -572,7 +581,7 @@ mod tests { .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let clean_bifrost_clone = bifrost.clone(); @@ -637,12 +646,12 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_lazy_initialization() -> googletest::Result<()> { - let _ = TestCoreEnv::create_with_single_node(1, 1).await; + let env = TestCoreEnv::create_with_single_node(1, 1).await; let delay = Duration::from_secs(5); // This memory provider adds a delay to its loglet initialization, we want // to ensure that appends do not fail while waiting for the loglet; let factory = memory_loglet::Factory::with_init_delay(delay); - let bifrost = Bifrost::init_with_factory(factory).await; + let bifrost = Bifrost::init_with_factory(env.metadata_writer, factory).await; let start = tokio::time::Instant::now(); let lsn = bifrost @@ -664,12 +673,7 @@ mod tests { .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset()); @@ -681,7 +685,7 @@ mod tests { appender.append("").await?; } - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); @@ -703,7 +707,7 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; + bifrost.admin().trim(LOG_ID, Lsn::MAX).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -737,12 +741,7 @@ mod tests { )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer).await; let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] @@ -765,7 +764,8 @@ mod tests { .unwrap(); // seal the segment - bifrost_admin + bifrost + .admin() .seal(LOG_ID, segment_1.segment_index()) .await?; @@ -925,12 +925,7 @@ mod tests { .build() .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; // create an appender let stop_signal = Arc::new(AtomicBool::default()); @@ -976,7 +971,7 @@ mod tests { } // seal and don't extend the chain. - let _ = bifrost_admin.seal(LOG_ID, SegmentIndex::from(0)).await?; + let _ = bifrost.admin().seal(LOG_ID, SegmentIndex::from(0)).await?; // appends should stall! tokio::time::sleep(Duration::from_millis(100)).await; @@ -998,7 +993,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::Local); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 6e84d52fb..b447f3652 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -8,14 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::Deref; use std::sync::Arc; -use restate_core::metadata_store::retry_on_network_error; use tracing::{debug, info, instrument}; -use restate_core::{Metadata, MetadataKind, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; +use restate_core::metadata_store::retry_on_network_error; +use restate_core::{Metadata, MetadataKind}; use restate_types::config::Configuration; use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; @@ -30,21 +28,6 @@ use crate::{Bifrost, Error, Result}; #[derive(Clone, Copy)] pub struct BifrostAdmin<'a> { bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, -} - -impl<'a> AsRef for BifrostAdmin<'a> { - fn as_ref(&self) -> &Bifrost { - self.bifrost - } -} - -impl<'a> Deref for BifrostAdmin<'a> { - type Target = Bifrost; - fn deref(&self) -> &Self::Target { - self.bifrost - } } #[derive(Debug)] @@ -56,16 +39,8 @@ pub struct SealedSegment { } impl<'a> BifrostAdmin<'a> { - pub fn new( - bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, - ) -> Self { - Self { - bifrost, - metadata_writer, - metadata_store_client, - } + pub fn new(bifrost: &'a Bifrost) -> Self { + Self { bifrost } } /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to @@ -179,7 +154,7 @@ impl<'a> BifrostAdmin<'a> { } pub async fn writeable_loglet(&self, log_id: LogId) -> Result { - self.inner.writeable_loglet(log_id).await + self.bifrost.inner.writeable_loglet(log_id).await } #[instrument(level = "debug", skip(self), err)] @@ -247,9 +222,11 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option| { let logs = logs.ok_or(Error::UnknownLogId(log_id))?; let mut builder = logs.into_builder(); @@ -268,13 +245,16 @@ impl<'a> BifrostAdmin<'a> { .append_segment(base_lsn, provider, params.clone()) .map_err(AdminError::from)?; Ok(builder.build()) - }, - ) + }) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -292,25 +272,33 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write::<_, _, Error>( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { - // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, - // provisioning the empty logs metadata is not our responsibility. - let logs = logs.ok_or(Error::LogsMetadataNotProvisioned)?; - - let mut builder = logs.into_builder(); - builder - .add_log(log_id, Chain::new(provider, params.clone())) - .map_err(AdminError::from)?; - Ok(builder.build()) - }, - ) + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::LogsMetadataNotProvisioned)?; + + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -323,14 +311,21 @@ impl<'a> BifrostAdmin<'a> { .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client + self.bifrost + .inner + .metadata_writer + .metadata_store_client() .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { Logs::from_configuration(&Configuration::pinned()) }) }) .await?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } } diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index dbf4cf806..3d84d6c49 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -44,7 +44,8 @@ pub trait LogletProvider: Send + Sync { /// This will not perform any updates, it just statically generates a valid /// configuration for a potentially new loglet. /// - /// if `chain` is None, this means we no chain exists already for this log. + /// if `chain` is None, the provider should assume that no chain exists already + /// for this log. fn propose_new_loglet_params( &self, log_id: LogId, diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8ac81c3ee..11c32f212 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,14 +454,14 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; + use crate::{BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_readstream_one_loglet() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_provider_kind(ProviderKind::Local) .build() .await; @@ -471,7 +471,7 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = BifrostService::new(env.metadata_writer).enable_local_loglet(&config); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); @@ -548,14 +548,10 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = + BifrostService::new(node_env.metadata_writer.clone()).enable_local_loglet(&config); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -569,7 +565,7 @@ mod tests { } // [1..5] trimmed. trim_point = 5 - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); @@ -590,7 +586,7 @@ mod tests { let tail = bifrost.find_tail(LOG_ID).await?.offset(); // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(LOG_ID).await?; assert_eq!(Lsn::from(10), bifrost.get_trim_point(LOG_ID).await?); // trim point becomes the point before the next slot available for writes (aka. the @@ -643,7 +639,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer.clone()) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); @@ -799,15 +795,10 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -825,7 +816,8 @@ mod tests { // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, @@ -917,7 +909,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c2..ec6718aa7 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -15,7 +15,9 @@ use anyhow::Context; use enum_map::EnumMap; use tracing::{debug, error, trace}; -use restate_core::{cancellation_watcher, TaskCenter, TaskCenterFutureExt, TaskKind}; +use restate_core::{ + cancellation_watcher, MetadataWriter, TaskCenter, TaskCenterFutureExt, TaskKind, +}; use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::logs::metadata::ProviderKind; @@ -34,16 +36,10 @@ pub struct BifrostService { factories: HashMap>, } -impl Default for BifrostService { - fn default() -> Self { - Self::new() - } -} - impl BifrostService { - pub fn new() -> Self { + pub fn new(metadata_writer: MetadataWriter) -> Self { let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel(); - let inner = Arc::new(BifrostInner::new(watchdog_sender.clone())); + let inner = Arc::new(BifrostInner::new(watchdog_sender.clone(), metadata_writer)); let bifrost = Bifrost::new(inner.clone()); let watchdog = Watchdog::new(inner.clone(), watchdog_sender, watchdog_receiver); Self { diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index 2244c0ef3..08fde62e1 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -29,7 +29,7 @@ use restate_types::schema::Schema; use restate_types::{GenerationalNodeId, Version, Versioned}; use crate::metadata::manager::Command; -use crate::metadata_store::ReadError; +use crate::metadata_store::{MetadataStoreClient, ReadError}; use crate::network::WeakConnection; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; @@ -335,6 +335,7 @@ struct MetadataInner { /// so it's safe to call update_* without checking the current version. #[derive(Clone)] pub struct MetadataWriter { + metadata_store_client: MetadataStoreClient, sender: manager::CommandSender, /// strictly used to set my node id. Do not use this to update metadata /// directly to avoid race conditions. @@ -342,8 +343,20 @@ pub struct MetadataWriter { } impl MetadataWriter { - fn new(sender: manager::CommandSender, inner: Arc) -> Self { - Self { sender, inner } + fn new( + sender: manager::CommandSender, + metadata_store_client: MetadataStoreClient, + inner: Arc, + ) -> Self { + Self { + metadata_store_client, + sender, + inner, + } + } + + pub fn metadata_store_client(&self) -> &MetadataStoreClient { + &self.metadata_store_client } // Returns when the nodes configuration update is performed. diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 3c9773f99..baf248d3d 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -258,7 +258,11 @@ impl MetadataManager { } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone()) + MetadataWriter::new( + self.metadata.sender.clone(), + self.metadata_store_client.clone(), + self.metadata.inner.clone(), + ) } /// Start and wait for shutdown signal. diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..891e19d6d 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -105,7 +105,6 @@ pub trait MetadataStore { /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. #[derive(Clone)] pub struct MetadataStoreClient { - // premature optimization? Maybe introduce trait object once we have multiple implementations? inner: Arc, backoff_policy: Option, } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..7dbc273a0 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -184,7 +184,8 @@ impl Node { record_cache.clone(), &mut router_builder, ); - let bifrost_svc = BifrostService::new().enable_local_loglet(&updateable_config); + let bifrost_svc = + BifrostService::new(metadata_manager.writer()).enable_local_loglet(&updateable_config); #[cfg(feature = "replicated-loglet")] let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory); @@ -271,7 +272,6 @@ impl Node { metadata_manager.writer(), &mut server_builder, &mut router_builder, - metadata_store_client.clone(), worker_role .as_ref() .map(|worker_role| worker_role.storage_query_context().clone()), diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index b06ed3eec..89cedd8af 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -14,7 +14,6 @@ use codederror::CodedError; use restate_admin::cluster_controller; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::MessageRouterBuilder; use restate_core::network::NetworkServerBuilder; use restate_core::network::Networking; @@ -70,7 +69,6 @@ impl AdminRole { metadata_writer: MetadataWriter, server_builder: &mut NetworkServerBuilder, router_builder: &mut MessageRouterBuilder, - metadata_store_client: MetadataStoreClient, local_query_context: Option, ) -> Result { health_status.update(AdminStatus::StartingUp); @@ -104,7 +102,6 @@ impl AdminRole { let admin = AdminService::new( metadata_writer.clone(), - metadata_store_client.clone(), bifrost.clone(), config.ingress.clone(), service_discovery, @@ -121,7 +118,6 @@ impl AdminRole { router_builder, server_builder, metadata_writer, - metadata_store_client, )) } else { None diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 884a22866..ecb513c56 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -213,14 +213,14 @@ mod tests { // Start paused makes sure the timer is immediately fired #[test(restate_core::test(start_paused = true))] pub async fn cleanup_works() { - let _env = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 105e5a6a4..6d8dc4842 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -637,12 +637,12 @@ mod tests { #[test(restate_core::test)] async fn become_leader_then_step_down() -> googletest::Result<()> { - let _env = TestCoreEnv::create_with_single_node(0, 0).await; + let env = TestCoreEnv::create_with_single_node(0, 0).await; let storage_options = StorageOptions::default(); let rocksdb_options = RocksDbOptions::default(); RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let partition_store_manager = PartitionStoreManager::create( Constant::new(storage_options.clone()).boxed(), diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index e4ac810d0..6b378b33e 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -628,7 +628,7 @@ mod tests { let (truncation_tx, _truncation_rx) = mpsc::channel(1); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await; let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone()); ShuffleEnv { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 01a994163..5a394716d 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -1006,7 +1006,8 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(env_builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let partition_store_manager = PartitionStoreManager::create( diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 628707454..c11c9a1a0 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -15,7 +15,7 @@ use enumset::{enum_set, EnumSet}; use googletest::internal::test_outcome::TestAssertionFailure; use googletest::IntoTestResult; -use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin}; +use restate_bifrost::{loglet::Loglet, Bifrost}; use restate_core::metadata_store::Precondition; use restate_core::TaskCenter; use restate_core::{metadata_store::MetadataStoreClient, MetadataWriter}; @@ -92,16 +92,6 @@ pub struct TestEnv { pub cluster: StartedCluster, } -impl TestEnv { - pub fn bifrost_admin(&self) -> BifrostAdmin<'_> { - BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ) - } -} - pub async fn run_in_test_env( mut base_config: Configuration, sequencer: GenerationalNodeId, diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 8fa79c7f2..f3656d2c9 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -248,23 +248,13 @@ mod tests { } let mut sealer_handle: JoinHandle> = tokio::task::spawn({ - let (bifrost, metadata_writer, metadata_store_client) = ( - test_env.bifrost.clone(), - test_env.metadata_writer.clone(), - test_env.metadata_store_client.clone() - ); + let bifrost = test_env.bifrost.clone(); async move { let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); - let bifrost_admin = restate_bifrost::BifrostAdmin::new( - &bifrost, - &metadata_writer, - &metadata_store_client, - ); - let mut last_loglet_id = None; while !cancellation_token.is_cancelled() { @@ -280,7 +270,8 @@ mod tests { eprintln!("Sealing loglet {} and creating new loglet {}", params.loglet_id, params.loglet_id.next()); params.loglet_id = params.loglet_id.next(); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( log_id, None, diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index 33d5f8d21..f9512f964 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -176,7 +176,7 @@ fn spawn_environment(config: Live, num_logs: u16) -> (task_center metadata_writer.submit(Arc::new(logs)); spawn_metadata_manager(metadata_manager).expect("metadata manager starts"); - let bifrost_svc = BifrostService::new() + let bifrost_svc = BifrostService::new(metadata_writer) .enable_in_memory_loglet() .enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index cb4219513..213e328fa 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -86,6 +86,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); + let metadata_writer = metadata_manager.writer(); let mut router_builder = MessageRouterBuilder::default(); metadata_manager.register_in_message_router(&mut router_builder); @@ -95,7 +96,8 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { metadata_manager.run(), )?; - let bifrost_svc = BifrostService::new().enable_local_loglet(&Configuration::updateable()); + let bifrost_svc = + BifrostService::new(metadata_writer).enable_local_loglet(&Configuration::updateable()); let bifrost = bifrost_svc.handle(); // Ensures bifrost has initial metadata synced up before starting the worker. diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index ee702beec..e556aa82c 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -103,11 +103,10 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { // We start the Meta service, then download the openapi schema generated let node_env = TestCoreEnv::create_with_single_node(1, 1).await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; let admin_service = AdminService::new( node_env.metadata_writer.clone(), - node_env.metadata_store_client.clone(), bifrost, Mock, ServiceDiscovery::new(