Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: backpressure chainhead_v1_follow #6058

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
18 changes: 18 additions & 0 deletions prdoc/pr_6058.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: backpressure `chainhead_v1_follow`

doc:
- audience: Node Operator
description: |
The RPC endpoint `chainHead_v1_follow` now relies on backpressure
to determine whether or not the subscription should be closed instead of continuing to send more events
to a consumer which can't keep up.
This should significantly improve memory consumption as substrate will be keeping fewer messages in memory.

crates:
- name: sc-rpc-spec-v2
pkhry marked this conversation as resolved.
Show resolved Hide resolved
bump: patch
- name: sc-rpc
bump: major
87 changes: 32 additions & 55 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ use crate::chain_head::{
};
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
stream::{self, Stream, StreamExt, TryStreamExt},
};
use futures_util::future::Either;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
Expand Down Expand Up @@ -705,71 +704,49 @@ where
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
stream: EventStream,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
{
let mut stream_item = stream.next();

// The stop event can be triggered by the chainHead logic when the pinned
// block guarantee cannot be hold. Or when the client is disconnected.
let connection_closed = sink.closed();
tokio::pin!(connection_closed);
let mut stop_event = futures_util::future::select(rx_stop, connection_closed);

while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
// create a channel to propagate error messages
let mut handle_events = |event| match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
let stream = stream
Copy link
Member

@niklasad1 niklasad1 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit tempting to add something similar to pipe_from_stream that takes a TryStream i.,e that produces Result<T, Error> and bails on the first error similar to how try_for_each works to avoid having this channel to be injected here just to get back the error and then quit....

Nothing that needs to be addressed in this PR but would be neat :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to the pr

.map(|event| handle_events(event))
.map_ok(|items| stream::iter(items).map(Ok))
.try_flatten();

tokio::pin!(stream);

let sink_future = sink
.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(MAX_PINNED_BLOCKS));

for event in events {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
let result = tokio::select! {
_ = rx_stop => Ok(()),
result = sink_future => {
if let Err(ref e) = result {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
"[follow][id={:?}] Failed to handle stream notification {:?}",
&self.sub_id,
e
);

let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
}
};
result
}

stream_item = stream.next();
stop_event = next_stop_event;
}

// If we got here either:
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
};
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
result
}

/// Generate the block events for the `chainHead_follow` method.
Expand Down
52 changes: 51 additions & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use sp_core::{
use sp_runtime::traits::Block as BlockT;
use sp_version::RuntimeVersion;
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -4033,3 +4033,53 @@ async fn follow_event_with_unknown_parent() {
// When importing the block 2, chainHead detects a gap in our blocks and stops.
assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
}

#[tokio::test]
async fn events_are_backpressured() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let client = Arc::new(builder.build());

let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TokioTestExecutor::default()),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
.into_rpc();

let mut parent_hash = client.chain_info().genesis_hash;
let mut header = VecDeque::new();
let mut sub = api.subscribe("chainHead_v1_follow", [false], 511).await.unwrap();

// insert more events than the user can consume
for i in 0..=512 {
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(parent_hash)
.with_parent_block_number(i)
.build()
.unwrap()
.build()
.unwrap()
.block;
header.push_front(block.header().clone());

parent_hash = block.hash();
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
}

let mut events = Vec::new();

while let Some(event) = sub.next::<FollowEvent<String>>().await {
events.push(event);
}

assert_eq!(events.len(), 64);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dq: i'm not that familiar with testing libs, but why does the subscription drop out at 63 instead of 511 here?
the error from the overflowing buffer still happens, though
cc: @niklasad1

Copy link
Member

@niklasad1 niklasad1 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2024-10-15T15:07:45.783080Z TRACE jsonrpsee-server: [Methods::subscribe] Method: chainHead_v1_follow, params: Some(RawValue([false]))
2024-10-15T15:07:45.783324Z DEBUG rpc-spec-v2: [follow][id="8999421366389847"] Subscription accepted
2024-10-15T15:07:45.783524Z  INFO sc_rpc::utils: Starting subscription=chainHead_v1_followEvent
2024-10-15T15:07:45.783642Z TRACE jsonrpsee-server: [Methods::inner_call] Method: chainHead_v1_follow, response: {"jsonrpc":"2.0","id":0,"result":8999421366389847}
2024-10-15T15:07:47.380636Z DEBUG jsonrpsee-server: [Subscription::next]: rx {"jsonrpc":"2.0","method":"chainHead_v1_followEvent","params":{"subscription":8999421366389847,"result":{"event":"initialized","finalizedBlockHashes":["0xb4b48648e4869cffd7333c79458c3db011b9d6556af2a53eb5a186763837a73c"]}}}
2024-10-15T15:07:47.382317Z DEBUG rpc-spec-v2: [follow][id="8999421366389847"] Failed to handle stream notification ExceededLimits
2024-10-15T15:07:47.382353Z DEBUG rpc-spec-v2: [follow][id="8999421366389847"] Failed to handle stream notification SubscriptionAbsent
2024-10-15T15:07:47.383164Z DEBUG rpc-spec-v2: [follow][id="8999421366389847"] Subscription removed

