Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add archive RPCs to subxt-rpcs #1940

Merged
merged 5 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 265 additions & 9 deletions rpcs/src/methods/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
Ok(response)
}

/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result.
/// Call the `chainHead_v1_call` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
Expand Down Expand Up @@ -224,20 +224,16 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {

/// Return the genesis hash.
pub async fn chainspec_v1_genesis_hash(&self) -> Result<T::Hash, Error> {
let hash = self
.client
self.client
.request("chainSpec_v1_genesisHash", rpc_params![])
.await?;
Ok(hash)
.await
}

/// Return a string containing the human-readable name of the chain.
pub async fn chainspec_v1_chain_name(&self) -> Result<String, Error> {
let hash = self
.client
self.client
.request("chainSpec_v1_chainName", rpc_params![])
.await?;
Ok(hash)
.await
}

/// Returns the JSON payload found in the chain specification under the key properties.
Expand Down Expand Up @@ -295,6 +291,132 @@ impl<T: RpcConfig> ChainHeadRpcMethods<T> {
.request("transaction_v1_stop", rpc_params![operation_id])
.await
}

/// Fetch the block body (ie the extrinsics in the block) given its hash.
///
/// Returns an array of the hexadecimal-encoded scale-encoded extrinsics found in the block,
/// or `None` if the block wasn't found.
pub async fn archive_unstable_body(
&self,
block_hash: T::Hash,
) -> Result<Option<Vec<Bytes>>, Error> {
self.client
.request("archive_unstable_body", rpc_params![block_hash])
.await
}

/// Call the `archive_unstable_call` method and return the response.
pub async fn archive_unstable_call(
&self,
block_hash: T::Hash,
function: &str,
call_parameters: &[u8],
) -> Result<ArchiveCallResult, Error> {
use serde::de::Error as _;

// We deserialize to this intermediate shape, since
// we can't have a boolean tag to denote variants.
#[derive(Deserialize)]
struct Response {
success: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This success flag is annoying, why do we need it at all?

result/value or error should be sufficient?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah true; I just added it as an extra check to catch any weird responses eg success:false but a value!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't success part of the spec? whats the issue here

Copy link
Contributor

@ryanleecode ryanleecode Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the flag is useful for pattern matching because you can have a union that looks like this

S.Union(
  S.Struct({
    success: S.Literal(true),
    result: HexString,
  }),
  S.Struct({
    success: S.Literal(false),
    error: S.String,
  }),
  S.Null
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep this is quite annoying in rust, IIRC we only left the success flag because it might be easier for js/ts realm interpret the data easier

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aight, I see

value: Option<Bytes>,
error: Option<String>,
// This was accidentally used instead of value in Substrate,
// so to support those impls we try it here if needed:
result: Option<Bytes>,
}

let res: Response = self
.client
.request(
"archive_unstable_call",
rpc_params![block_hash, function, to_hex(call_parameters)],
)
.await?;

let value = res.value.or(res.result);
match (res.success, value, res.error) {
(true, Some(value), _) => Ok(ArchiveCallResult::Success(value)),
(false, _, err) => Ok(ArchiveCallResult::Error(err.unwrap_or(String::new()))),
(true, None, _) => {
let m = "archive_unstable_call: 'success: true' response should have `value: 0x1234` alongside it";
Err(Error::Deserialization(serde_json::Error::custom(m)))
}
}
}

/// Return the finalized block height of the chain.
pub async fn archive_unstable_finalized_height(&self) -> Result<usize, Error> {
self.client
.request("archive_unstable_finalizedHeight", rpc_params![])
.await
}

/// Return the genesis hash.
pub async fn archive_unstable_genesis_hash(&self) -> Result<T::Hash, Error> {
self.client
.request("archive_unstable_genesisHash", rpc_params![])
.await
}

/// Given a block height, return the hashes of the zero or more blocks at that height.
/// For blocks older than the latest finalized block, only one entry will be returned. For blocks
/// newer than the latest finalized block, it's possible to have 0, 1 or multiple blocks at
/// that height given that forks could occur.
pub async fn archive_unstable_hash_by_height(
&self,
height: usize,
) -> Result<Vec<T::Hash>, Error> {
self.client
.request("archive_unstable_hashByHeight", rpc_params![height])
.await
}

/// Fetch the header for a block with the given hash, or `None` if no block with that hash exists.
pub async fn archive_unstable_header(
&self,
block_hash: T::Hash,
) -> Result<Option<T::Header>, Error> {
let maybe_encoded_header: Option<Bytes> = self
.client
.request("archive_unstable_header", rpc_params![block_hash])
.await?;

let Some(encoded_header) = maybe_encoded_header else {
return Ok(None);
};

let header =
<T::Header as codec::Decode>::decode(&mut &*encoded_header.0).map_err(Error::Decode)?;
Ok(Some(header))
}

/// Query the node storage and return a subscription which streams corresponding storage events back.
pub async fn archive_unstable_storage(
&self,
block_hash: T::Hash,
items: impl IntoIterator<Item = StorageQuery<&[u8]>>,
child_key: Option<&[u8]>,
) -> Result<ArchiveStorageSubscription<T::Hash>, Error> {
let items: Vec<StorageQuery<String>> = items
.into_iter()
.map(|item| StorageQuery {
key: to_hex(item.key),
query_type: item.query_type,
})
.collect();

let sub = self
.client
.subscribe(
"archive_unstable_storage",
rpc_params![block_hash, items, child_key.map(to_hex)],
"archive_unstable_stopStorage",
)
.await?;

Ok(ArchiveStorageSubscription { sub, done: false })
}
}

/// This represents events generated by the `follow` method.
Expand Down Expand Up @@ -754,6 +876,140 @@ pub struct TransactionBlockDetails<Hash> {
pub index: u64,
}

/// The response from calling `archive_call`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArchiveCallResult {
/// The bytes returned from successfully making a call
Success(Bytes),
/// An error returned if the call was not successful.
Error(String),
}

