diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
index f9c820239..c685a102b 100644
--- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs
+++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
@@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration;
 use tonic::{async_trait, Request, Response, Status};
 use tracing::info;
 
-use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError};
+use restate_bifrost::{Bifrost, Error as BiforstError};
 use restate_core::{Metadata, MetadataWriter};
-use restate_metadata_store::MetadataStoreClient;
 use restate_types::identifiers::PartitionId;
 use restate_types::logs::metadata::{Logs, SegmentIndex};
 use restate_types::logs::{LogId, Lsn, SequenceNumber};
@@ -44,7 +43,6 @@ use super::service::ChainExtension;
 use super::ClusterControllerHandle;
 
 pub(crate) struct ClusterCtrlSvcHandler {
-    metadata_store_client: MetadataStoreClient,
     controller_handle: ClusterControllerHandle,
     bifrost: Bifrost,
     metadata_writer: MetadataWriter,
@@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler {
 impl ClusterCtrlSvcHandler {
     pub fn new(
         controller_handle: ClusterControllerHandle,
-        metadata_store_client: MetadataStoreClient,
         bifrost: Bifrost,
         metadata_writer: MetadataWriter,
     ) -> Self {
         Self {
             controller_handle,
-            metadata_store_client,
             bifrost,
             metadata_writer,
         }
     }
 
     async fn get_logs(&self) -> Result<Logs, Status> {
-        self.metadata_store_client
+        self.metadata_writer
+            .metadata_store_client()
             .get::<Logs>(BIFROST_CONFIG_KEY.clone())
             .await
             .map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))?
@@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         let (trim_point, nodes_config) = tokio::join!(
             self.bifrost.get_trim_point(log_id),
-            self.metadata_store_client
+            self.metadata_writer
+                .metadata_store_client()
                 .get::<NodesConfiguration>(NODES_CONFIG_KEY.clone()),
         );
 
@@ -151,7 +149,8 @@
         _request: Request<ListNodesRequest>,
     ) -> Result<Response<ListNodesResponse>, Status> {
         let nodes_config = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .get::<NodesConfiguration>(NODES_CONFIG_KEY.clone())
             .await
             .map_err(|error| {
@@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         let request = request.into_inner();
         let log_id: LogId = request.log_id.into();
 
-        let admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
-        let writable_loglet = admin
+        let writable_loglet = self
+            .bifrost
+            .admin()
             .writeable_loglet(log_id)
             .await
             .map_err(|err| match err {
diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index 4f1f7db6e..5935181af 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -25,9 +25,9 @@ use tokio::task::JoinSet;
 use tracing::{debug, error, trace, trace_span, Instrument};
 use xxhash_rust::xxh3::Xxh3Builder;
 
-use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
+use restate_bifrost::{Bifrost, Error as BifrostError};
 use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
+    retry_on_network_error, Precondition, ReadWriteError, WriteError,
 };
 use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
 use restate_types::errors::GenericError;
@@ -924,7 +924,6 @@ pub struct LogsController {
     effects: Option<Vec<Effect>>,
     inner: LogsControllerInner,
     bifrost: Bifrost,
-    metadata_store_client: MetadataStoreClient,
     metadata_writer: MetadataWriter,
     async_operations: JoinSet<Event>,
     find_logs_tail_semaphore: Arc<Semaphore>,
@@ -934,16 +933,17 @@ impl LogsController {
     pub async fn init(
         configuration: &Configuration,
         bifrost: Bifrost,
-        metadata_store_client: MetadataStoreClient,
         metadata_writer: MetadataWriter,
     ) -> Result<Self> {
         // obtain the latest logs or init it with an empty logs variant
         let logs = retry_on_network_error(
             configuration.common.network_error_retry_policy.clone(),
             || {
-                metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
-                    Logs::from_configuration(configuration)
-                })
+                metadata_writer
+                    .metadata_store_client()
+                    .get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
+                        Logs::from_configuration(configuration)
+                    })
             },
         )
        .await?;
@@ -965,7 +965,6 @@ impl LogsController {
                 retry_policy,
             ),
             bifrost,
-            metadata_store_client,
             metadata_writer,
             async_operations: JoinSet::default(),
             find_logs_tail_semaphore: Arc::new(Semaphore::new(1)),
@@ -984,17 +983,12 @@
         let logs = Arc::clone(&self.inner.current_logs);
         let bifrost = self.bifrost.clone();
-        let metadata_store_client = self.metadata_store_client.clone();
-        let metadata_writer = self.metadata_writer.clone();
 
         let find_tail = async move {
-            let bifrost_admin =
-                BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);
-
             let mut updates = LogsTailUpdates::default();
             for (log_id, chain) in logs.iter() {
                 let tail_segment = chain.tail();
 
-                let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await {
+                let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await {
                     Ok(loglet) => loglet,
                     Err(BifrostError::Shutdown(_)) => break,
                     Err(err) => {
@@ -1098,7 +1092,6 @@
         logs: Arc<Logs>,
         mut debounce: Option<RetryIter<'static>>,
     ) {
-        let metadata_store_client = self.metadata_store_client.clone();
         let metadata_writer = self.metadata_writer.clone();
 
         self.async_operations.spawn(async move {
@@ -1108,7 +1101,7 @@
                 tokio::time::sleep(delay).await;
             }
 
-            if let Err(err) = metadata_store_client
+            if let Err(err) = metadata_writer.metadata_store_client()
                 .put(
                     BIFROST_CONFIG_KEY.clone(),
                     logs.deref(),
@@ -1120,7 +1113,7 @@
                     WriteError::FailedPrecondition(_) => {
                         debug!("Detected a concurrent modification of logs. Fetching the latest logs now.");
                         // There was a concurrent modification of the logs. Fetch the latest version.
-                        match metadata_store_client
+                        match metadata_writer.metadata_store_client()
                             .get::<Logs>(BIFROST_CONFIG_KEY.clone())
                             .await
                         {
@@ -1166,8 +1159,6 @@
         mut debounce: Option<RetryIter<'static>>,
     ) {
         let bifrost = self.bifrost.clone();
-        let metadata_store_client = self.metadata_store_client.clone();
-        let metadata_writer = self.metadata_writer.clone();
 
         self.async_operations.spawn(
             async move {
@@ -1177,10 +1168,7 @@
                     tokio::time::sleep(delay).await;
                 }
 
-                let bifrost_admin =
-                    BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);
-
-                match bifrost_admin.seal(log_id, segment_index).await {
+                match bifrost.admin().seal(log_id, segment_index).await {
                     Ok(sealed_segment) => {
                         if sealed_segment.tail.is_sealed() {
                             Event::SealSucceeded {
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index b194c75d3..18239bf1d 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -36,8 +36,8 @@ use restate_types::partition_table::{
 };
 use restate_types::replicated_loglet::ReplicatedLogletParams;
 
-use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment};
-use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
+use restate_bifrost::{Bifrost, SealedSegment};
+use restate_core::metadata_store::retry_on_network_error;
 use restate_core::network::rpc_router::RpcRouter;
 use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
 use restate_core::network::{
@@ -79,7 +79,6 @@ pub struct Service<T> {
     cluster_state_refresher: ClusterStateRefresher<T>,
     configuration: Live<Configuration>,
     metadata_writer: MetadataWriter,
-    metadata_store_client: MetadataStoreClient,
 
     processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
     command_tx: mpsc::Sender<ClusterControllerCommand>,
@@ -102,7 +101,6 @@ where
         router_builder: &mut MessageRouterBuilder,
         server_builder: &mut NetworkServerBuilder,
         metadata_writer: MetadataWriter,
-        metadata_store_client: MetadataStoreClient,
     ) -> Self {
         println!(
            "CONFIGURATION DEFAULT IS: {}",
@@ -126,7 +124,6 @@ where
             ClusterControllerHandle {
                 tx: command_tx.clone(),
             },
-            metadata_store_client.clone(),
             bifrost.clone(),
             metadata_writer.clone(),
         ))
@@ -144,7 +141,6 @@ where
             bifrost,
             cluster_state_refresher,
             metadata_writer,
-            metadata_store_client,
             processor_manager_client,
             command_tx,
             command_rx,
@@ -313,12 +309,6 @@ impl<T: TransportConnect> Service<T> {
         let mut shutdown = std::pin::pin!(cancellation_watcher());
 
-        let bifrost_admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
         let mut state: ClusterControllerState<T> = ClusterControllerState::Follower;
 
         self.health_status.update(AdminStatus::Ready);
@@ -337,7 +327,7 @@
                 }
                 Some(cmd) = self.command_rx.recv() => {
                     // it is still safe to handle cluster commands as a follower
-                    self.on_cluster_cmd(cmd, bifrost_admin).await;
+                    self.on_cluster_cmd(cmd).await;
                 }
                 _ = config_watcher.changed() => {
                     debug!("Updating the cluster controller settings.");
@@ -364,8 +354,9 @@
         let partition_table = retry_on_network_error(
             configuration.common.network_error_retry_policy.clone(),
             || {
-                self.metadata_store_client
-                    .get_or_insert(PARTITION_TABLE_KEY.clone(), || {
+                self.metadata_writer.metadata_store_client().get_or_insert(
+                    PARTITION_TABLE_KEY.clone(),
+                    || {
                         let partition_table = PartitionTable::with_equally_sized_partitions(
                             Version::MIN,
                             configuration.common.bootstrap_num_partitions.get(),
                         );
 
                         debug!("Initializing the partition table with '{partition_table:?}'");
 
                         partition_table
-                    })
+                    },
+                )
             },
         )
         .await?;
@@ -446,7 +438,8 @@
         default_provider: ProviderConfiguration,
     ) -> anyhow::Result<()> {
         let logs = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option<Logs>| {
                 let logs = match current {
                     Some(logs) => logs,
@@ -497,7 +490,8 @@
         };
 
         let partition_table = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(
                 PARTITION_TABLE_KEY.clone(),
                 |current: Option<PartitionTable>| {
@@ -561,7 +555,6 @@
             extension,
             min_version,
             bifrost: self.bifrost.clone(),
-            metadata_store_client: self.metadata_store_client.clone(),
             metadata_writer: self.metadata_writer.clone(),
             observed_cluster_state: self.observed_cluster_state.clone(),
         };
@@ -573,11 +566,7 @@
         });
     }
 
-    async fn on_cluster_cmd(
-        &self,
-        command: ClusterControllerCommand,
-        bifrost_admin: BifrostAdmin<'_>,
-    ) {
+    async fn on_cluster_cmd(&self, command: ClusterControllerCommand) {
         match command {
             ClusterControllerCommand::GetClusterState(tx) => {
                 let _ = tx.send(self.cluster_state_refresher.get_cluster_state());
@@ -591,7 +580,7 @@
                     ?log_id,
                     trim_point_inclusive = ?trim_point,
                     "Manual trim log command received");
-                let result = bifrost_admin.trim(log_id, trim_point).await;
+                let result = self.bifrost.admin().trim(log_id, trim_point).await;
                 let _ = response_tx.send(result.map_err(Into::into));
             }
             ClusterControllerCommand::CreateSnapshot {
@@ -719,7 +708,6 @@ struct SealAndExtendTask {
     extension: Option<ChainExtension>,
     bifrost: Bifrost,
     metadata_writer: MetadataWriter,
-    metadata_store_client: MetadataStoreClient,
     observed_cluster_state: ObservedClusterState,
 }
 
@@ -730,18 +718,14 @@ impl SealAndExtendTask {
             .as_ref()
             .and_then(|ext| ext.segment_index_to_seal);
 
-        let bifrost_admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
         let (provider, params) = match self.extension.take() {
             Some(extension) => (extension.provider_kind, extension.params),
             None => self.next_segment().await?,
         };
 
-        let sealed_segment = bifrost_admin
+        let sealed_segment = self
+            .bifrost
+            .admin()
             .seal_and_extend_chain(
                 self.log_id,
                 last_segment_index,
@@ -800,7 +784,8 @@
             #[cfg(feature = "replicated-loglet")]
             ProviderConfiguration::Replicated(config) => {
                 let schedule_plan = self
-                    .metadata_store_client
+                    .metadata_writer
+                    .metadata_store_client()
                     .get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
                     .await?;
 
@@ -862,7 +847,8 @@ mod tests {
     async fn manual_log_trim() -> anyhow::Result<()> {
         const LOG_ID: LogId = LogId::new(0);
         let mut builder = TestCoreEnvBuilder::with_incoming_only_connector();
-        let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default());
+        let bifrost_svc = BifrostService::new(builder.metadata_writer.clone())
+            .with_factory(memory_loglet::Factory::default());
         let bifrost = bifrost_svc.handle();
 
         let svc = Service::new(
@@ -873,7 +859,6 @@ mod tests {
             &mut builder.router_builder,
             &mut NetworkServerBuilder::default(),
             builder.metadata_writer.clone(),
-            builder.metadata_store_client.clone(),
         );
         let svc_handle = svc.handle();
 
@@ -1145,7 +1130,8 @@ mod tests {
     {
         restate_types::config::set_current_config(config);
         let mut builder = TestCoreEnvBuilder::with_incoming_only_connector();
-        let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default());
+        let bifrost_svc = BifrostService::new(builder.metadata_writer.clone())
+            .with_factory(memory_loglet::Factory::default());
         let bifrost = bifrost_svc.handle();
 
         let mut server_builder = NetworkServerBuilder::default();
@@ -1158,7 +1144,6 @@ mod tests {
             &mut builder.router_builder,
             &mut server_builder,
             builder.metadata_writer.clone(),
-            builder.metadata_store_client.clone(),
         );
 
         let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 8b5548839..430ff8124 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -17,10 +17,9 @@ use tokio::time;
 use tokio::time::{Interval, MissedTickBehavior};
 use tracing::{debug, info, warn};
 
-use restate_bifrost::{Bifrost, BifrostAdmin};
-use restate_core::metadata_store::MetadataStoreClient;
+use restate_bifrost::Bifrost;
 use restate_core::network::TransportConnect;
-use restate_core::{my_node_id, Metadata, MetadataWriter};
+use restate_core::{my_node_id, Metadata};
 use restate_types::cluster::cluster_state::{AliveNode, NodeState};
 use restate_types::config::{AdminOptions, Configuration};
 use restate_types::identifiers::PartitionId;
@@ -135,8 +134,6 @@ pub enum LeaderEvent {
 pub struct Leader<T> {
     bifrost: Bifrost,
-    metadata_store_client: MetadataStoreClient,
-    metadata_writer: MetadataWriter,
     logs_watcher: watch::Receiver<Version>,
     partition_table_watcher: watch::Receiver<Version>,
     find_logs_tail_interval: Interval,
@@ -156,7 +153,7 @@ where
         let scheduler = Scheduler::init(
             &configuration,
-            service.metadata_store_client.clone(),
+            service.metadata_writer.metadata_store_client().clone(),
             service.networking.clone(),
         )
         .await?;
@@ -164,7 +161,6 @@ where
         let logs_controller = LogsController::init(
             &configuration,
             service.bifrost.clone(),
-            service.metadata_store_client.clone(),
             service.metadata_writer.clone(),
         )
         .await?;
@@ -179,8 +175,6 @@ where
         let metadata = Metadata::current();
         let mut leader = Self {
             bifrost: service.bifrost.clone(),
-            metadata_store_client: service.metadata_store_client.clone(),
-            metadata_writer: service.metadata_writer.clone(),
             logs_watcher: metadata.watch(MetadataKind::Logs),
             partition_table_watcher: metadata.watch(MetadataKind::PartitionTable),
             cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(),
@@ -296,12 +290,6 @@ where
     }
 
     async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> {
-        let bifrost_admin = BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        );
-
         let cluster_state = self.cluster_state_watcher.current();
 
         let mut persisted_lsns_per_partition: BTreeMap<
             PartitionId,
@@ -342,13 +330,13 @@ where
         if persisted_lsns.len() >= cluster_state.nodes.len() {
             let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
             // trim point is before the oldest record
-            let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;
+            let current_trim_point = self.bifrost.get_trim_point(log_id).await?;
 
             if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
                 debug!(
                     "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
                 );
-                bifrost_admin.trim(log_id, min_persisted_lsn).await?
+                self.bifrost.admin().trim(log_id, min_persisted_lsn).await?
             }
         } else {
             warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log.");
diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs
index cd043be8d..39481e6d2 100644
--- a/crates/admin/src/schema_registry/mod.rs
+++ b/crates/admin/src/schema_registry/mod.rs
@@ -11,16 +11,15 @@
 pub mod error;
 mod updater;
 
-use http::Uri;
-
 use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::Duration;
+
+use http::Uri;
 use tracing::subscriber::NoSubscriber;
 
-use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::{Metadata, MetadataWriter};
 use restate_service_protocol::discovery::{DiscoverEndpoint, DiscoveredEndpoint, ServiceDiscovery};
 use restate_types::identifiers::{DeploymentId, ServiceRevision, SubscriptionId};
@@ -77,7 +76,6 @@ pub enum ModifyServiceChange {
 /// new deployments.
 #[derive(Clone)]
 pub struct SchemaRegistry<V> {
-    metadata_store_client: MetadataStoreClient,
     metadata_writer: MetadataWriter,
     service_discovery: ServiceDiscovery,
     subscription_validator: V,
@@ -87,7 +85,6 @@ pub struct SchemaRegistry<V> {
 impl<V> SchemaRegistry<V> {
     pub fn new(
-        metadata_store_client: MetadataStoreClient,
         metadata_writer: MetadataWriter,
         service_discovery: ServiceDiscovery,
         subscription_validator: V,
     ) -> Self {
         Self {
             metadata_writer,
-            metadata_store_client,
             service_discovery,
             subscription_validator,
             experimental_feature_kafka_ingress_next,
         }
     }
@@ -155,7 +151,8 @@
         } else {
             let mut new_deployment_id = None;
             let schema_information = self
-                .metadata_store_client
+                .metadata_writer
+                .metadata_store_client()
                 .read_modify_write(
                     SCHEMA_INFORMATION_KEY.clone(),
                     |schema_information: Option<Schema>| {
@@ -195,7 +192,8 @@
         deployment_id: DeploymentId,
     ) -> Result<(), SchemaRegistryError> {
         let schema_registry = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(
                 SCHEMA_INFORMATION_KEY.clone(),
                 |schema_registry: Option<Schema>| {
@@ -229,7 +227,8 @@
         changes: Vec<ModifyServiceChange>,
     ) -> Result<ServiceMetadata, SchemaRegistryError> {
         let schema_information = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(
                 SCHEMA_INFORMATION_KEY.clone(),
                 |schema_information: Option<Schema>| {
@@ -270,7 +269,8 @@
         subscription_id: SubscriptionId,
     ) -> Result<(), SchemaRegistryError> {
         let schema_information = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(
                 SCHEMA_INFORMATION_KEY.clone(),
                 |schema_information: Option<Schema>| {
@@ -370,7 +370,8 @@ where
         let mut subscription_id = None;
 
         let schema_information = self
-            .metadata_store_client
+            .metadata_writer
+            .metadata_store_client()
             .read_modify_write(
                 SCHEMA_INFORMATION_KEY.clone(),
                 |schema_information: Option<Schema>| {
diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs
index b3c5ecef8..3f3020a62 100644
--- a/crates/admin/src/service.rs
+++ b/crates/admin/src/service.rs
@@ -17,7 +17,6 @@ use restate_types::config::AdminOptions;
 use restate_types::live::LiveLoad;
 use tower::ServiceBuilder;
 
-use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::net_util;
 use restate_core::MetadataWriter;
 use restate_service_protocol::discovery::ServiceDiscovery;
@@ -44,7 +43,6 @@ where
 {
     pub fn new(
         metadata_writer: MetadataWriter,
-        metadata_store_client: MetadataStoreClient,
         bifrost: Bifrost,
         subscription_validator: V,
         service_discovery: ServiceDiscovery,
@@ -54,7 +52,6 @@ where
         Self {
             bifrost,
             schema_registry: SchemaRegistry::new(
-                metadata_store_client,
                 metadata_writer,
                 service_discovery,
                 subscription_validator,
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index 06b86ff76..b0da0b240 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -15,7 +15,7 @@ use std::sync::OnceLock;
 use enum_map::EnumMap;
 use tracing::instrument;
 
-use restate_core::{Metadata, MetadataKind, TargetVersion};
+use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion};
 use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
 use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState};
 use restate_types::storage::StorageEncode;
@@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender;
 use crate::loglet::LogletProvider;
 use crate::loglet_wrapper::LogletWrapper;
 use crate::watchdog::WatchdogSender;
-use crate::{Error, InputRecord, LogReadStream, Result};
+use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
 
 /// The strategy to use when bifrost fails to append or when it observes
 /// a sealed loglet while it's tailing a log.
@@ -77,20 +77,20 @@ impl Bifrost {
     }
 
     #[cfg(any(test, feature = "test-util"))]
-    pub async fn init_in_memory() -> Self {
+    pub async fn init_in_memory(metadata_writer: MetadataWriter) -> Self {
         use crate::providers::memory_loglet;
 
-        Self::init_with_factory(memory_loglet::Factory::default()).await
+        Self::init_with_factory(metadata_writer, memory_loglet::Factory::default()).await
     }
 
     #[cfg(any(test, feature = "test-util"))]
-    pub async fn init_local() -> Self {
+    pub async fn init_local(metadata_writer: MetadataWriter) -> Self {
         use restate_types::config::Configuration;
 
         use crate::BifrostService;
 
         let config = Configuration::updateable();
-        let bifrost_svc = BifrostService::new().enable_local_loglet(&config);
+        let bifrost_svc = BifrostService::new(metadata_writer).enable_local_loglet(&config);
         let bifrost = bifrost_svc.handle();
 
         // start bifrost service in the background
@@ -102,10 +102,13 @@ impl Bifrost {
     }
 
     #[cfg(any(test, feature = "test-util"))]
-    pub async fn init_with_factory(factory: impl crate::loglet::LogletProviderFactory) -> Self {
+    pub async fn init_with_factory(
+        metadata_writer: MetadataWriter,
+        factory: impl crate::loglet::LogletProviderFactory,
+    ) -> Self {
         use crate::BifrostService;
 
-        let bifrost_svc = BifrostService::new().with_factory(factory);
+        let bifrost_svc = BifrostService::new(metadata_writer).with_factory(factory);
         let bifrost = bifrost_svc.handle();
 
         // start bifrost service in the background
@@ -116,6 +119,11 @@ impl Bifrost {
         bifrost
     }
 
+    /// Admin operations of bifrost
+    pub fn admin(&self) -> BifrostAdmin<'_> {
+        BifrostAdmin::new(self)
+    }
+
     /// Appends a single record to a log. The log id must exist, otherwise the
     /// operation fails with [`Error::UnknownLogId`]
     ///
@@ -302,15 +310,17 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone);
 
 pub struct BifrostInner {
     #[allow(unused)]
     watchdog: WatchdogSender,
+    pub(crate) metadata_writer: MetadataWriter,
     // Initialized after BifrostService::start completes.
     pub(crate) providers: OnceLock<EnumMap<ProviderKind, Option<Arc<dyn LogletProvider>>>>,
     shutting_down: AtomicBool,
 }
 
 impl BifrostInner {
-    pub fn new(watchdog: WatchdogSender) -> Self {
+    pub fn new(watchdog: WatchdogSender, metadata_writer: MetadataWriter) -> Self {
         Self {
             watchdog,
+            metadata_writer,
             providers: Default::default(),
             shutting_down: AtomicBool::new(false),
         }
     }
@@ -558,13 +568,12 @@ mod tests {
     use restate_types::{Version, Versioned};
 
     use crate::providers::memory_loglet::{self};
-    use crate::BifrostAdmin;
 
     #[restate_core::test]
     #[traced_test]
     async fn test_append_smoke() -> googletest::Result<()> {
         let num_partitions = 5;
-        let _ = TestCoreEnvBuilder::with_incoming_only_connector()
+        let env = TestCoreEnvBuilder::with_incoming_only_connector()
             .set_partition_table(PartitionTable::with_equally_sized_partitions(
                 Version::MIN,
                 num_partitions,
             ))
             .build()
             .await;
@@ -572,7 +581,7 @@ mod tests {
 
-        let bifrost = Bifrost::init_in_memory().await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
 
         let clean_bifrost_clone = bifrost.clone();
 
@@ -637,12 +646,12 @@ mod tests {
     #[restate_core::test(start_paused = true)]
     async fn test_lazy_initialization() -> googletest::Result<()> {
-        let _ = TestCoreEnv::create_with_single_node(1, 1).await;
+        let env = TestCoreEnv::create_with_single_node(1, 1).await;
         let delay = Duration::from_secs(5);
         // This memory provider adds a delay to its loglet initialization, we want
         // to ensure that appends do not fail while waiting for the loglet;
         let factory = memory_loglet::Factory::with_init_delay(delay);
-        let bifrost = Bifrost::init_with_factory(factory).await;
+        let bifrost = Bifrost::init_with_factory(env.metadata_writer, factory).await;
 
         let start = tokio::time::Instant::now();
         let lsn = bifrost
@@ -664,12 +673,7 @@ mod tests {
             .await;
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
-        let bifrost = Bifrost::init_local().await;
-        let bifrost_admin = BifrostAdmin::new(
-            &bifrost,
-            &node_env.metadata_writer,
-            &node_env.metadata_store_client,
-        );
+        let bifrost = Bifrost::init_local(node_env.metadata_writer).await;
 
         assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset());
 
@@ -681,7 +685,7 @@ mod tests {
             appender.append("").await?;
         }
 
-        bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;
+        bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;
 
         let tail = bifrost.find_tail(LOG_ID).await?;
         assert_eq!(tail.offset(), Lsn::from(11));
@@ -703,7 +707,7 @@ mod tests {
         }
 
         // trimming beyond the release point will fall back to the release point
-        bifrost_admin.trim(LOG_ID, Lsn::MAX).await?;
+        bifrost.admin().trim(LOG_ID, Lsn::MAX).await?;
 
         assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
         let new_trim_point = bifrost.get_trim_point(LOG_ID).await?;
@@ -737,12 +741,7 @@ mod tests {
             ))
             .build()
             .await;
-        let bifrost = Bifrost::init_in_memory().await;
-        let bifrost_admin = BifrostAdmin::new(
-            &bifrost,
-            &node_env.metadata_writer,
-            &node_env.metadata_store_client,
-        );
+        let bifrost = Bifrost::init_in_memory(node_env.metadata_writer).await;
 
         let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
 
         // Lsns [1..5]
@@ -765,7 +764,8 @@ mod tests {
             .unwrap();
 
         // seal the segment
-        bifrost_admin
+        bifrost
+            .admin()
             .seal(LOG_ID, segment_1.segment_index())
             .await?;
 
@@ -925,12 +925,7 @@ mod tests {
             .build()
             .await;
         RocksDbManager::init(Constant::new(CommonOptions::default()));
-        let bifrost = Bifrost::init_local().await;
-        let bifrost_admin = BifrostAdmin::new(
-            &bifrost,
-            &node_env.metadata_writer,
-            &node_env.metadata_store_client,
-        );
+        let bifrost = Bifrost::init_local(node_env.metadata_writer).await;
 
         // create an appender
         let stop_signal = Arc::new(AtomicBool::default());
@@ -976,7 +971,7 @@ mod tests {
         }
 
         // seal and don't extend the chain.
-        let _ = bifrost_admin.seal(LOG_ID, SegmentIndex::from(0)).await?;
+        let _ = bifrost.admin().seal(LOG_ID, SegmentIndex::from(0)).await?;
 
         // appends should stall!
         tokio::time::sleep(Duration::from_millis(100)).await;
@@ -998,7 +993,8 @@ mod tests {
         tokio::time::sleep(Duration::from_millis(500)).await;
         // seal the loglet and extend with an in-memory one
         let new_segment_params = new_single_node_loglet_params(ProviderKind::Local);
-        bifrost_admin
+        bifrost
+            .admin()
             .seal_and_extend_chain(
                 LOG_ID,
                 None,
diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs
index 264b44eb1..ba0c09493 100644
--- a/crates/bifrost/src/bifrost_admin.rs
+++ b/crates/bifrost/src/bifrost_admin.rs
@@ -8,14 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::ops::Deref;
 use std::sync::Arc;
 
-use restate_core::metadata_store::retry_on_network_error;
 use tracing::{debug, info, instrument};
 
-use restate_core::{Metadata, MetadataKind, MetadataWriter};
-use restate_metadata_store::MetadataStoreClient;
+use restate_core::metadata_store::retry_on_network_error;
+use restate_core::{Metadata, MetadataKind};
 use restate_types::config::Configuration;
 use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex};
 use restate_types::logs::{LogId, Lsn, TailState};
@@ -30,21 +28,6 @@ use crate::{Bifrost, Error, Result};
 #[derive(Clone, Copy)]
 pub struct BifrostAdmin<'a> {
     bifrost: &'a Bifrost,
-    metadata_writer: &'a MetadataWriter,
-    metadata_store_client: &'a MetadataStoreClient,
-}
-
-impl<'a> AsRef<Bifrost> for BifrostAdmin<'a> {
-    fn as_ref(&self) -> &Bifrost {
-        self.bifrost
-    }
-}
-
-impl<'a> Deref for BifrostAdmin<'a> {
-    type Target = Bifrost;
-    fn deref(&self) -> &Self::Target {
-        self.bifrost
-    }
 }
 
 #[derive(Debug)]
@@ -56,16 +39,8 @@ pub struct SealedSegment {
 }
 
 impl<'a> BifrostAdmin<'a> {
-    pub fn new(
-        bifrost: &'a Bifrost,
-        metadata_writer: &'a MetadataWriter,
-        metadata_store_client: &'a MetadataStoreClient,
-    ) -> Self {
-        Self {
-            bifrost,
-            metadata_writer,
-            metadata_store_client,
-        }
+    pub fn new(bifrost: &'a Bifrost) -> Self {
+        Self { bifrost }
     }
 
     /// Trim the log prefix up to and including the `trim_point`.
     /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to
@@ -179,7 +154,7 @@
     }
 
     pub async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
-        self.inner.writeable_loglet(log_id).await
+        self.bifrost.inner.writeable_loglet(log_id).await
     }
 
     #[instrument(level = "debug", skip(self), err)]
@@ -247,9 +222,11 @@
             .network_error_retry_policy
             .clone();
         let logs = retry_on_network_error(retry_policy, || {
-            self.metadata_store_client.read_modify_write(
-                BIFROST_CONFIG_KEY.clone(),
-                |logs: Option<Logs>| {
+            self.bifrost
+                .inner
+                .metadata_writer
+                .metadata_store_client()
+                .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option<Logs>| {
                     let logs = logs.ok_or(Error::UnknownLogId(log_id))?;
 
                     let mut builder = logs.into_builder();
@@ -268,13 +245,16 @@
                         .append_segment(base_lsn, provider, params.clone())
                         .map_err(AdminError::from)?;
                     Ok(builder.build())
-                },
-            )
+                })
         })
         .await
         .map_err(|e| e.transpose())?;
 
-        self.metadata_writer.update(Arc::new(logs)).await?;
+        self.bifrost
+            .inner
+            .metadata_writer
+            .update(Arc::new(logs))
+            .await?;
         Ok(())
     }
 
@@ -292,25 +272,33 @@
             .network_error_retry_policy
             .clone();
         let logs = retry_on_network_error(retry_policy, || {
-            self.metadata_store_client.read_modify_write::<_, _, Error>(
-                BIFROST_CONFIG_KEY.clone(),
-                |logs: Option<Logs>| {
-                    // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY,
-                    // provisioning the empty logs metadata is not our responsibility.
-                    let logs = logs.ok_or(Error::UnknownLogId(log_id))?;
-
-                    let mut builder = logs.into_builder();
-                    builder
-                        .add_log(log_id, Chain::new(provider, params.clone()))
-                        .map_err(AdminError::from)?;
-                    Ok(builder.build())
-                },
-            )
+            self.bifrost
+                .inner
+                .metadata_writer
+                .metadata_store_client()
+                .read_modify_write::<_, _, Error>(
+                    BIFROST_CONFIG_KEY.clone(),
+                    |logs: Option<Logs>| {
+                        // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY,
+                        // provisioning the empty logs metadata is not our responsibility.
+                        let logs = logs.ok_or(Error::UnknownLogId(log_id))?;
+
+                        let mut builder = logs.into_builder();
+                        builder
+                            .add_log(log_id, Chain::new(provider, params.clone()))
+                            .map_err(AdminError::from)?;
+                        Ok(builder.build())
+                    },
+                )
         })
         .await
         .map_err(|e| e.transpose())?;
 
-        self.metadata_writer.update(Arc::new(logs)).await?;
+        self.bifrost
+            .inner
+            .metadata_writer
+            .update(Arc::new(logs))
+            .await?;
         Ok(())
     }
 
@@ -323,14 +311,21 @@
             .clone();
 
         let logs = retry_on_network_error(retry_policy, || {
-            self.metadata_store_client
+            self.bifrost
+                .inner
+                .metadata_writer
+                .metadata_store_client()
                 .get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
                     Logs::from_configuration(&Configuration::pinned())
                 })
         })
         .await?;
 
-        self.metadata_writer.update(Arc::new(logs)).await?;
+        self.bifrost
+            .inner
+            .metadata_writer
+            .update(Arc::new(logs))
+            .await?;
         Ok(())
     }
 }
diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs
index 59487e575..00d9fc359 100644
--- a/crates/bifrost/src/loglet/provider.rs
+++ b/crates/bifrost/src/loglet/provider.rs
@@ -41,7 +41,8 @@ pub trait LogletProvider: Send + Sync {
     /// Create a loglet client for a given segment and configuration.
     ///
-    /// if `chain` is None, this means we no chain exists already for this log.
+    /// if `chain` is None, the provider should assume that no chain exists already
+    /// for this log.
     fn propose_new_loglet_params(
         &self,
         log_id: LogId,
diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs
index 8ac81c3ee..11c32f212 100644
--- a/crates/bifrost/src/read_stream.rs
+++ b/crates/bifrost/src/read_stream.rs
@@ -454,14 +454,14 @@ mod tests {
     use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
     use restate_types::Versioned;
 
-    use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy};
+    use crate::{BifrostService, ErrorRecoveryStrategy};
 
     #[restate_core::test(flavor = "multi_thread", worker_threads = 2)]
     #[traced_test]
     async fn test_readstream_one_loglet() -> anyhow::Result<()> {
         const LOG_ID: LogId = LogId::new(0);
 
-        let _ = TestCoreEnvBuilder::with_incoming_only_connector()
+        let env = TestCoreEnvBuilder::with_incoming_only_connector()
             .set_provider_kind(ProviderKind::Local)
             .build()
             .await;
@@ -471,7 +471,7 @@ mod tests {
         let config = Live::from_value(Configuration::default());
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
-        let svc = BifrostService::new().enable_local_loglet(&config);
+        let svc = BifrostService::new(env.metadata_writer).enable_local_loglet(&config);
         let bifrost = svc.handle();
         svc.start().await.expect("loglet must start");
 
@@ -548,14 +548,10 @@ mod tests {
         let config = Live::from_value(Configuration::default());
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
-        let svc = BifrostService::new().enable_local_loglet(&config);
+        let svc =
+            BifrostService::new(node_env.metadata_writer.clone()).enable_local_loglet(&config);
         let bifrost = svc.handle();
 
-        let bifrost_admin = BifrostAdmin::new(
-            &bifrost,
-            &node_env.metadata_writer,
-            &node_env.metadata_store_client,
-        );
         svc.start().await.expect("loglet must start");
 
         let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
@@ -569,7 +565,7 @@ mod tests {
         }
 
         // [1..5] trimmed. trim_point = 5
-        bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?;
+        bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;
 
         assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset());
         assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?);
@@ -590,7 +586,7 @@ mod tests {
         let tail = bifrost.find_tail(LOG_ID).await?.offset();
 
         // trimming beyond the release point will fall back to the release point
-        bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?;
+        bifrost.admin().trim(LOG_ID, Lsn::from(u64::MAX)).await?;
         let trim_point = bifrost.get_trim_point(LOG_ID).await?;
         assert_eq!(Lsn::from(10), bifrost.get_trim_point(LOG_ID).await?);
         // trim point becomes the point before the next slot available for writes (aka. the
@@ -643,7 +639,7 @@ mod tests {
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
         // enable both in-memory and local loglet types
-        let svc = BifrostService::new()
+        let svc = BifrostService::new(node_env.metadata_writer.clone())
             .enable_local_loglet(&config)
             .enable_in_memory_loglet();
         let bifrost = svc.handle();
@@ -799,15 +795,10 @@ mod tests {
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
         // enable both in-memory and local loglet types
-        let svc = BifrostService::new()
+        let svc = BifrostService::new(node_env.metadata_writer)
             .enable_local_loglet(&config)
             .enable_in_memory_loglet();
         let bifrost = svc.handle();
 
-        let bifrost_admin = BifrostAdmin::new(
-            &bifrost,
-            &node_env.metadata_writer,
-            &node_env.metadata_store_client,
-        );
         svc.start().await.expect("loglet must start");
 
         let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
@@ -825,7 +816,8 @@ mod tests {
 
         // seal the loglet and extend with an in-memory one
         let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory);
-        bifrost_admin
+        bifrost
+            .admin()
             .seal_and_extend_chain(
                 LOG_ID,
                 None,
@@ -917,7 +909,7 @@ mod tests {
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
         // enable both in-memory and local loglet types
-        let svc = BifrostService::new()
+        let svc = BifrostService::new(node_env.metadata_writer)
             .enable_local_loglet(&config)
             .enable_in_memory_loglet();
         let bifrost = svc.handle();
diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs
index 3222cc5c2..ec6718aa7 100644
--- a/crates/bifrost/src/service.rs
+++ b/crates/bifrost/src/service.rs
@@ -15,7 +15,9 @@ use anyhow::Context;
 use enum_map::EnumMap;
 use tracing::{debug, error, trace};
 
-use restate_core::{cancellation_watcher, TaskCenter, TaskCenterFutureExt, TaskKind};
+use restate_core::{
+    cancellation_watcher, MetadataWriter, TaskCenter, TaskCenterFutureExt, TaskKind,
+};
 use restate_types::config::Configuration;
 use restate_types::live::Live;
 use restate_types::logs::metadata::ProviderKind;
@@ -34,16 +36,10 @@ pub struct BifrostService {
     factories: HashMap<ProviderKind, Box<dyn LogletProviderFactory>>,
 }
 
-impl Default for BifrostService {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl BifrostService {
-    pub fn new() -> Self {
+    pub fn new(metadata_writer: MetadataWriter) -> Self {
         let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel();
-        let inner = Arc::new(BifrostInner::new(watchdog_sender.clone()));
+        let inner = Arc::new(BifrostInner::new(watchdog_sender.clone(), metadata_writer));
         let bifrost = Bifrost::new(inner.clone());
         let watchdog = Watchdog::new(inner.clone(), watchdog_sender, watchdog_receiver);
         Self {
diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs
index 2244c0ef3..08fde62e1 100644
--- a/crates/core/src/metadata.rs
+++ b/crates/core/src/metadata.rs
@@ -29,7 +29,7 @@ use restate_types::schema::Schema;
 use restate_types::{GenerationalNodeId, Version, Versioned};
 
 use crate::metadata::manager::Command;
-use crate::metadata_store::ReadError;
+use crate::metadata_store::{MetadataStoreClient, ReadError};
 use crate::network::WeakConnection;
 use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};
 
@@ -335,6 +335,7 @@ struct MetadataInner {
 /// so it's safe to call update_* without checking the current version.
 #[derive(Clone)]
 pub struct MetadataWriter {
+    metadata_store_client: MetadataStoreClient,
     sender: manager::CommandSender,
     /// strictly used to set my node id. Do not use this to update metadata
     /// directly to avoid race conditions.
@@ -342,8 +343,20 @@
 }
 
 impl MetadataWriter {
-    fn new(sender: manager::CommandSender, inner: Arc<MetadataInner>) -> Self {
-        Self { sender, inner }
+    fn new(
+        sender: manager::CommandSender,
+        metadata_store_client: MetadataStoreClient,
+        inner: Arc<MetadataInner>,
+    ) -> Self {
+        Self {
+            metadata_store_client,
+            sender,
+            inner,
+        }
+    }
+
+    pub fn metadata_store_client(&self) -> &MetadataStoreClient {
+        &self.metadata_store_client
     }
 
     // Returns when the nodes configuration update is performed.
diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs
index 3c9773f99..baf248d3d 100644
--- a/crates/core/src/metadata/manager.rs
+++ b/crates/core/src/metadata/manager.rs
@@ -258,7 +258,11 @@
     }
 
     pub fn writer(&self) -> MetadataWriter {
-        MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone())
+        MetadataWriter::new(
+            self.metadata.sender.clone(),
+            self.metadata_store_client.clone(),
+            self.metadata.inner.clone(),
+        )
     }
 
     /// Start and wait for shutdown signal.
diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs
index 3542226f5..891e19d6d 100644
--- a/crates/core/src/metadata_store.rs
+++ b/crates/core/src/metadata_store.rs
@@ -105,7 +105,6 @@ pub trait MetadataStore {
 /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`].
 #[derive(Clone)]
 pub struct MetadataStoreClient {
-    // premature optimization? Maybe introduce trait object once we have multiple implementations?
     inner: Arc<dyn MetadataStore + Send + Sync>,
     backoff_policy: Option<RetryPolicy>,
 }
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 42f3b868a..7dbc273a0 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -184,7 +184,8 @@ impl Node {
             record_cache.clone(),
             &mut router_builder,
         );
-        let bifrost_svc = BifrostService::new().enable_local_loglet(&updateable_config);
+        let bifrost_svc =
+            BifrostService::new(metadata_manager.writer()).enable_local_loglet(&updateable_config);
 
         #[cfg(feature = "replicated-loglet")]
         let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory);
@@ -271,7 +272,6 @@ impl Node {
                 metadata_manager.writer(),
                 &mut server_builder,
                 &mut router_builder,
-                metadata_store_client.clone(),
                 worker_role
                     .as_ref()
                     .map(|worker_role| worker_role.storage_query_context().clone()),
diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs
index b06ed3eec..89cedd8af 100644
--- a/crates/node/src/roles/admin.rs
+++ b/crates/node/src/roles/admin.rs
@@ -14,7 +14,6 @@ use codederror::CodedError;
 use restate_admin::cluster_controller;
 use restate_admin::service::AdminService;
 use restate_bifrost::Bifrost;
-use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::MessageRouterBuilder;
 use restate_core::network::NetworkServerBuilder;
 use restate_core::network::Networking;
@@ -70,7 +69,6 @@
         metadata_writer: MetadataWriter,
         server_builder: &mut NetworkServerBuilder,
         router_builder: &mut MessageRouterBuilder,
-        metadata_store_client: MetadataStoreClient,
         local_query_context: Option<QueryContext>,
     ) -> Result<Self, AdminRoleBuildError> {
         health_status.update(AdminStatus::StartingUp);
@@ -104,7 +102,6 @@
         let admin = AdminService::new(
             metadata_writer.clone(),
-            metadata_store_client.clone(),
             bifrost.clone(),
             config.ingress.clone(),
             service_discovery,
@@ -121,7 +118,6 @@
                 router_builder,
                 server_builder,
                 metadata_writer,
-                metadata_store_client,
             ))
         } else {
             None
diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs
index 884a22866..ecb513c56 100644
--- a/crates/worker/src/partition/cleaner.rs
+++ b/crates/worker/src/partition/cleaner.rs
@@ -213,14 +213,14 @@ mod tests {
     // Start paused makes sure the timer is immediately fired
     #[test(restate_core::test(start_paused = true))]
     pub async fn cleanup_works() {
-        let _env = TestCoreEnvBuilder::with_incoming_only_connector()
+        let env = TestCoreEnvBuilder::with_incoming_only_connector()
             .set_partition_table(PartitionTable::with_equally_sized_partitions(
                 Version::MIN,
                 1,
             ))
             .build()
             .await;
-        let bifrost = Bifrost::init_in_memory().await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
 
         let expired_invocation =
             InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs
index 105e5a6a4..6d8dc4842 100644
--- a/crates/worker/src/partition/leadership/mod.rs
+++ b/crates/worker/src/partition/leadership/mod.rs
@@ -637,12 +637,12 @@ mod tests {
     #[test(restate_core::test)]
     async fn become_leader_then_step_down() -> googletest::Result<()> {
-        let _env = TestCoreEnv::create_with_single_node(0, 0).await;
+        let env = TestCoreEnv::create_with_single_node(0, 0).await;
         let storage_options = StorageOptions::default();
         let rocksdb_options = RocksDbOptions::default();
 
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
-        let bifrost = Bifrost::init_in_memory().await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
 
         let partition_store_manager = PartitionStoreManager::create(
             Constant::new(storage_options.clone()).boxed(),
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 571812edd..6834c0e40 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -344,6 +344,7 @@ where
         // Start reading after the last applied lsn
         let key_query = KeyFilter::Within(self.partition_key_range.clone());
+
         let mut log_reader = self
             .bifrost
             .create_reader(
diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs
index e4ac810d0..6b378b33e 100644
--- a/crates/worker/src/partition/shuffle.rs
+++ b/crates/worker/src/partition/shuffle.rs
@@ -628,7 +628,7 @@ mod tests {
         let (truncation_tx, _truncation_rx) = mpsc::channel(1);
 
-        let bifrost = Bifrost::init_in_memory().await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await;
         let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone());
 
         ShuffleEnv {
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 1a0492d89..9965ceaf8 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -975,7 +975,8 @@ mod tests {
         RocksDbManager::init(Constant::new(CommonOptions::default()));
 
-        let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default());
+        let bifrost_svc = BifrostService::new(env_builder.metadata_writer.clone())
+            .with_factory(memory_loglet::Factory::default());
         let bifrost = bifrost_svc.handle();
 
         let partition_store_manager = PartitionStoreManager::create(
diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs
index 628707454..c11c9a1a0 100644
--- a/server/tests/common/replicated_loglet.rs
+++ b/server/tests/common/replicated_loglet.rs
@@ -15,7 +15,7 @@ use enumset::{enum_set, EnumSet};
 use googletest::internal::test_outcome::TestAssertionFailure;
 use googletest::IntoTestResult;
 
-use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin};
+use restate_bifrost::{loglet::Loglet, Bifrost};
 use restate_core::metadata_store::Precondition;
 use restate_core::TaskCenter;
 use restate_core::{metadata_store::MetadataStoreClient, MetadataWriter};
@@ -92,16 +92,6 @@ pub struct TestEnv {
     pub cluster: StartedCluster,
 }
 
-impl TestEnv {
-    pub fn bifrost_admin(&self) -> BifrostAdmin<'_> {
-        BifrostAdmin::new(
-            &self.bifrost,
-            &self.metadata_writer,
-            &self.metadata_store_client,
-        )
-    }
-}
-
 pub async fn run_in_test_env(
     mut base_config: Configuration,
     sequencer: GenerationalNodeId,
diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs
index 8fa79c7f2..f3656d2c9 100644
--- a/server/tests/replicated_loglet.rs
+++ b/server/tests/replicated_loglet.rs
@@ -248,23 +248,13 @@ mod tests {
         }
 
         let mut sealer_handle: JoinHandle<googletest::Result<()>> = tokio::task::spawn({
-            let (bifrost, metadata_writer, metadata_store_client) = (
-                test_env.bifrost.clone(),
-                test_env.metadata_writer.clone(),
-                test_env.metadata_store_client.clone()
-            );
+            let bifrost = test_env.bifrost.clone();
 
             async move {
                 let cancellation_token = cancellation_token();
 
                 let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist"));
 
-                let bifrost_admin = restate_bifrost::BifrostAdmin::new(
-                    &bifrost,
-                    &metadata_writer,
-                    &metadata_store_client,
-                );
-
                 let mut last_loglet_id = None;
 
                 while !cancellation_token.is_cancelled() {
@@ -280,7 +270,8 @@ mod tests {
                     eprintln!("Sealing loglet {} and creating new loglet {}", params.loglet_id, params.loglet_id.next());
                     params.loglet_id = params.loglet_id.next();
 
-                    bifrost_admin
+                    bifrost
+                        .admin()
                         .seal_and_extend_chain(
                             log_id,
                             None,
diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs
index 33d5f8d21..f9512f964 100644
--- a/tools/bifrost-benchpress/src/main.rs
+++ b/tools/bifrost-benchpress/src/main.rs
@@ -176,7 +176,7 @@ fn spawn_environment(config: Live<Configuration>, num_logs: u16) -> (task_center
     metadata_writer.submit(Arc::new(logs));
     spawn_metadata_manager(metadata_manager).expect("metadata manager starts");
 
-    let bifrost_svc = BifrostService::new()
+    let bifrost_svc = BifrostService::new(metadata_writer)
         .enable_in_memory_loglet()
         .enable_local_loglet(&config);
     let bifrost = bifrost_svc.handle();
diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs
index cb4219513..213e328fa 100644
--- a/tools/restatectl/src/commands/log/dump_log.rs
+++ b/tools/restatectl/src/commands/log/dump_log.rs
@@ -86,6 +86,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> {
     let metadata_manager =
         MetadataManager::new(metadata_builder, metadata_store_client.clone());
+    let metadata_writer = metadata_manager.writer();
     let mut router_builder = MessageRouterBuilder::default();
     metadata_manager.register_in_message_router(&mut router_builder);
 
@@ -95,7 +96,8 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> {
         metadata_manager.run(),
     )?;
 
-    let bifrost_svc = BifrostService::new().enable_local_loglet(&Configuration::updateable());
+    let bifrost_svc =
+        BifrostService::new(metadata_writer).enable_local_loglet(&Configuration::updateable());
     let bifrost = bifrost_svc.handle();
 
     // Ensures bifrost has initial metadata synced up before starting the worker.
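Note on the caller-side pattern this patch establishes: `BifrostAdmin` is now borrowed on demand from a `Bifrost` handle via `bifrost.admin()`, and the `MetadataStoreClient` is reached through the `MetadataWriter` instead of being threaded through every constructor. A minimal usage sketch of the new call sites, assuming a test-environment value `env` exposing `metadata_writer` the way the updated tests above do (illustrative only, not part of the patch):

    // Bifrost now takes a MetadataWriter at construction time.
    let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await;

    // Admin operations no longer need separate MetadataWriter /
    // MetadataStoreClient references; they borrow through the handle.
    bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;

    // The metadata store client hangs off the writer.
    let logs: Option<Logs> = env
        .metadata_writer
        .metadata_store_client()
        .get(BIFROST_CONFIG_KEY.clone())
        .await?;

This removes the three-argument `BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client)` plumbing (and the `TestEnv::bifrost_admin` helper above) in favor of a single owned handle.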
diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs
index ee702beec..e556aa82c 100644
--- a/tools/xtask/src/main.rs
+++ b/tools/xtask/src/main.rs
@@ -103,11 +103,10 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> {
     // We start the Meta service, then download the openapi schema generated
     let node_env = TestCoreEnv::create_with_single_node(1, 1).await;
-    let bifrost = Bifrost::init_in_memory().await;
+    let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await;
 
     let admin_service = AdminService::new(
         node_env.metadata_writer.clone(),
-        node_env.metadata_store_client.clone(),
         bifrost,
         Mock,
         ServiceDiscovery::new(