Add archive RPCs to subxt-rpcs #1940
Changes from 4 commits
@@ -179,7 +179,7 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
         Ok(response)
     }

-    /// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result.
+    /// Call the `chainHead_v1_call` method and return an operation ID to obtain the runtime API result.
     ///
     /// The response events are provided on the `chainHead_follow` subscription and identified by
     /// the returned operation ID.
@@ -224,20 +224,16 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {

     /// Return the genesis hash.
     pub async fn chainspec_v1_genesis_hash(&self) -> Result<T::Hash, Error> {
-        let hash = self
-            .client
+        self.client
             .request("chainSpec_v1_genesisHash", rpc_params![])
-            .await?;
-        Ok(hash)
+            .await
     }

     /// Return a string containing the human-readable name of the chain.
     pub async fn chainspec_v1_chain_name(&self) -> Result<String, Error> {
-        let hash = self
-            .client
+        self.client
             .request("chainSpec_v1_chainName", rpc_params![])
-            .await?;
-        Ok(hash)
+            .await
     }

     /// Returns the JSON payload found in the chain specification under the key properties.
@@ -295,6 +291,132 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
            .request("transaction_v1_stop", rpc_params![operation_id])
            .await
    }

    /// Fetch the block body (ie the extrinsics in the block) given its hash.
    ///
    /// Returns an array of the hexadecimal-encoded scale-encoded extrinsics found in the block,
    /// or `None` if the block wasn't found.
    pub async fn archive_unstable_body(
        &self,
        block_hash: T::Hash,
    ) -> Result<Option<Vec<Bytes>>, Error> {
        self.client
            .request("archive_unstable_body", rpc_params![block_hash])
            .await
    }

    /// Call the `archive_unstable_call` method and return the response.
    pub async fn archive_unstable_call(
        &self,
        block_hash: T::Hash,
        function: &str,
        call_parameters: &[u8],
    ) -> Result<ArchiveCallResult, Error> {
        use serde::de::Error as _;

        // We deserialize to this intermediate shape, since
        // we can't have a boolean tag to denote variants.
        #[derive(Deserialize)]
        struct Response {
            success: bool,
            value: Option<Bytes>,
            error: Option<String>,
            // This was accidentally used instead of value in Substrate,
            // so to support those impls we try it here if needed:
            result: Option<Bytes>,
        }

        let res: Response = self
            .client
            .request(
                "archive_unstable_call",
                rpc_params![block_hash, function, to_hex(call_parameters)],
            )
            .await?;

        let value = res.value.or(res.result);
        match (res.success, value, res.error) {
            (true, Some(value), _) => Ok(ArchiveCallResult::Success(value)),
            (false, _, err) => Ok(ArchiveCallResult::Error(err.unwrap_or(String::new()))),
            (true, None, _) => {
                let m = "archive_unstable_call: 'success: true' response should have `value: 0x1234` alongside it";
                Err(Error::Deserialization(serde_json::Error::custom(m)))
            }
        }
    }

    /// Return the finalized block height of the chain.
    pub async fn archive_unstable_finalized_height(&self) -> Result<usize, Error> {
        self.client
            .request("archive_unstable_finalizedHeight", rpc_params![])
            .await
    }

    /// Return the genesis hash.
    pub async fn archive_unstable_genesis_hash(&self) -> Result<T::Hash, Error> {
        self.client
            .request("archive_unstable_genesisHash", rpc_params![])
            .await
    }

    /// Given a block height, return the hashes of the zero or more blocks at that height.
    /// For blocks older than the latest finalized block, only one entry will be returned. For blocks
    /// newer than the latest finalized block, it's possible to have 0, 1 or multiple blocks at
    /// that height given that forks could occur.
    pub async fn archive_unstable_hash_by_height(
        &self,
        height: usize,
    ) -> Result<Vec<T::Hash>, Error> {
        self.client
            .request("archive_unstable_hashByHeight", rpc_params![height])
            .await
    }

    /// Fetch the header for a block with the given hash, or `None` if no block with that hash exists.
    pub async fn archive_unstable_header(
        &self,
        block_hash: T::Hash,
    ) -> Result<Option<T::Header>, Error> {
        let maybe_encoded_header: Option<Bytes> = self
            .client
            .request("archive_unstable_header", rpc_params![block_hash])
            .await?;

        let Some(encoded_header) = maybe_encoded_header else {
            return Ok(None);
        };

        let header =
            <T::Header as codec::Decode>::decode(&mut &*encoded_header.0).map_err(Error::Decode)?;
        Ok(Some(header))
    }

    /// Query the node storage and return a subscription which streams corresponding storage events back.
    pub async fn archive_unstable_storage(
        &self,
        block_hash: T::Hash,
        items: impl IntoIterator<Item = StorageQuery<&[u8]>>,
        child_key: Option<&[u8]>,
    ) -> Result<ArchiveStorageSubscription<T::Hash>, Error> {
        let items: Vec<StorageQuery<String>> = items
            .into_iter()
            .map(|item| StorageQuery {
                key: to_hex(item.key),
                query_type: item.query_type,
            })
            .collect();

        let sub = self
            .client
            .subscribe(
                "archive_unstable_storage",
                rpc_params![block_hash, items, child_key.map(to_hex)],
                "archive_unstable_storage_unsub", // TODO unsub method undefined in spec: look it up/add to spec
Review thread on `archive_unstable_storage` being a subscription:

Review comment: How can an archive method have a subscription? Isn't that a chainHead-only thing? Because you're looking into the past, so how can the data change, whereas chainHead is constantly moving forward, so a subscription makes sense there.

Reply: We have discussed this quite a bit in the spec; I'll try to summarize as best as I can remember. Our general approach is that an external party can craft a request that would take the server a considerable time to produce the response. To avoid such cases (DoS), we decided that subscriptions are more appropriate because they can enforce backpressure in an easy way that does not break higher-level tools (like subxt or jsonrpsee). It might be possible to add backpressure by sending chunks of the method response, but then you'd need to implement a custom serializer/deserializer on top of the existing tools, and the amount of change that would require tipped us towards using subscriptions.

Reply: So it's basically a subscription that will only ever emit one value, right? Interesting!

Reply: Yes, technically the subscription could emit only one value, but that's not the intended behavior. The implementation ensures backpressure by sending one storage item at a time (the Substrate implementation does this). Let's take a step back: this API allows querying multiple storage items in a single request, which could generate a massive response and become a DoS risk. To mitigate this, the system enforces backpressure, ensuring that a client cannot send a handful of calls at once that are all computationally expensive for the server. Without a subscription, achieving proper backpressure would require a custom implementation that processes the items only as fast as the client can keep up, streaming them into a single JSON-RPC method response. However, general JSON-RPC libraries like jsonrpsee don't natively support such behavior, and it is tricky to implement, especially over websocket frames. So, while it's possible for a subscription to emit just one value, the intended behavior is to send items one by one via the subscription, maintaining backpressure and preventing overwhelming responses.
            )
            .await?;

        Ok(ArchiveStorageSubscription { sub, done: false })
    }
}
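To make the backpressure discussion above concrete, here is a minimal consumption sketch. It assumes a connected `methods: ChainHeadRpcMethods<T>`, a `block_hash` and a storage `key` already in scope, and an enclosing async function whose error type can absorb the RPC `Error`; those surrounding names are illustrative, not part of this diff.

// Minimal sketch (assumed surrounding scope): query one storage value at a
// historical block and drain the subscription one event at a time.
let mut sub = methods
    .archive_unstable_storage(
        block_hash,
        // A single query for the plain value under `key`; more items could be passed.
        vec![StorageQuery { key: &key[..], query_type: StorageQueryType::Value }],
        None, // not a child-trie query
    )
    .await?;

// Events arrive one by one, so a slow consumer naturally applies backpressure:
// the server only produces further items as the client keeps up reading them.
while let Some(ev) = sub.next().await {
    match ev? {
        ArchiveStorageEvent::Item(item) => {
            println!("key: {:?}, value: {:?}", item.key, item.value);
        }
        ArchiveStorageEvent::Error(e) => {
            eprintln!("storage error: {}", e.error);
            break;
        }
        ArchiveStorageEvent::Done => break,
    }
}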

/// This represents events generated by the `follow` method.
@@ -754,6 +876,140 @@ pub struct TransactionBlockDetails<Hash> {
    pub index: u64,
}
/// The response from calling `archive_call`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArchiveCallResult {
    /// The bytes returned from successfully making a call
    Success(Bytes),
    /// An error returned if the call was not successful.
    Error(String),
}

impl ArchiveCallResult {
    /// Return the bytes on success, or `None` if not an [`ArchiveCallResult::Success`].
    pub fn as_success(self) -> Option<Bytes> {
        match self {
            ArchiveCallResult::Success(bytes) => Some(bytes),
            _ => None,
        }
    }

    /// Return the error message on call failure, or `None` if not an [`ArchiveCallResult::Error`].
    pub fn as_error(self) -> Option<String> {
        match self {
            ArchiveCallResult::Success(_) => None,
            ArchiveCallResult::Error(e) => Some(e),
        }
    }
}
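A brief, hedged usage sketch of how a caller might combine `archive_unstable_call` with `ArchiveCallResult`: `methods` and `block_hash` are assumed to be in scope, and "Core_version" is merely an example of a parameterless runtime API, not something introduced by this PR.

// Sketch only: call a runtime API at a historical block and handle both outcomes.
let res = methods
    .archive_unstable_call(block_hash, "Core_version", &[])
    .await?;

match res {
    // `bytes` is the SCALE-encoded return value of the runtime API call.
    ArchiveCallResult::Success(bytes) => println!("{} bytes returned", bytes.0.len()),
    // The node reports failures as a human-readable string.
    ArchiveCallResult::Error(e) => eprintln!("call failed: {e}"),
}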

/// A subscription which streams archive storage events, and ends after a `Done`
/// or `Error` event occurs.
pub struct ArchiveStorageSubscription<Hash> {
    sub: RpcSubscription<ArchiveStorageEvent<Hash>>,
    done: bool,
}
impl<Hash: BlockHash> ArchiveStorageSubscription<Hash> {
    /// Fetch the next item in the stream.
    pub async fn next(&mut self) -> Option<<Self as Stream>::Item> {
        <Self as StreamExt>::next(self).await
    }

    /// Fetch the subscription ID for the stream.
    pub fn subscription_id(&self) -> Option<&str> {
        self.sub.subscription_id()
    }
}

impl<Hash: BlockHash> Stream for ArchiveStorageSubscription<Hash> {
    type Item = <RpcSubscription<ArchiveStorageEvent<Hash>> as Stream>::Item;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }

        let res = self.sub.poll_next_unpin(cx);

        if let Poll::Ready(Some(Ok(ArchiveStorageEvent::Done | ArchiveStorageEvent::Error(..)))) =
            &res
        {
            // No more events will occur after "done" or "error" events.
            self.done = true;
        }

        res
    }
}

/// Responses returned from [`ArchiveStorageSubscription`].
#[derive(Debug, Deserialize)]
#[serde(tag = "event")]
pub enum ArchiveStorageEvent<Hash> {
    /// A storage response for one of the requested items.
    #[serde(rename = "storage")]
    Item(ArchiveStorageEventItem<Hash>),
    /// A human-readable error indicating what went wrong. No more storage events
    /// will be emitted after this.
    #[serde(rename = "storageError")]
    Error(ArchiveStorageEventError),
    /// No more storage events will be emitted after this.
    #[serde(rename = "storageDone")]
    Done,
}

impl<Hash> ArchiveStorageEvent<Hash> {
    /// Return a storage item or `None` if not an [`ArchiveStorageEvent::Item`].
    pub fn as_item(self) -> Option<ArchiveStorageEventItem<Hash>> {
        match self {
            ArchiveStorageEvent::Item(item) => Some(item),
            _ => None,
        }
    }

    /// Return a storage error or `None` if not an [`ArchiveStorageEvent::Error`].
    pub fn as_error(self) -> Option<ArchiveStorageEventError> {
        match self {
            ArchiveStorageEvent::Error(e) => Some(e),
            _ => None,
        }
    }

    /// Is this an [`ArchiveStorageEvent::Done`].
    pub fn is_done(self) -> bool {
        matches!(self, ArchiveStorageEvent::Done)
    }
}

/// Something went wrong during the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventError {
    /// The human readable error message indicating what went wrong.
    pub error: String,
}

/// A storage item returned from the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventItem<Hash> {
    /// String containing the hexadecimal-encoded key of the storage entry.
    pub key: Bytes,
    /// String containing the hexadecimal-encoded value of the storage entry.
    /// Returned when the request type is [`StorageQueryType::Value`] or [`StorageQueryType::DescendantsValues`].
    pub value: Option<Bytes>,
    /// String containing the hexadecimal-encoded hash of the storage entry.
    /// Returned when the request type is [`StorageQueryType::Hash`] or [`StorageQueryType::DescendantsHashes`].
    pub hash: Option<Hash>,
    /// String containing the hexadecimal-encoded Merkle value of the closest descendant of key (including branch nodes).
    /// Returned when the request type is [`StorageQueryType::ClosestDescendantMerkleValue`].
    pub closest_descendant_merkle_value: Option<Bytes>,
    /// String containing the hexadecimal-encoded key of the child trie of the "default" namespace if the storage entry
    /// is part of a child trie. If the storage entry is part of the main trie, this field is not present.
    pub child_trie_key: Option<Bytes>,
}

/// Hex-serialized shim for `Vec<u8>`.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Hash, PartialOrd, Ord, Debug)]
pub struct Bytes(#[serde(with = "impl_serde::serialize")] pub Vec<u8>);
Review thread on the `success` flag in the `archive_unstable_call` response:

Review comment: This success flag is annoying; why do we need it at all? result/value or error should be sufficient.

Reply: Yeah, true; I just added it as an extra check to catch any weird responses, e.g. success: false but with a value!

Reply: Isn't success part of the spec? What's the issue here?

Reply: The flag is useful for pattern matching, because you can have a union that looks like this.

Reply: Yep, this is quite annoying in Rust. IIRC we only left the success flag in because it might make it easier for the js/ts realm to interpret the data.

Reply: Aight, I see.
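For reference, the alternative suggested at the top of this thread (drop `success` and rely on exactly one of `value`/`error` being present) could be modelled on the Rust side as an untagged serde enum rather than the intermediate `Response` struct used in the diff. This is only a sketch of that hypothetical shape, under the assumption that the spec guaranteed the two fields are mutually exclusive; `CallResponse` is not a type from this PR.

// Hypothetical shape, not part of the PR: discriminate purely on which field
// is present. serde tries each variant in order, and unknown fields (such as a
// stray `success`) are ignored for untagged struct variants.
#[derive(serde::Deserialize)]
#[serde(untagged)]
enum CallResponse {
    // Matches e.g. {"value": "0x1234"}
    Success { value: Bytes },
    // Matches e.g. {"error": "out of gas"}
    Error { error: String },
}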