impl ArchiveCallResult {
/// Return the bytes on success, or `None` if not an [`ArchiveCallResult::Success`].
pub fn as_success(self) -> Option<Bytes> {
match self {
ArchiveCallResult::Success(bytes) => Some(bytes),
_ => None,
}
}

/// Return the error message on call failure, or `None` if not an [`ArchiveCallResult::Error`].
pub fn as_error(self) -> Option<String> {
match self {
ArchiveCallResult::Success(_) => None,
ArchiveCallResult::Error(e) => Some(e),
}
}
}

/// A subscription which returns follow events, and ends when a Stop event occurs.
pub struct ArchiveStorageSubscription<Hash> {
sub: RpcSubscription<ArchiveStorageEvent<Hash>>,
done: bool,
}

impl<Hash: BlockHash> ArchiveStorageSubscription<Hash> {
/// Fetch the next item in the stream.
pub async fn next(&mut self) -> Option<<Self as Stream>::Item> {
<Self as StreamExt>::next(self).await
}
/// Fetch the subscription ID for the stream.
pub fn subscription_id(&self) -> Option<&str> {
self.sub.subscription_id()
}
}

impl<Hash: BlockHash> Stream for ArchiveStorageSubscription<Hash> {
type Item = <RpcSubscription<ArchiveStorageEvent<Hash>> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

let res = self.sub.poll_next_unpin(cx);

if let Poll::Ready(Some(Ok(ArchiveStorageEvent::Done | ArchiveStorageEvent::Error(..)))) =
&res
{
// No more events will occur after "done" or "error" events.
self.done = true;
}

res
}
}

/// Responses returned from [`ArchiveStorageSubscription`].
#[derive(Debug, Deserialize)]
#[serde(tag = "event")]
pub enum ArchiveStorageEvent<Hash> {
/// A storage response for one of the requested items.
#[serde(rename = "storage")]
Item(ArchiveStorageEventItem<Hash>),
/// A human-readable error indicating what went wrong. No more storage events
/// will be emitted after this.
#[serde(rename = "storageError")]
Error(ArchiveStorageEventError),
/// No more storage events will be emitted after this.
#[serde(rename = "storageDone")]
Done,
}

impl<Hash> ArchiveStorageEvent<Hash> {
/// Return a storage item or `None` if not an [`ArchiveStorageEvent::Item`].
pub fn as_item(self) -> Option<ArchiveStorageEventItem<Hash>> {
match self {
ArchiveStorageEvent::Item(item) => Some(item),
_ => None,
}
}

/// Return a storage error or `None` if not an [`ArchiveStorageEvent::Error`].
pub fn as_error(self) -> Option<ArchiveStorageEventError> {
match self {
ArchiveStorageEvent::Error(e) => Some(e),
_ => None,
}
}

/// Is this an [`ArchiveStorageEvent::Done`].
pub fn is_done(self) -> bool {
matches!(self, ArchiveStorageEvent::Done)
}
}

/// Something went wrong during the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventError {
/// The human readable error message indicating what went wrong.
pub error: String,
}

/// A storage item returned from the [`ChainHeadRpcMethods::archive_unstable_storage()`] subscription.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArchiveStorageEventItem<Hash> {
/// String containing the hexadecimal-encoded key of the storage entry.
pub key: Bytes,
/// String containing the hexadecimal-encoded value of the storage entry.
/// Returned when the request type is [`StorageQueryType::Value`] or [`StorageQueryType::DescendantsValues`].
pub value: Option<Bytes>,
/// String containing the hexadecimal-encoded hash of the storage entry.
/// Returned when the request type is [`StorageQueryType::Hash`] or [`StorageQueryType::DescendantsHashes`].
pub hash: Option<Hash>,
/// String containing the hexadecimal-encoded Merkle value of the closest descendant of key (including branch nodes).
/// Returned when the request type is [`StorageQueryType::ClosestDescendantMerkleValue`].
pub closest_descendant_merkle_value: Option<Bytes>,
/// String containing the hexadecimal-encoded key of the child trie of the "default" namespace if the storage entry
/// is part of a child trie. If the storage entry is part of the main trie, this field is not present.
pub child_trie_key: Option<Bytes>,
}

/// Hex-serialized shim for `Vec<u8>`.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Hash, PartialOrd, Ord, Debug)]
pub struct Bytes(#[serde(with = "impl_serde::serialize")] pub Vec<u8>);
Expand Down
Loading
Loading