It looks like you are hitting max block pinning limits when you import so many blocks.
Thus, it may be better to just add subscription buffer cap to the ChainHeadConfig to make it easy to change for tests etc and it's a bit extreme to import 512 blocks for a unit test.

assert_matches!(events.pop().unwrap().map(|x| x.0), Ok(FollowEvent::Stop));
}
79 changes: 75 additions & 4 deletions substrate/client/rpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::SubscriptionTaskExecutor;
use futures::{
future::{self, Either, Fuse, FusedFuture},
Future, FutureExt, Stream, StreamExt,
Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
};
use jsonrpsee::{
types::SubscriptionId, DisconnectError, PendingSubscriptionSink, SubscriptionMessage,
Expand Down Expand Up @@ -173,10 +173,10 @@ impl From<SubscriptionSink> for Subscription {
impl Subscription {
/// Feed items to the subscription from the underlying stream
/// with specified buffer strategy.
pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
pub async fn pipe_from_stream<S, T, B>(&self, mut stream: S, mut buf: B)
where
S: Stream<Item = T> + Unpin + Send + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok nice catch :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this was causing the lifetime issue Pavlo mentioned before, nice catch indeed :D

T: Serialize + Send + 'static,
S: Stream<Item = T> + Unpin,
T: Serialize + Send,
B: Buffer<Item = T>,
{
let mut next_fut = Box::pin(Fuse::terminated());
Expand Down Expand Up @@ -238,6 +238,77 @@ impl Subscription {
}
}

/// Feed items to the subscription from the underlying fallible stream
/// with specified buffer strategy.
///
/// Returns `Ok(())` when the stream is exhausted, the subscription buffer
/// overflows, or the subscription is closed / the client disconnects;
/// returns the first `Err(e)` produced by the stream otherwise.
pub async fn pipe_from_try_stream<S, T, B, E>(&self, mut stream: S, mut buf: B) -> Result<(), E>
where
	S: TryStream<Ok = T, Error = E> + Unpin,
	T: Serialize + Send,
	B: Buffer<Item = T>,
{
	// In-flight send to the sink; starts terminated, i.e. "nothing being sent".
	let mut next_fut = Box::pin(Fuse::terminated());
	// Future resolving to the next item (or error) from the stream.
	let mut next_item = stream.try_next();
	let closed = self.0.closed();

	futures::pin_mut!(closed);

	loop {
		// No send in flight: start sending the next buffered item, if any.
		if next_fut.is_terminated() {
			if let Some(v) = buf.pop() {
				let val = self.to_sub_message(&v);
				next_fut.set(async { self.0.send(val).await }.fuse());
			}
		}

		// Race subscription closure against (send completion | next stream item).
		match future::select(closed, future::select(next_fut, next_item)).await {
			// Send operation finished; re-arm with a terminated future so the
			// top of the loop picks the next buffered item.
			Either::Right((Either::Left((_, n)), c)) => {
				next_item = n;
				closed = c;
				next_fut = Box::pin(Fuse::terminated());
			},
			// New item from the stream: buffer it (backpressure boundary).
			Either::Right((Either::Right((Ok(Some(v)), n)), c)) => {
				// Buffer full means the consumer can't keep up: drop the
				// subscription instead of queueing unboundedly.
				if buf.push(v).is_err() {
					log::debug!(
						target: "rpc",
						"Subscription buffer full for subscription={} conn_id={}; dropping subscription",
						self.0.method_name(),
						self.0.connection_id().0
					);
					return Ok(());
				}

				next_fut = n;
				closed = c;
				next_item = stream.try_next();
			},
			// Error occurred while processing the stream.
			//
			// Terminate the stream.
			Either::Right((Either::Right((Err(e), _)), _)) => return Err(e),
			// Stream "finished".
			//
			// Flush the in-flight send and remaining buffered items, then
			// terminate. A failed send means the client went away; that is
			// not an error for the caller.
			Either::Right((Either::Right((Ok(None), pending_fut)), _)) => {
				if !pending_fut.is_terminated() && pending_fut.await.is_err() {
					return Ok(());
				}

				while let Some(v) = buf.pop() {
					if self.send(&v).await.is_err() {
						return Ok(());
					}
				}

				return Ok(());
			},
			// Subscription was closed.
			Either::Left(_) => return Ok(()),
		}
	}
}

/// Send a message on the subscription.
pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
self.0.send(self.to_sub_message(result)).await
Expand Down
Loading