From a99c045848312b895afb2e017af00bf1a51a3960 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 24 Dec 2024 16:11:10 +0000 Subject: [PATCH] [Core] MetadataWriter access to MetadataStore and Bifrost access to MetadataWriter This PR introduces two key changes: 1. Bifrost now can provide `bifrost.admin()` to get easy access to bifrost's admin interface withouto passing metadata writer and metadata store client explicitly. 2. MetadataWriter now owns MetadataStoreClient. This means that you can always access metadata store client directly if you have metadata_writer which used to be a pair of types we _always_ pass together. This also opens a path for a future where MetadataWriter wraps the metadata store client and automatically update metadata manager on successful writes but that's beyond the scope of this PR. What this PR provides is a direct access to the underlying metadata_store_client via an accessor function. --- .../cluster_controller/grpc_svc_handler.rs | 25 ++--- .../src/cluster_controller/logs_controller.rs | 34 ++---- .../admin/src/cluster_controller/service.rs | 61 ++++------- .../src/cluster_controller/service/state.rs | 22 +--- crates/admin/src/schema_registry/mod.rs | 23 ++-- crates/admin/src/service.rs | 3 - crates/bifrost/src/bifrost.rs | 70 ++++++------ crates/bifrost/src/bifrost_admin.rs | 101 +++++++++--------- crates/bifrost/src/loglet/provider.rs | 3 +- crates/bifrost/src/read_stream.rs | 32 +++--- crates/bifrost/src/service.rs | 14 +-- crates/core/src/metadata.rs | 19 +++- crates/core/src/metadata/manager.rs | 6 +- crates/core/src/metadata_store.rs | 1 - crates/node/src/lib.rs | 4 +- crates/node/src/roles/admin.rs | 4 - crates/worker/src/partition/cleaner.rs | 4 +- crates/worker/src/partition/leadership/mod.rs | 4 +- crates/worker/src/partition/mod.rs | 1 + crates/worker/src/partition/shuffle.rs | 2 +- .../worker/src/partition_processor_manager.rs | 3 +- server/tests/common/replicated_loglet.rs | 12 +-- server/tests/replicated_loglet.rs | 15 +-- tools/bifrost-benchpress/src/main.rs | 2 +- tools/restatectl/src/commands/log/dump_log.rs | 4 +- tools/xtask/src/main.rs | 3 +- 26 files changed, 201 insertions(+), 271 deletions(-) diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c8202393..c685a102bf 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError}; +use restate_bifrost::{Bifrost, Error as BiforstError}; use restate_core::{Metadata, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::{Logs, SegmentIndex}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; @@ -44,7 +43,6 @@ use super::service::ChainExtension; use super::ClusterControllerHandle; pub(crate) struct ClusterCtrlSvcHandler { - metadata_store_client: MetadataStoreClient, controller_handle: ClusterControllerHandle, bifrost: Bifrost, metadata_writer: MetadataWriter, @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler { impl ClusterCtrlSvcHandler { pub fn new( controller_handle: ClusterControllerHandle, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, metadata_writer: MetadataWriter, ) -> Self { Self { controller_handle, - metadata_store_client, bifrost, metadata_writer, } } async fn get_logs(&self) -> Result { - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await .map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))? @@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let (trim_point, nodes_config) = tokio::join!( self.bifrost.get_trim_point(log_id), - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()), ); @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { _request: Request, ) -> Result, Status> { let nodes_config = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()) .await .map_err(|error| { @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let request = request.into_inner(); let log_id: LogId = request.log_id.into(); - let admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - - let writable_loglet = admin + let writable_loglet = self + .bifrost + .admin() .writeable_loglet(log_id) .await .map_err(|err| match err { diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 4f1f7db6e2..5935181af1 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -25,9 +25,9 @@ use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; +use restate_bifrost::{Bifrost, Error as BifrostError}; use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, + retry_on_network_error, Precondition, ReadWriteError, WriteError, }; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; use restate_types::errors::GenericError; @@ -924,7 +924,6 @@ pub struct LogsController { effects: Option>, inner: LogsControllerInner, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, async_operations: JoinSet, find_logs_tail_semaphore: Arc, @@ -934,16 +933,17 @@ impl LogsController { pub async fn init( configuration: &Configuration, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, ) -> Result { // obtain the latest logs or init it with an empty logs variant let logs = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) + metadata_writer + .metadata_store_client() + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + Logs::from_configuration(configuration) + }) }, ) .await?; @@ -965,7 +965,6 @@ impl LogsController { retry_policy, ), bifrost, - metadata_store_client, metadata_writer, async_operations: JoinSet::default(), find_logs_tail_semaphore: Arc::new(Semaphore::new(1)), @@ -984,17 +983,12 @@ impl LogsController { let logs = Arc::clone(&self.inner.current_logs); let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); let find_tail = async move { - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - let mut updates = LogsTailUpdates::default(); for (log_id, chain) in logs.iter() { let tail_segment = chain.tail(); - let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await { + let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await { Ok(loglet) => loglet, Err(BifrostError::Shutdown(_)) => break, Err(err) => { @@ -1098,7 +1092,6 @@ impl LogsController { logs: Arc, mut debounce: Option>, ) { - let metadata_store_client = self.metadata_store_client.clone(); let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn(async move { @@ -1108,7 +1101,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - if let Err(err) = metadata_store_client + if let Err(err) = metadata_writer.metadata_store_client() .put( BIFROST_CONFIG_KEY.clone(), logs.deref(), @@ -1120,7 +1113,7 @@ impl LogsController { WriteError::FailedPrecondition(_) => { debug!("Detected a concurrent modification of logs. Fetching the latest logs now."); // There was a concurrent modification of the logs. Fetch the latest version. - match metadata_store_client + match metadata_writer.metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await { @@ -1166,8 +1159,6 @@ impl LogsController { mut debounce: Option>, ) { let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn( async move { @@ -1177,10 +1168,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - - match bifrost_admin.seal(log_id, segment_index).await { + match bifrost.admin().seal(log_id, segment_index).await { Ok(sealed_segment) => { if sealed_segment.tail.is_sealed() { Event::SealSucceeded { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index b194c75d3e..18239bf1dd 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -36,8 +36,8 @@ use restate_types::partition_table::{ }; use restate_types::replicated_loglet::ReplicatedLogletParams; -use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_bifrost::{Bifrost, SealedSegment}; +use restate_core::metadata_store::retry_on_network_error; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -79,7 +79,6 @@ pub struct Service { cluster_state_refresher: ClusterStateRefresher, configuration: Live, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, processor_manager_client: PartitionProcessorManagerClient>, command_tx: mpsc::Sender, @@ -102,7 +101,6 @@ where router_builder: &mut MessageRouterBuilder, server_builder: &mut NetworkServerBuilder, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, ) -> Self { println!( "CONFIGURATION DEFaAULT IS: {}", @@ -126,7 +124,6 @@ where ClusterControllerHandle { tx: command_tx.clone(), }, - metadata_store_client.clone(), bifrost.clone(), metadata_writer.clone(), )) @@ -144,7 +141,6 @@ where bifrost, cluster_state_refresher, metadata_writer, - metadata_store_client, processor_manager_client, command_tx, command_rx, @@ -313,12 +309,6 @@ impl Service { let mut shutdown = std::pin::pin!(cancellation_watcher()); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let mut state: ClusterControllerState = ClusterControllerState::Follower; self.health_status.update(AdminStatus::Ready); @@ -337,7 +327,7 @@ impl Service { } Some(cmd) = self.command_rx.recv() => { // it is still safe to handle cluster commands as a follower - self.on_cluster_cmd(cmd, bifrost_admin).await; + self.on_cluster_cmd(cmd).await; } _ = config_watcher.changed() => { debug!("Updating the cluster controller settings."); @@ -364,8 +354,9 @@ impl Service { let partition_table = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { + self.metadata_writer.metadata_store_client().get_or_insert( + PARTITION_TABLE_KEY.clone(), + || { let partition_table = PartitionTable::with_equally_sized_partitions( Version::MIN, configuration.common.bootstrap_num_partitions.get(), @@ -374,7 +365,8 @@ impl Service { debug!("Initializing the partition table with '{partition_table:?}'"); partition_table - }) + }, + ) }, ) .await?; @@ -446,7 +438,8 @@ impl Service { default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option| { let logs = match current { Some(logs) => logs, @@ -497,7 +490,8 @@ impl Service { }; let partition_table = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( PARTITION_TABLE_KEY.clone(), |current: Option| { @@ -561,7 +555,6 @@ impl Service { extension, min_version, bifrost: self.bifrost.clone(), - metadata_store_client: self.metadata_store_client.clone(), metadata_writer: self.metadata_writer.clone(), observed_cluster_state: self.observed_cluster_state.clone(), }; @@ -573,11 +566,7 @@ impl Service { }); } - async fn on_cluster_cmd( - &self, - command: ClusterControllerCommand, - bifrost_admin: BifrostAdmin<'_>, - ) { + async fn on_cluster_cmd(&self, command: ClusterControllerCommand) { match command { ClusterControllerCommand::GetClusterState(tx) => { let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); @@ -591,7 +580,7 @@ impl Service { ?log_id, trim_point_inclusive = ?trim_point, "Manual trim log command received"); - let result = bifrost_admin.trim(log_id, trim_point).await; + let result = self.bifrost.admin().trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } ClusterControllerCommand::CreateSnapshot { @@ -719,7 +708,6 @@ struct SealAndExtendTask { extension: Option, bifrost: Bifrost, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, observed_cluster_state: ObservedClusterState, } @@ -730,18 +718,14 @@ impl SealAndExtendTask { .as_ref() .and_then(|ext| ext.segment_index_to_seal); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let (provider, params) = match self.extension.take() { Some(extension) => (extension.provider_kind, extension.params), None => self.next_segment().await?, }; - let sealed_segment = bifrost_admin + let sealed_segment = self + .bifrost + .admin() .seal_and_extend_chain( self.log_id, last_segment_index, @@ -800,7 +784,8 @@ impl SealAndExtendTask { #[cfg(feature = "replicated-loglet")] ProviderConfiguration::Replicated(config) => { let schedule_plan = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(SCHEDULING_PLAN_KEY.clone()) .await?; @@ -862,7 +847,8 @@ mod tests { async fn manual_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let svc = Service::new( @@ -873,7 +859,6 @@ mod tests { &mut builder.router_builder, &mut NetworkServerBuilder::default(), builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let svc_handle = svc.handle(); @@ -1145,7 +1130,8 @@ mod tests { { restate_types::config::set_current_config(config); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let mut server_builder = NetworkServerBuilder::default(); @@ -1158,7 +1144,6 @@ mod tests { &mut builder.router_builder, &mut server_builder, builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b55488393..430ff8124e 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -17,10 +17,9 @@ use tokio::time; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{debug, info, warn}; -use restate_bifrost::{Bifrost, BifrostAdmin}; -use restate_core::metadata_store::MetadataStoreClient; +use restate_bifrost::Bifrost; use restate_core::network::TransportConnect; -use restate_core::{my_node_id, Metadata, MetadataWriter}; +use restate_core::{my_node_id, Metadata}; use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; @@ -135,8 +134,6 @@ pub enum LeaderEvent { pub struct Leader { bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, - metadata_writer: MetadataWriter, logs_watcher: watch::Receiver, partition_table_watcher: watch::Receiver, find_logs_tail_interval: Interval, @@ -156,7 +153,7 @@ where let scheduler = Scheduler::init( &configuration, - service.metadata_store_client.clone(), + service.metadata_writer.metadata_store_client().clone(), service.networking.clone(), ) .await?; @@ -164,7 +161,6 @@ where let logs_controller = LogsController::init( &configuration, service.bifrost.clone(), - service.metadata_store_client.clone(), service.metadata_writer.clone(), ) .await?; @@ -179,8 +175,6 @@ where let metadata = Metadata::current(); let mut leader = Self { bifrost: service.bifrost.clone(), - metadata_store_client: service.metadata_store_client.clone(), - metadata_writer: service.metadata_writer.clone(), logs_watcher: metadata.watch(MetadataKind::Logs), partition_table_watcher: metadata.watch(MetadataKind::PartitionTable), cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(), @@ -296,12 +290,6 @@ where } async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> { - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let cluster_state = self.cluster_state_watcher.current(); let mut persisted_lsns_per_partition: BTreeMap< @@ -342,13 +330,13 @@ where if persisted_lsns.len() >= cluster_state.nodes.len() { let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID); // trim point is before the oldest record - let current_trim_point = bifrost_admin.get_trim_point(log_id).await?; + let current_trim_point = self.bifrost.get_trim_point(log_id).await?; if min_persisted_lsn >= current_trim_point + self.log_trim_threshold { debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost_admin.trim(log_id, min_persisted_lsn).await? + self.bifrost.admin().trim(log_id, min_persisted_lsn).await? } } else { warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log."); diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index cd043be8d5..39481e6d27 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -11,16 +11,15 @@ pub mod error; mod updater; -use http::Uri; - use std::borrow::Borrow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use http::Uri; use tracing::subscriber::NoSubscriber; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::{Metadata, MetadataWriter}; use restate_service_protocol::discovery::{DiscoverEndpoint, DiscoveredEndpoint, ServiceDiscovery}; use restate_types::identifiers::{DeploymentId, ServiceRevision, SubscriptionId}; @@ -77,7 +76,6 @@ pub enum ModifyServiceChange { /// new deployments. #[derive(Clone)] pub struct SchemaRegistry { - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -87,7 +85,6 @@ pub struct SchemaRegistry { impl SchemaRegistry { pub fn new( - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -95,7 +92,6 @@ impl SchemaRegistry { ) -> Self { Self { metadata_writer, - metadata_store_client, service_discovery, subscription_validator, experimental_feature_kafka_ingress_next, @@ -155,7 +151,8 @@ impl SchemaRegistry { } else { let mut new_deployment_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -195,7 +192,8 @@ impl SchemaRegistry { deployment_id: DeploymentId, ) -> Result<(), SchemaRegistryError> { let schema_registry = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_registry: Option| { @@ -229,7 +227,8 @@ impl SchemaRegistry { changes: Vec, ) -> Result { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -270,7 +269,8 @@ impl SchemaRegistry { subscription_id: SubscriptionId, ) -> Result<(), SchemaRegistryError> { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -370,7 +370,8 @@ where let mut subscription_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index b3c5ecef82..3f3020a629 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,7 +17,6 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tower::ServiceBuilder; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::net_util; use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; @@ -44,7 +43,6 @@ where { pub fn new( metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, subscription_validator: V, service_discovery: ServiceDiscovery, @@ -54,7 +52,6 @@ where Self { bifrost, schema_registry: SchemaRegistry::new( - metadata_store_client, metadata_writer, service_discovery, subscription_validator, diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 06b86ff761..b0da0b2405 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -15,7 +15,7 @@ use std::sync::OnceLock; use enum_map::EnumMap; use tracing::instrument; -use restate_core::{Metadata, MetadataKind, TargetVersion}; +use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState}; use restate_types::storage::StorageEncode; @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, InputRecord, LogReadStream, Result}; +use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result}; /// The strategy to use when bifrost fails to append or when it observes /// a sealed loglet while it's tailing a log. @@ -77,20 +77,20 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_in_memory() -> Self { + pub async fn init_in_memory(metadata_writer: MetadataWriter) -> Self { use crate::providers::memory_loglet; - Self::init_with_factory(memory_loglet::Factory::default()).await + Self::init_with_factory(metadata_writer, memory_loglet::Factory::default()).await } #[cfg(any(test, feature = "test-util"))] - pub async fn init_local() -> Self { + pub async fn init_local(metadata_writer: MetadataWriter) -> Self { use restate_types::config::Configuration; use crate::BifrostService; let config = Configuration::updateable(); - let bifrost_svc = BifrostService::new().enable_local_loglet(&config); + let bifrost_svc = BifrostService::new(metadata_writer).enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -102,10 +102,13 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_with_factory(factory: impl crate::loglet::LogletProviderFactory) -> Self { + pub async fn init_with_factory( + metadata_writer: MetadataWriter, + factory: impl crate::loglet::LogletProviderFactory, + ) -> Self { use crate::BifrostService; - let bifrost_svc = BifrostService::new().with_factory(factory); + let bifrost_svc = BifrostService::new(metadata_writer).with_factory(factory); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -116,6 +119,11 @@ impl Bifrost { bifrost } + /// Admin operations of bifrost + pub fn admin(&self) -> BifrostAdmin<'_> { + BifrostAdmin::new(self) + } + /// Appends a single record to a log. The log id must exist, otherwise the /// operation fails with [`Error::UnknownLogId`] /// @@ -302,15 +310,17 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone); pub struct BifrostInner { #[allow(unused)] watchdog: WatchdogSender, + pub(crate) metadata_writer: MetadataWriter, // Initialized after BifrostService::start completes. pub(crate) providers: OnceLock>>>, shutting_down: AtomicBool, } impl BifrostInner { - pub fn new(watchdog: WatchdogSender) -> Self { + pub fn new(watchdog: WatchdogSender, metadata_writer: MetadataWriter) -> Self { Self { watchdog, + metadata_writer, providers: Default::default(), shutting_down: AtomicBool::new(false), } @@ -558,13 +568,12 @@ mod tests { use restate_types::{Version, Versioned}; use crate::providers::memory_loglet::{self}; - use crate::BifrostAdmin; #[restate_core::test] #[traced_test] async fn test_append_smoke() -> googletest::Result<()> { let num_partitions = 5; - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, num_partitions, @@ -572,7 +581,7 @@ mod tests { .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let clean_bifrost_clone = bifrost.clone(); @@ -637,12 +646,12 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_lazy_initialization() -> googletest::Result<()> { - let _ = TestCoreEnv::create_with_single_node(1, 1).await; + let env = TestCoreEnv::create_with_single_node(1, 1).await; let delay = Duration::from_secs(5); // This memory provider adds a delay to its loglet initialization, we want // to ensure that appends do not fail while waiting for the loglet; let factory = memory_loglet::Factory::with_init_delay(delay); - let bifrost = Bifrost::init_with_factory(factory).await; + let bifrost = Bifrost::init_with_factory(env.metadata_writer, factory).await; let start = tokio::time::Instant::now(); let lsn = bifrost @@ -664,12 +673,7 @@ mod tests { .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset()); @@ -681,7 +685,7 @@ mod tests { appender.append("").await?; } - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); @@ -703,7 +707,7 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; + bifrost.admin().trim(LOG_ID, Lsn::MAX).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -737,12 +741,7 @@ mod tests { )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer).await; let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] @@ -765,7 +764,8 @@ mod tests { .unwrap(); // seal the segment - bifrost_admin + bifrost + .admin() .seal(LOG_ID, segment_1.segment_index()) .await?; @@ -925,12 +925,7 @@ mod tests { .build() .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; // create an appender let stop_signal = Arc::new(AtomicBool::default()); @@ -976,7 +971,7 @@ mod tests { } // seal and don't extend the chain. - let _ = bifrost_admin.seal(LOG_ID, SegmentIndex::from(0)).await?; + let _ = bifrost.admin().seal(LOG_ID, SegmentIndex::from(0)).await?; // appends should stall! tokio::time::sleep(Duration::from_millis(100)).await; @@ -998,7 +993,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::Local); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 264b44eb16..ba0c094930 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -8,14 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::Deref; use std::sync::Arc; -use restate_core::metadata_store::retry_on_network_error; use tracing::{debug, info, instrument}; -use restate_core::{Metadata, MetadataKind, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; +use restate_core::metadata_store::retry_on_network_error; +use restate_core::{Metadata, MetadataKind}; use restate_types::config::Configuration; use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; @@ -30,21 +28,6 @@ use crate::{Bifrost, Error, Result}; #[derive(Clone, Copy)] pub struct BifrostAdmin<'a> { bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, -} - -impl<'a> AsRef for BifrostAdmin<'a> { - fn as_ref(&self) -> &Bifrost { - self.bifrost - } -} - -impl<'a> Deref for BifrostAdmin<'a> { - type Target = Bifrost; - fn deref(&self) -> &Self::Target { - self.bifrost - } } #[derive(Debug)] @@ -56,16 +39,8 @@ pub struct SealedSegment { } impl<'a> BifrostAdmin<'a> { - pub fn new( - bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, - ) -> Self { - Self { - bifrost, - metadata_writer, - metadata_store_client, - } + pub fn new(bifrost: &'a Bifrost) -> Self { + Self { bifrost } } /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to @@ -179,7 +154,7 @@ impl<'a> BifrostAdmin<'a> { } pub async fn writeable_loglet(&self, log_id: LogId) -> Result { - self.inner.writeable_loglet(log_id).await + self.bifrost.inner.writeable_loglet(log_id).await } #[instrument(level = "debug", skip(self), err)] @@ -247,9 +222,11 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option| { let logs = logs.ok_or(Error::UnknownLogId(log_id))?; let mut builder = logs.into_builder(); @@ -268,13 +245,16 @@ impl<'a> BifrostAdmin<'a> { .append_segment(base_lsn, provider, params.clone()) .map_err(AdminError::from)?; Ok(builder.build()) - }, - ) + }) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -292,25 +272,33 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write::<_, _, Error>( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { - // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, - // provisioning the empty logs metadata is not our responsibility. - let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - - let mut builder = logs.into_builder(); - builder - .add_log(log_id, Chain::new(provider, params.clone())) - .map_err(AdminError::from)?; - Ok(builder.build()) - }, - ) + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; + + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -323,14 +311,21 @@ impl<'a> BifrostAdmin<'a> { .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client + self.bifrost + .inner + .metadata_writer + .metadata_store_client() .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { Logs::from_configuration(&Configuration::pinned()) }) }) .await?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } } diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index 59487e575b..00d9fc359f 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -41,7 +41,8 @@ pub trait LogletProvider: Send + Sync { /// Create a loglet client for a given segment and configuration. /// - /// if `chain` is None, this means we no chain exists already for this log. + /// if `chain` is None, the provider should assume that no chain exists already + /// for this log. fn propose_new_loglet_params( &self, log_id: LogId, diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8ac81c3ee7..11c32f212b 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,14 +454,14 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; + use crate::{BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_readstream_one_loglet() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_provider_kind(ProviderKind::Local) .build() .await; @@ -471,7 +471,7 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = BifrostService::new(env.metadata_writer).enable_local_loglet(&config); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); @@ -548,14 +548,10 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = + BifrostService::new(node_env.metadata_writer.clone()).enable_local_loglet(&config); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -569,7 +565,7 @@ mod tests { } // [1..5] trimmed. trim_point = 5 - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); @@ -590,7 +586,7 @@ mod tests { let tail = bifrost.find_tail(LOG_ID).await?.offset(); // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(LOG_ID).await?; assert_eq!(Lsn::from(10), bifrost.get_trim_point(LOG_ID).await?); // trim point becomes the point before the next slot available for writes (aka. the @@ -643,7 +639,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer.clone()) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); @@ -799,15 +795,10 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -825,7 +816,8 @@ mod tests { // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, @@ -917,7 +909,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c26..ec6718aa7d 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -15,7 +15,9 @@ use anyhow::Context; use enum_map::EnumMap; use tracing::{debug, error, trace}; -use restate_core::{cancellation_watcher, TaskCenter, TaskCenterFutureExt, TaskKind}; +use restate_core::{ + cancellation_watcher, MetadataWriter, TaskCenter, TaskCenterFutureExt, TaskKind, +}; use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::logs::metadata::ProviderKind; @@ -34,16 +36,10 @@ pub struct BifrostService { factories: HashMap>, } -impl Default for BifrostService { - fn default() -> Self { - Self::new() - } -} - impl BifrostService { - pub fn new() -> Self { + pub fn new(metadata_writer: MetadataWriter) -> Self { let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel(); - let inner = Arc::new(BifrostInner::new(watchdog_sender.clone())); + let inner = Arc::new(BifrostInner::new(watchdog_sender.clone(), metadata_writer)); let bifrost = Bifrost::new(inner.clone()); let watchdog = Watchdog::new(inner.clone(), watchdog_sender, watchdog_receiver); Self { diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index 2244c0ef3e..08fde62e1c 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -29,7 +29,7 @@ use restate_types::schema::Schema; use restate_types::{GenerationalNodeId, Version, Versioned}; use crate::metadata::manager::Command; -use crate::metadata_store::ReadError; +use crate::metadata_store::{MetadataStoreClient, ReadError}; use crate::network::WeakConnection; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; @@ -335,6 +335,7 @@ struct MetadataInner { /// so it's safe to call update_* without checking the current version. #[derive(Clone)] pub struct MetadataWriter { + metadata_store_client: MetadataStoreClient, sender: manager::CommandSender, /// strictly used to set my node id. Do not use this to update metadata /// directly to avoid race conditions. @@ -342,8 +343,20 @@ pub struct MetadataWriter { } impl MetadataWriter { - fn new(sender: manager::CommandSender, inner: Arc) -> Self { - Self { sender, inner } + fn new( + sender: manager::CommandSender, + metadata_store_client: MetadataStoreClient, + inner: Arc, + ) -> Self { + Self { + metadata_store_client, + sender, + inner, + } + } + + pub fn metadata_store_client(&self) -> &MetadataStoreClient { + &self.metadata_store_client } // Returns when the nodes configuration update is performed. diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 3c9773f99b..baf248d3d3 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -258,7 +258,11 @@ impl MetadataManager { } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone()) + MetadataWriter::new( + self.metadata.sender.clone(), + self.metadata_store_client.clone(), + self.metadata.inner.clone(), + ) } /// Start and wait for shutdown signal. diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5f..891e19d6d1 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -105,7 +105,6 @@ pub trait MetadataStore { /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. #[derive(Clone)] pub struct MetadataStoreClient { - // premature optimization? Maybe introduce trait object once we have multiple implementations? inner: Arc, backoff_policy: Option, } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a7..7dbc273a07 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -184,7 +184,8 @@ impl Node { record_cache.clone(), &mut router_builder, ); - let bifrost_svc = BifrostService::new().enable_local_loglet(&updateable_config); + let bifrost_svc = + BifrostService::new(metadata_manager.writer()).enable_local_loglet(&updateable_config); #[cfg(feature = "replicated-loglet")] let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory); @@ -271,7 +272,6 @@ impl Node { metadata_manager.writer(), &mut server_builder, &mut router_builder, - metadata_store_client.clone(), worker_role .as_ref() .map(|worker_role| worker_role.storage_query_context().clone()), diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index b06ed3eeca..89cedd8afe 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -14,7 +14,6 @@ use codederror::CodedError; use restate_admin::cluster_controller; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::MessageRouterBuilder; use restate_core::network::NetworkServerBuilder; use restate_core::network::Networking; @@ -70,7 +69,6 @@ impl AdminRole { metadata_writer: MetadataWriter, server_builder: &mut NetworkServerBuilder, router_builder: &mut MessageRouterBuilder, - metadata_store_client: MetadataStoreClient, local_query_context: Option, ) -> Result { health_status.update(AdminStatus::StartingUp); @@ -104,7 +102,6 @@ impl AdminRole { let admin = AdminService::new( metadata_writer.clone(), - metadata_store_client.clone(), bifrost.clone(), config.ingress.clone(), service_discovery, @@ -121,7 +118,6 @@ impl AdminRole { router_builder, server_builder, metadata_writer, - metadata_store_client, )) } else { None diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 884a22866d..ecb513c56c 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -213,14 +213,14 @@ mod tests { // Start paused makes sure the timer is immediately fired #[test(restate_core::test(start_paused = true))] pub async fn cleanup_works() { - let _env = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 105e5a6a4f..6d8dc48423 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -637,12 +637,12 @@ mod tests { #[test(restate_core::test)] async fn become_leader_then_step_down() -> googletest::Result<()> { - let _env = TestCoreEnv::create_with_single_node(0, 0).await; + let env = TestCoreEnv::create_with_single_node(0, 0).await; let storage_options = StorageOptions::default(); let rocksdb_options = RocksDbOptions::default(); RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let partition_store_manager = PartitionStoreManager::create( Constant::new(storage_options.clone()).boxed(), diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 571812eddf..6834c0e40b 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -344,6 +344,7 @@ where // Start reading after the last applied lsn let key_query = KeyFilter::Within(self.partition_key_range.clone()); + let mut log_reader = self .bifrost .create_reader( diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index e4ac810d03..6b378b33e6 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -628,7 +628,7 @@ mod tests { let (truncation_tx, _truncation_rx) = mpsc::channel(1); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await; let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone()); ShuffleEnv { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 1a0492d89d..9965ceaf81 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -975,7 +975,8 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(env_builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let partition_store_manager = PartitionStoreManager::create( diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 6287074541..c11c9a1a0b 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -15,7 +15,7 @@ use enumset::{enum_set, EnumSet}; use googletest::internal::test_outcome::TestAssertionFailure; use googletest::IntoTestResult; -use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin}; +use restate_bifrost::{loglet::Loglet, Bifrost}; use restate_core::metadata_store::Precondition; use restate_core::TaskCenter; use restate_core::{metadata_store::MetadataStoreClient, MetadataWriter}; @@ -92,16 +92,6 @@ pub struct TestEnv { pub cluster: StartedCluster, } -impl TestEnv { - pub fn bifrost_admin(&self) -> BifrostAdmin<'_> { - BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ) - } -} - pub async fn run_in_test_env( mut base_config: Configuration, sequencer: GenerationalNodeId, diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 8fa79c7f26..f3656d2c97 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -248,23 +248,13 @@ mod tests { } let mut sealer_handle: JoinHandle> = tokio::task::spawn({ - let (bifrost, metadata_writer, metadata_store_client) = ( - test_env.bifrost.clone(), - test_env.metadata_writer.clone(), - test_env.metadata_store_client.clone() - ); + let bifrost = test_env.bifrost.clone(); async move { let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); - let bifrost_admin = restate_bifrost::BifrostAdmin::new( - &bifrost, - &metadata_writer, - &metadata_store_client, - ); - let mut last_loglet_id = None; while !cancellation_token.is_cancelled() { @@ -280,7 +270,8 @@ mod tests { eprintln!("Sealing loglet {} and creating new loglet {}", params.loglet_id, params.loglet_id.next()); params.loglet_id = params.loglet_id.next(); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( log_id, None, diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index 33d5f8d21f..f9512f9644 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -176,7 +176,7 @@ fn spawn_environment(config: Live, num_logs: u16) -> (task_center metadata_writer.submit(Arc::new(logs)); spawn_metadata_manager(metadata_manager).expect("metadata manager starts"); - let bifrost_svc = BifrostService::new() + let bifrost_svc = BifrostService::new(metadata_writer) .enable_in_memory_loglet() .enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index cb4219513b..213e328fa9 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -86,6 +86,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); + let metadata_writer = metadata_manager.writer(); let mut router_builder = MessageRouterBuilder::default(); metadata_manager.register_in_message_router(&mut router_builder); @@ -95,7 +96,8 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { metadata_manager.run(), )?; - let bifrost_svc = BifrostService::new().enable_local_loglet(&Configuration::updateable()); + let bifrost_svc = + BifrostService::new(metadata_writer).enable_local_loglet(&Configuration::updateable()); let bifrost = bifrost_svc.handle(); // Ensures bifrost has initial metadata synced up before starting the worker. diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index ee702beec9..e556aa82c2 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -103,11 +103,10 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { // We start the Meta service, then download the openapi schema generated let node_env = TestCoreEnv::create_with_single_node(1, 1).await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; let admin_service = AdminService::new( node_env.metadata_writer.clone(), - node_env.metadata_store_client.clone(), bifrost, Mock, ServiceDiscovery::new(