Skip to content

Commit

Permalink
[Core] MetadataWriter access to MetadataStore and Bifrost access to M…
Browse files Browse the repository at this point in the history
…etadataWriter

This PR introduces two key changes:

1. Bifrost now can provide `bifrost.admin()` to get easy access to bifrost's admin interface withouto passing metadata writer and metadata store client explicitly.
2. MetadataWriter now owns MetadataStoreClient.


This means that you can always access metadata store client directly if you have metadata_writer which used to be a pair of types we _always_ pass together. This also opens a path for a future where MetadataWriter wraps the metadata store client and automatically update metadata manager on successful writes but that's beyond the scope of this PR. What this PR provides is a direct access to the underlying metadata_store_client via an accessor function.
  • Loading branch information
AhmedSoliman committed Jan 7, 2025
1 parent 3e5dc67 commit 52a62f1
Show file tree
Hide file tree
Showing 25 changed files with 200 additions and 271 deletions.
25 changes: 10 additions & 15 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError};
use restate_bifrost::{Bifrost, Error as BiforstError};
use restate_core::{Metadata, MetadataWriter};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::{Logs, SegmentIndex};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
Expand All @@ -44,7 +43,6 @@ use super::service::ChainExtension;
use super::ClusterControllerHandle;

pub(crate) struct ClusterCtrlSvcHandler {
metadata_store_client: MetadataStoreClient,
controller_handle: ClusterControllerHandle,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
Expand All @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler {
impl ClusterCtrlSvcHandler {
pub fn new(
controller_handle: ClusterControllerHandle,
metadata_store_client: MetadataStoreClient,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
) -> Self {
Self {
controller_handle,
metadata_store_client,
bifrost,
metadata_writer,
}
}

async fn get_logs(&self) -> Result<Logs, Status> {
self.metadata_store_client
self.metadata_writer
.metadata_store_client()
.get::<Logs>(BIFROST_CONFIG_KEY.clone())
.await
.map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))?
Expand Down Expand Up @@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {

let (trim_point, nodes_config) = tokio::join!(
self.bifrost.get_trim_point(log_id),
self.metadata_store_client
self.metadata_writer
.metadata_store_client()
.get::<NodesConfiguration>(NODES_CONFIG_KEY.clone()),
);

Expand Down Expand Up @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
_request: Request<ListNodesRequest>,
) -> Result<Response<ListNodesResponse>, Status> {
let nodes_config = self
.metadata_store_client
.metadata_writer
.metadata_store_client()
.get::<NodesConfiguration>(NODES_CONFIG_KEY.clone())
.await
.map_err(|error| {
Expand Down Expand Up @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
let request = request.into_inner();
let log_id: LogId = request.log_id.into();

let admin = BifrostAdmin::new(
&self.bifrost,
&self.metadata_writer,
&self.metadata_store_client,
);

let writable_loglet = admin
let writable_loglet = self
.bifrost
.admin()
.writeable_loglet(log_id)
.await
.map_err(|err| match err {
Expand Down
34 changes: 11 additions & 23 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
retry_on_network_error, Precondition, ReadWriteError, WriteError,
};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::errors::GenericError;
Expand Down Expand Up @@ -924,7 +924,6 @@ pub struct LogsController {
effects: Option<Vec<Effect>>,
inner: LogsControllerInner,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
async_operations: JoinSet<Event>,
find_logs_tail_semaphore: Arc<Semaphore>,
Expand All @@ -934,16 +933,17 @@ impl LogsController {
pub async fn init(
configuration: &Configuration,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
) -> Result<Self> {
// obtain the latest logs or init it with an empty logs variant
let logs = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
Logs::from_configuration(configuration)
})
metadata_writer
.metadata_store_client()
.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
Logs::from_configuration(configuration)
})
},
)
.await?;
Expand All @@ -965,7 +965,6 @@ impl LogsController {
retry_policy,
),
bifrost,
metadata_store_client,
metadata_writer,
async_operations: JoinSet::default(),
find_logs_tail_semaphore: Arc::new(Semaphore::new(1)),
Expand All @@ -984,17 +983,12 @@ impl LogsController {

let logs = Arc::clone(&self.inner.current_logs);
let bifrost = self.bifrost.clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();
let find_tail = async move {
let bifrost_admin =
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);

let mut updates = LogsTailUpdates::default();
for (log_id, chain) in logs.iter() {
let tail_segment = chain.tail();

let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await {
let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await {
Ok(loglet) => loglet,
Err(BifrostError::Shutdown(_)) => break,
Err(err) => {
Expand Down Expand Up @@ -1098,7 +1092,6 @@ impl LogsController {
logs: Arc<Logs>,
mut debounce: Option<RetryIter<'static>>,
) {
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();

self.async_operations.spawn(async move {
Expand All @@ -1108,7 +1101,7 @@ impl LogsController {
tokio::time::sleep(delay).await;
}

if let Err(err) = metadata_store_client
if let Err(err) = metadata_writer.metadata_store_client()
.put(
BIFROST_CONFIG_KEY.clone(),
logs.deref(),
Expand All @@ -1120,7 +1113,7 @@ impl LogsController {
WriteError::FailedPrecondition(_) => {
debug!("Detected a concurrent modification of logs. Fetching the latest logs now.");
// There was a concurrent modification of the logs. Fetch the latest version.
match metadata_store_client
match metadata_writer.metadata_store_client()
.get::<Logs>(BIFROST_CONFIG_KEY.clone())
.await
{
Expand Down Expand Up @@ -1166,8 +1159,6 @@ impl LogsController {
mut debounce: Option<RetryIter<'static>>,
) {
let bifrost = self.bifrost.clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();

self.async_operations.spawn(
async move {
Expand All @@ -1177,10 +1168,7 @@ impl LogsController {
tokio::time::sleep(delay).await;
}

let bifrost_admin =
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);

match bifrost_admin.seal(log_id, segment_index).await {
match bifrost.admin().seal(log_id, segment_index).await {
Ok(sealed_segment) => {
if sealed_segment.tail.is_sealed() {
Event::SealSucceeded {
Expand Down
Loading

0 comments on commit 52a62f1

Please sign in to comment.