diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs
index 762be6083..fa9146cbb 100644
--- a/crates/bifrost/src/loglet/error.rs
+++ b/crates/bifrost/src/loglet/error.rs
@@ -11,7 +11,7 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use restate_core::ShutdownError;
+use restate_core::{network::NetworkError, ShutdownError};
 use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};
 
 #[derive(Debug, Clone, thiserror::Error)]
@@ -68,3 +68,13 @@ impl From for AppendError {
         }
     }
 }
+
+impl From<NetworkError> for OperationError {
+    fn from(value: NetworkError) -> Self {
+        match value {
+            NetworkError::Shutdown(err) => OperationError::Shutdown(err),
+            // todo(azmy): are all network errors retryable?
+            _ => OperationError::retryable(value),
+        }
+    }
+}
diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs
index e64c81005..25bc99234 100644
--- a/crates/bifrost/src/loglet/mod.rs
+++ b/crates/bifrost/src/loglet/mod.rs
@@ -168,6 +168,10 @@ impl LogletCommitResolver {
     pub fn offset(self, offset: LogletOffset) {
         let _ = self.tx.send(Ok(offset));
     }
+
+    pub fn error(self, err: AppendError) {
+        let _ = self.tx.send(Err(err));
+    }
 }
 
 pub struct LogletCommit {
diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs
index 686375767..44e021741 100644
--- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -32,6 +32,7 @@ use crate::providers::replicated_loglet::tasks::{FindTailTask, SealTask};
 use super::error::ReplicatedLogletError;
 use super::log_server_manager::RemoteLogServerManager;
 use super::record_cache::RecordCache;
+use super::remote_sequencer::RemoteSequencer;
 use super::rpc_routers::{LogServersRpc, SequencersRpc};
 use super::tasks::FindTailResult;
 
@@ -97,7 +98,14 @@ impl ReplicatedLoglet {
             }
         } else {
             SequencerAccess::Remote {
-                sequencers_rpc: sequencers_rpc.clone(),
+                handle: RemoteSequencer::new(
+                    log_id,
+                    segment_index,
+                    my_params.clone(),
+                    networking.clone(),
+                    known_global_tail.clone(),
+                    sequencers_rpc.clone(),
+                ),
             }
         };
         Ok(Self {
@@ -118,7 +126,7 @@ impl ReplicatedLoglet {
 pub enum SequencerAccess<T> {
     /// The sequencer is remote (or retired/preempted)
     #[debug("Remote")]
-    Remote { sequencers_rpc: SequencersRpc },
+    Remote { handle: RemoteSequencer<T> },
     /// We are the loglet leaders
     #[debug("Local")]
     Local { handle: Sequencer<T> },
@@ -145,9 +153,7 @@ impl Loglet for ReplicatedLoglet {
     async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
         match self.sequencer {
             SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
-            SequencerAccess::Remote { .. } => {
-                todo!("Access to remote sequencers is not implemented yet")
-            }
+            SequencerAccess::Remote { ref handle } => handle.append(payloads).await,
         }
     }
diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs
index 41fbbfa61..b7b380174 100644
--- a/crates/bifrost/src/providers/replicated_loglet/mod.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs
@@ -16,6 +16,8 @@ mod network;
 mod provider;
 #[allow(dead_code)]
 mod record_cache;
+#[allow(dead_code)]
+mod remote_sequencer;
 pub mod replication;
 mod rpc_routers;
 #[allow(dead_code)]
diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
new file mode 100644
index 000000000..bf96571cd
--- /dev/null
+++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
@@ -0,0 +1,604 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{
+    ops::Deref,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
+
+use restate_core::{
+    network::{
+        rpc_router::{RpcRouter, RpcToken},
+        NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection,
+    },
+    task_center, ShutdownError, TaskKind,
+};
+use restate_types::{
+    config::Configuration,
+    logs::{metadata::SegmentIndex, LogId, LogletOffset, Record},
+    net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
+    replicated_loglet::ReplicatedLogletParams,
+    GenerationalNodeId,
+};
+
+use super::rpc_routers::SequencersRpc;
+use crate::loglet::{
+    util::TailOffsetWatch, AppendError, LogletCommit, LogletCommitResolver, OperationError,
+};
+
+pub struct RemoteSequencer<T> {
+    log_id: LogId,
+    segment_index: SegmentIndex,
+    params: ReplicatedLogletParams,
+    networking: Networking<T>,
+    max_inflight_records_in_config: AtomicUsize,
+    record_permits: Arc<Semaphore>,
+    sequencers_rpc: SequencersRpc,
+    known_global_tail: TailOffsetWatch,
+    connection: Arc<Mutex<Option<RemoteSequencerConnection>>>,
+}
+
+impl<T> RemoteSequencer<T>
+where
+    T: TransportConnect,
+{
+    /// Creates a new instance of RemoteSequencer
+    pub fn new(
+        log_id: LogId,
+        segment_index: SegmentIndex,
+        params: ReplicatedLogletParams,
+        networking: Networking<T>,
+        known_global_tail: TailOffsetWatch,
+        sequencers_rpc: SequencersRpc,
+    ) -> Self {
+        let max_inflight_records_in_config: usize = Configuration::pinned()
+            .bifrost
+            .replicated_loglet
+            .maximum_inflight_records
+            .into();
+
+        let record_permits = Arc::new(Semaphore::new(max_inflight_records_in_config));
+
+        Self {
+            log_id,
+            segment_index,
+            params,
+            networking,
+            max_inflight_records_in_config: AtomicUsize::new(max_inflight_records_in_config),
+            record_permits,
+            sequencers_rpc,
+            known_global_tail,
+            connection: Arc::default(),
+        }
+    }
+
+    pub fn ensure_enough_permits(&self, required: usize) {
+        let mut available = self.max_inflight_records_in_config.load(Ordering::Relaxed);
+        while available < required {
+            let delta = required - available;
+            match self.max_inflight_records_in_config.compare_exchange(
+                available,
+                required,
+                Ordering::Release,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => {
+                    self.record_permits.add_permits(delta);
+                }
+                Err(current) => {
+                    available = current;
+                }
+            }
+        }
+    }
+
+    pub async fn append(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
+        if self.known_global_tail.is_sealed() {
+            return Ok(LogletCommit::sealed());
+        }
+
+        self.ensure_enough_permits(payloads.len());
+
+        let len = u32::try_from(payloads.len()).expect("batch sizes fit in u32");
+
+        let permits = self
+            .record_permits
+            .clone()
+            .acquire_many_owned(len)
+            .await
+            .unwrap();
+
+        let mut connection = self.get_connection().await?;
+
+        let mut msg = Append {
+            header: CommonRequestHeader {
+                log_id: self.log_id,
+                loglet_id: self.params.loglet_id,
+                segment_index: self.segment_index,
+            },
+            payloads,
+        };
+
+        let rpc_token = loop {
+            match connection
+                .send(&self.sequencers_rpc.append, self.params.sequencer, msg)
+                .await
+            {
+                Ok(token) => break token,
+                Err(err) => {
+                    match err.source {
+                        NetworkError::ConnectError(_)
+                        | NetworkError::ConnectionClosed
+                        | NetworkError::Timeout(_) => {
+                            // we retry to re-connect one time
+                            connection = self.renew_connection(connection).await?;
+
+                            msg = err.original;
+                            continue;
+                        }
+                        err => return Err(err.into()),
+                    }
+                }
+            };
+        };
+        let (commit_token, commit_resolver) = LogletCommit::deferred();
+
+        connection.resolve_on_appended(permits, rpc_token, commit_resolver);
+
+        Ok(commit_token)
+    }
+
+    /// Gets or starts a new remote sequencer connection
+    async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
+        let mut guard = self.connection.lock().await;
+        if let Some(connection) = guard.deref() {
+            return Ok(connection.clone());
+        }
+
+        let connection = self
+            .networking
+            .node_connection(self.params.sequencer.into())
+            .await?;
+        let connection =
+            RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;
+
+        *guard = Some(connection.clone());
+
+        Ok(connection)
+    }
+
+    /// Renew a connection to a remote sequencer. This guarantees that only a single connection
+    /// to the sequencer is available.
+    async fn renew_connection(
+        &self,
+        old: RemoteSequencerConnection,
+    ) -> Result<RemoteSequencerConnection, NetworkError> {
+        let mut guard = self.connection.lock().await;
+        let current = guard.as_ref().expect("connection has been initialized");
+
+        // stream has already been renewed
+        if old.inner != current.inner {
+            return Ok(current.clone());
+        }
+
+        let connection = self
+            .networking
+            .node_connection(self.params.sequencer.into())
+            .await?;
+
+        let connection =
+            RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;
+
+        *guard = Some(connection.clone());
+
+        Ok(connection)
+    }
+}
+
+/// RemoteSequencerConnection represents a single open connection
+/// to a remote leader sequencer.
+///
+/// This connection handles all [`Appended`] responses from the remote
+/// sequencer.
+///
+/// If the connection was lost or if any of the commits failed
+/// with a terminal error (like [`SequencerStatus::Sealed`]) all pending commits
+/// are resolved with an error.
+#[derive(Clone)]
+struct RemoteSequencerConnection {
+    inner: WeakConnection,
+    tx: mpsc::UnboundedSender<RemoteInflightAppend>,
+}
+
+impl RemoteSequencerConnection {
+    fn start(
+        known_global_tail: TailOffsetWatch,
+        connection: WeakConnection,
+    ) -> Result<Self, ShutdownError> {
+        let (tx, rx) = mpsc::unbounded_channel();
+
+        task_center().spawn(
+            TaskKind::Disposable,
+            "remote-sequencer-connection",
+            None,
+            Self::handle_appended_responses(known_global_tail, connection.clone(), rx),
+        )?;
+
+        Ok(Self {
+            inner: connection,
+            tx,
+        })
+    }
+
+    /// Send append message to remote sequencer.
+    ///
+    /// It's up to the caller to retry on [`NetworkError`]
+    pub async fn send(
+        &self,
+        rpc_router: &RpcRouter<Append>,
+        sequencer: GenerationalNodeId,
+        msg: Append,
+    ) -> Result<RpcToken<Appended>, NetworkSendError<Append>> {
+        let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone());
+
+        rpc_router
+            .send_on_connection(outgoing)
+            .await
+            .map_err(|err| NetworkSendError::new(err.original.into_body(), err.source))
+    }
+
+    pub fn resolve_on_appended(
+        &self,
+        permit: OwnedSemaphorePermit,
+        rpc_token: RpcToken<Appended>,
+        commit_resolver: LogletCommitResolver,
+    ) {
+        let inflight_append = RemoteInflightAppend {
+            rpc_token,
+            commit_resolver,
+            permit,
+        };
+
+        if let Err(err) = self.tx.send(inflight_append) {
+            // if we failed to push this to be processed by the connection reactor task
+            // then we need to notify the caller
+            err.0
+                .commit_resolver
+                .error(AppendError::retryable(NetworkError::ConnectionClosed));
+        }
+    }
+
+    /// Handle all [`Appended`] responses
+    ///
+    /// This task will run until the [`RemoteSequencerConnection`] is dropped. Once dropped,
+    /// all pending commits will be resolved with an error. It's up to the enqueuer
+    /// to retry if needed.
+    async fn handle_appended_responses(
+        known_global_tail: TailOffsetWatch,
+        connection: WeakConnection,
+        mut rx: mpsc::UnboundedReceiver<RemoteInflightAppend>,
+    ) -> anyhow::Result<()> {
+        let mut closed = std::pin::pin!(connection.closed());
+
+        // handle all rpc tokens in an infinite loop
+        // this loop only breaks when it encounters a terminal
+        // AppendError.
+        // When this happens, the receiver channel is closed
+        // and drained. The same error is then used to resolve
+        // all pending tokens
+        let err = loop {
+            let inflight = tokio::select! {
+                inflight = rx.recv() => {
+                    inflight
+                }
+                _ = &mut closed => {
+                    break AppendError::retryable(NetworkError::ConnectionClosed);
+                }
+            };
+
+            let Some(inflight) = inflight else {
+                // connection was dropped.
+                break AppendError::retryable(NetworkError::ConnectionClosed);
+            };
+
+            let RemoteInflightAppend {
+                rpc_token,
+                commit_resolver,
+                permit: _permit,
+            } = inflight;
+
+            let appended = tokio::select! {
+                incoming = rpc_token.recv() => {
+                    incoming.map_err(AppendError::Shutdown)
+                },
+                _ = &mut closed => {
+                    Err(AppendError::retryable(NetworkError::ConnectionClosed))
+                }
+            };
+
+            let appended = match appended {
+                Ok(appended) => appended.into_body(),
+                Err(err) => {
+                    // this can only be a terminal error (either shutdown or connection is closing)
+                    commit_resolver.error(err.clone());
+                    break err;
+                }
+            };
+
+            // good chance to update known global tail
+            if let Some(offset) = appended.known_global_tail {
+                known_global_tail.notify_offset_update(offset);
+            }
+
+            // handle status of the response.
+            match appended.status {
+                SequencerStatus::Ok => {
+                    commit_resolver.offset(appended.first_offset);
+                }
+                SequencerStatus::Malformed => {
+                    // While the malformed status is non-terminal for the connection
+                    // (since only one request is malformed),
+                    // the AppendError for the caller is terminal
+                    commit_resolver.error(AppendError::terminal(Malformed));
+                }
+                SequencerStatus::Sealed => {
+                    // A sealed status returns a terminal error since we can immediately cancel
+                    // all inflight append jobs.
+                    commit_resolver.sealed();
+                    break AppendError::Sealed;
+                }
+            }
+        };
+
+        // close channel to stop any further append calls on the same connection
+        rx.close();
+
+        // Drain and resolve ALL pending appends on this connection.
+        //
+        // todo(azmy): The order of the RemoteInflightAppend's on the channel
+        // does not necessarily match the actual append calls. This is
+        // because sending on the connection and pushing onto the rx channel is not an atomic
+        // operation. This means that, while we are draining
+        // the pending requests here, we can also end up cancelling some inflight appends
+        // that have already received a positive response from the sequencer.
+        //
+        // For now this should not be a problem since they can (possibly) retry
+        // to do the write again later.
+        while let Some(inflight) = rx.recv().await {
+            inflight.commit_resolver.error(err.clone());
+        }
+
+        Ok(())
+    }
+}
+
+struct MaybeTerminalAppendError {
+    pub original: AppendError,
+    pub terminal: bool,
+}
+
+impl MaybeTerminalAppendError {
+    fn terminal(error: AppendError) -> Self {
+        Self {
+            original: error,
+            terminal: true,
+        }
+    }
+
+    fn non_terminal(error: AppendError) -> Self {
+        Self {
+            original: error,
+            terminal: false,
+        }
+    }
+}
+
+//todo: inflight remote append
+pub(crate) struct RemoteInflightAppend {
+    rpc_token: RpcToken<Appended>,
+    commit_resolver: LogletCommitResolver,
+    permit: OwnedSemaphorePermit,
+}
+
+#[derive(thiserror::Error, Debug, Clone)]
+#[error("Malformed request")]
+pub struct Malformed;
+
+#[cfg(test)]
+mod test {
+    use std::{
+        future::Future,
+        sync::{
+            atomic::{AtomicU32, Ordering},
+            Arc,
+        },
+        time::Duration,
+    };
+
+    use rand::Rng;
+
+    use restate_core::{
+        network::{Incoming, MessageHandler, MockConnector},
+        TaskCenterBuilder, TestCoreEnv, TestCoreEnvBuilder,
+    };
+    use restate_types::{
+        logs::{LogId, LogletOffset, Record, SequenceNumber, TailState},
+        net::replicated_loglet::{Append, Appended, CommonResponseHeader, SequencerStatus},
+        replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty},
+        GenerationalNodeId,
+    };
+
+    use super::RemoteSequencer;
+    use crate::{
+        loglet::{util::TailOffsetWatch, AppendError},
+        providers::replicated_loglet::rpc_routers::SequencersRpc,
+    };
+
+    struct SequencerMockHandler {
+        offset: AtomicU32,
+        reply_status: SequencerStatus,
+    }
+
+    impl SequencerMockHandler {
+        fn with_reply_status(reply_status: SequencerStatus) -> Self {
+            Self {
+                reply_status,
+                ..Default::default()
+            }
+        }
+    }
+
+    impl Default for SequencerMockHandler {
+        fn default() -> Self {
+            Self {
+                offset: AtomicU32::new(LogletOffset::OLDEST.into()),
+                reply_status: SequencerStatus::Ok,
+            }
+        }
+    }
+
+    impl MessageHandler for SequencerMockHandler {
+        type MessageType = Append;
+        async fn on_message(&self, msg: Incoming<Append>) {
+            let first_offset = self
+                .offset
+                .fetch_add(msg.body().payloads.len() as u32, Ordering::Relaxed);
+
+            let outgoing = msg.into_outgoing(Appended {
+                first_offset: LogletOffset::from(first_offset),
+                header: CommonResponseHeader {
+                    known_global_tail: None,
+                    sealed: Some(false),
+                    status: self.reply_status,
+                },
+            });
+            let delay = rand::thread_rng().gen_range(50..350);
+            tokio::time::sleep(Duration::from_millis(delay)).await;
+            outgoing.send().await.unwrap();
+        }
+    }
+
+    struct TestEnv {
+        pub core_env: TestCoreEnv<MockConnector>,
+        pub remote_sequencer: RemoteSequencer<MockConnector>,
+    }
+
+    async fn setup<F, O>(sequencer: SequencerMockHandler, test: F)
+    where
+        O: Future<Output = ()>,
+        F: FnOnce(TestEnv) -> O,
+    {
+        let (connector, _receiver) = MockConnector::new(100);
+        let tc = TaskCenterBuilder::default()
+            .default_runtime_handle(tokio::runtime::Handle::current())
+            .ingress_runtime_handle(tokio::runtime::Handle::current())
+            .build()
+            .expect("task_center builds");
+        let connector = Arc::new(connector);
+
+        let mut builder =
+            TestCoreEnvBuilder::with_transport_connector(tc.clone(), Arc::clone(&connector))
+                .add_mock_nodes_config()
+                .add_message_handler(sequencer);
+
+        let sequencer_rpc = SequencersRpc::new(&mut builder.router_builder);
+
+        let params = ReplicatedLogletParams {
+            loglet_id: 1.into(),
+            nodeset: NodeSet::empty(),
+            replication: ReplicationProperty::new(1.try_into().unwrap()),
+            sequencer: GenerationalNodeId::new(1, 1),
+            write_set: None,
+        };
+        let known_global_tail = TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST));
+        let remote_sequencer = RemoteSequencer::new(
+            LogId::new(1),
+            1.into(),
+            params,
+            builder.networking.clone(),
+            known_global_tail,
+            sequencer_rpc,
+        );
+
+        let core_env = builder.build().await;
+        core_env
+            .tc
+            .clone()
+            .run_in_scope("test", None, async {
+                let env = TestEnv {
+                    core_env,
+                    remote_sequencer,
+                };
+                test(env).await;
+            })
+            .await;
+    }
+
+    #[tokio::test]
+    async fn test_remote_stream_ok() {
+        let handler = SequencerMockHandler::default();
+
+        setup(handler, |test_env| async move {
+            let records: Vec<Record> =
+                vec!["record 1".into(), "record 2".into(), "record 3".into()];
+
+            let commit_1 = test_env
+                .remote_sequencer
+                .append(records.clone().into())
+                .await
+                .unwrap();
+
+            let commit_2 = test_env
+                .remote_sequencer
+                .append(records.clone().into())
+                .await
+                .unwrap();
+
+            let first_offset_1 = commit_1.await.unwrap();
+            assert_eq!(first_offset_1, 1.into());
+            let first_offset_2 = commit_2.await.unwrap();
+            assert_eq!(first_offset_2, 4.into());
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_remote_stream_sealed() {
+        let handler = SequencerMockHandler::with_reply_status(SequencerStatus::Sealed);
+
+        setup(handler, |test_env| async move {
+            let records: Vec<Record> =
+                vec!["record 1".into(), "record 2".into(), "record 3".into()];
+
+            let commit_1 = test_env
+                .remote_sequencer
+                .append(records.clone().into())
+                .await
+                .unwrap();
+
+            let commit_2 = test_env
+                .remote_sequencer
+                .append(records.clone().into())
+                .await
+                .unwrap();
+
+            let first_offset_1 = commit_1.await;
+            assert!(matches!(first_offset_1, Err(AppendError::Sealed)));
+            let first_offset_2 = commit_2.await;
+            assert!(matches!(first_offset_2, Err(AppendError::Sealed)));
+        })
+        .await;
+    }
+}
diff --git a/crates/core/src/network/rpc_router.rs b/crates/core/src/network/rpc_router.rs
index c7c7250a2..a3a254e93 100644
--- a/crates/core/src/network/rpc_router.rs
+++ b/crates/core/src/network/rpc_router.rs
@@ -100,6 +100,19 @@ where
             .map_err(|_| RpcError::Shutdown(ShutdownError))
     }
 
+    pub async fn send_on_connection(
+        &self,
+        outgoing: Outgoing<T>,
+    ) -> Result<RpcToken<T::ResponseMessage>, NetworkSendError<Outgoing<T>>> {
+        let token = self
+            .response_tracker
+            .register(&outgoing)
+            .expect("msg-id is registered once");
+
+        outgoing.send().await?;
+        Ok(token)
+    }
+
     pub fn num_in_flight(&self) -> usize {
         self.response_tracker.num_in_flight()
     }
diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml
index 29a6165ef..3468b63e9 100644
--- a/crates/types/Cargo.toml
+++ b/crates/types/Cargo.toml
@@ -49,7 +49,7 @@ prost-types = { workspace = true }
 rand = { workspace = true }
 regress = { workspace = true }
 schemars = { workspace = true, optional = true }
-serde = { workspace = true }
+serde = { workspace = true, features = ["rc"] }
 serde_json = { workspace = true }
 serde_path_to_error = { version = "0.1" }
 serde_with = { workspace = true }
diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs
index 3006ec336..85a3bcea1 100644
--- a/crates/types/src/net/replicated_loglet.rs
+++ b/crates/types/src/net/replicated_loglet.rs
@@ -11,10 +11,10 @@
 //! Defines messages between replicated loglet instances
 
 use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
 
 use serde::{Deserialize, Serialize};
 
-use super::log_server::Status;
 use super::TargetName;
 use crate::logs::metadata::SegmentIndex;
 use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState};
@@ -29,6 +29,20 @@ define_rpc! {
     @response_target = TargetName::ReplicatedLogletAppended,
 }
 
+/// Status of sequencer response.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum SequencerStatus {
+    /// Ok is returned when the request is accepted and processed
+    /// successfully, hence the response body is valid.
+    Ok,
+    /// Sealed is returned when the sequencer cannot accept more
+    /// [`Append`] requests because it's sealed.
+    Sealed,
+    /// Malformed means the request was not accepted due to a bad request
+    /// body or invalid data.
+    Malformed,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CommonRequestHeader {
     /// This is used only to locate the loglet params if this operation activates
@@ -43,7 +57,7 @@ pub struct CommonRequestHeader {
 pub struct CommonResponseHeader {
     pub known_global_tail: Option<LogletOffset>,
     pub sealed: Option<bool>,
-    pub status: Status,
+    pub status: SequencerStatus,
 }
 
 impl CommonResponseHeader {
@@ -51,7 +65,7 @@
         Self {
             known_global_tail: tail_state.map(|t| t.offset()),
             sealed: tail_state.map(|t| t.is_sealed()),
-            status: Status::Ok,
+            status: SequencerStatus::Ok,
         }
     }
 
@@ -59,7 +73,7 @@
         Self {
            known_global_tail: None,
            sealed: None,
-           status: Status::Disabled,
+           status: SequencerStatus::Ok,
        }
    }
 }
@@ -69,7 +83,7 @@
 pub struct Append {
     #[serde(flatten)]
     pub header: CommonRequestHeader,
-    pub payloads: Vec<Record>,
+    pub payloads: Arc<[Record]>,
 }
 
 impl Append {
@@ -86,7 +100,7 @@ pub struct Appended {
     #[serde(flatten)]
     pub header: CommonResponseHeader,
     // INVALID if Status indicates that the append failed
-    first_offset: LogletOffset,
+    pub first_offset: LogletOffset,
 }
 
 impl Deref for Appended {
@@ -118,7 +132,7 @@
         }
     }
 
-    pub fn with_status(mut self, status: Status) -> Self {
+    pub fn with_status(mut self, status: SequencerStatus) -> Self {
         self.header.status = status;
         self
     }
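
For illustration only (not part of the diff): a minimal sketch of how a caller could drive the remote append path introduced above. It mirrors the flow of the tests in remote_sequencer.rs; the function name, error handling, and import paths are assumptions rather than code from this PR.

    use std::sync::Arc;

    use restate_core::network::TransportConnect;
    use restate_types::logs::{LogletOffset, Record};

    use crate::loglet::AppendError;
    // Assumed path: `remote_sequencer` is a private module of the replicated
    // loglet provider in this PR, so this helper would live inside the crate.
    use crate::providers::replicated_loglet::remote_sequencer::RemoteSequencer;

    // Hypothetical helper: append one batch through a remote sequencer and
    // wait for the commit to resolve.
    async fn remote_append_example<T: TransportConnect>(
        sequencer: &RemoteSequencer<T>,
    ) -> anyhow::Result<LogletOffset> {
        // Build a small batch; `Record: From<&str>` is used the same way as in the tests.
        let records: Vec<Record> = vec!["record 1".into(), "record 2".into()];
        let payloads: Arc<[Record]> = records.into();

        // append() acquires in-flight permits, reuses (or re-establishes) the cached
        // connection to the sequencer node, and returns a LogletCommit immediately.
        let commit = sequencer.append(payloads).await?;

        // Awaiting the commit resolves to the first offset assigned by the remote
        // sequencer, or to an AppendError such as AppendError::Sealed.
        match commit.await {
            Ok(first_offset) => Ok(first_offset),
            Err(AppendError::Sealed) => anyhow::bail!("loglet is sealed"),
            Err(err) => Err(err.into()),
        }
    }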