Skip to content

Commit

Permalink
feat(net): implement program activation through Remoting
Browse files Browse the repository at this point in the history
  • Loading branch information
DennisInSky committed Oct 30, 2024
1 parent e300ff8 commit 6c11b8e
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 93 deletions.
1 change: 1 addition & 0 deletions net/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PackageVersion Include="Roslynator.Analyzers" Version="4.12.9" />
<PackageVersion Include="Roslynator.Formatting.Analyzers" Version="4.12.9" />
<PackageVersion Include="Substrate.NET.API" Version="0.9.24-rc6" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>

Expand Down
30 changes: 30 additions & 0 deletions net/src/Sails.Remoting.Abstractions/ActivationResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Substrate.Gear.Api.Generated.Model.gprimitives;

namespace Sails.Remoting.Abstractions;

/// <summary>
/// Represents result returned from the <see cref="IRemoting.ActivateAsync" /> method.
/// </summary>
public abstract class ActivationResult : IAsyncDisposable
{
public async ValueTask DisposeAsync()
{
await this.DisposeCoreAsync().ConfigureAwait(false);

GC.SuppressFinalize(this);
}

/// <summary>
/// Reads ProgramId of the activated program and SCALE-encoded reply
/// from its activation method.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract Task<(ActorId ProgramId, byte[] EncodedPayload)> ReadReplyAsync(CancellationToken cancellationToken);

protected virtual ValueTask DisposeCoreAsync()
=> new();
}
4 changes: 2 additions & 2 deletions net/src/Sails.Remoting.Abstractions/IRemoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public interface IRemoting
/// <param name="gasLimit"></param>
/// <param name="value"></param>
/// <param name="cancellationToken"></param>
/// <returns>A task for obtaining activated program ID and SCALE-encoded reply.</returns>
Task<Task<(ActorId ProgramId, byte[] EncodedReply)>> ActivateAsync(
/// <returns></returns>
Task<ActivationResult> ActivateAsync(
CodeId codeId,
IReadOnlyCollection<byte> salt,
IReadOnlyCollection<byte> encodedPayload,
Expand Down
2 changes: 1 addition & 1 deletion net/src/Sails.Remoting.Abstractions/IRemotingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Sails.Remoting.Abstractions;
public static class IRemotingExtensions
{
/// <inheritdoc cref="IRemoting.ActivateAsync(CodeId, IReadOnlyCollection{byte}, IReadOnlyCollection{byte}, GasUnit?, ValueUnit, CancellationToken)"/>
public static Task<Task<(ActorId ProgramId, byte[] EncodedReply)>> ActivateAsync(
public static Task<ActivationResult> ActivateAsync(
this IRemoting remoting,
CodeId codeId,
IReadOnlyCollection<byte> salt,
Expand Down
132 changes: 132 additions & 0 deletions net/src/Sails.Remoting/ActivationResultViaNodeClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Sails.Remoting.Abstractions;
using StreamJsonRpc;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.frame_system;
using Substrate.Gear.Api.Generated.Model.gear_core.message.user;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.Gear.Api.Generated.Model.vara_runtime;
using Substrate.Gear.Client;
using Substrate.Gear.Client.Model.Types.Base;
using Substrate.NetApi.Model.Types.Primitive;
using EnumGearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.EnumEvent;
using ExtrinsicInfo = Substrate.Gear.Client.ExtrinsicInfo;
using GearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.Event;
using MessageQueuedEventData = Substrate.NetApi.Model.Types.Base.BaseTuple<
Substrate.Gear.Api.Generated.Model.gprimitives.MessageId,
Substrate.Gear.Api.Generated.Model.sp_core.crypto.AccountId32,
Substrate.Gear.Api.Generated.Model.gprimitives.ActorId,
Substrate.Gear.Api.Generated.Model.gear_common.@event.EnumMessageEntry>;
using UserMessageSentEventData = Substrate.NetApi.Model.Types.Base.BaseTuple<
Substrate.Gear.Api.Generated.Model.gear_core.message.user.UserMessage,
Substrate.NetApi.Model.Types.Base.BaseOpt<Substrate.NetApi.Model.Types.Primitive.U32>>;

namespace Sails.Remoting;

internal sealed class ActivationResultViaNodeClient : ActivationResult
{
public static async Task<ActivationResultViaNodeClient> FromExecutionAsync(
SubstrateClientExt nodeClient,
Func<SubstrateClientExt, Task<ExtrinsicInfo>> executeExtrinsic,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
EnsureArg.IsNotNull(executeExtrinsic, nameof(executeExtrinsic));

var blocksStream = await nodeClient.GetAllBlocksStreamAsync(cancellationToken).ConfigureAwait(false);
try
{
var extrinsicInfo = await executeExtrinsic(nodeClient).ConfigureAwait(false);

var extrinsicBlockEvents = await nodeClient.ListBlockEventsAsync(
extrinsicInfo.BlockHash,
cancellationToken)
.ConfigureAwait(false);

// TODO: Requires checking for System.ExtrinsicFailed event and throwing an exception with
// details from it. (type + details)

var messageQueuedEventData = extrinsicBlockEvents
.Where(
eventRecord =>
eventRecord.Phase.Matches(
Phase.ApplyExtrinsic,
(U32 extrinsicIdxInBlock) => extrinsicIdxInBlock.Value == extrinsicInfo.IndexInBlock))
.Select(
eventRecord => eventRecord.Event)
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent)
.SelectIfMatches(
GearEvent.MessageQueued,
(MessageQueuedEventData data) => data)
.SingleOrDefault()
?? throw new Exception("TODO: Custom exception. Something terrible happened.");

var result = new ActivationResultViaNodeClient(nodeClient, blocksStream, messageQueuedEventData);
blocksStream = null;
return result;
}
finally
{
if (blocksStream is not null)
{
await blocksStream.DisposeAsync().ConfigureAwait(false);
}
}
}

private ActivationResultViaNodeClient(
SubstrateClientExt nodeClient,
BlocksStream blocksStream,
MessageQueuedEventData messageQueuedEventData)
{
this.nodeClient = nodeClient;
this.blocksStream = blocksStream;
this.messageQueuedEventData = messageQueuedEventData;
}

private readonly SubstrateClientExt nodeClient;
private readonly BlocksStream blocksStream;
private readonly MessageQueuedEventData messageQueuedEventData;

protected override ValueTask DisposeCoreAsync()
=> this.blocksStream.DisposeAsync();

public override async Task<(ActorId ProgramId, byte[] EncodedPayload)> ReadReplyAsync(CancellationToken cancellationToken)
{
var queuedMessageId = (MessageId)this.messageQueuedEventData.Value[0];
var activatedProgramId = (ActorId)this.messageQueuedEventData.Value[2];

var replyMessage = await this.blocksStream.ReadAllHeadersAsync(cancellationToken)
.SelectAwait(
async blockHeader =>
await this.nodeClient.ListBlockEventsAsync(blockHeader.Number, cancellationToken) // TODO: It is weird block header doesn't contain hash.
.ConfigureAwait(false))
.SelectMany(
eventRecords => eventRecords.AsAsyncEnumerable())
.Select(
eventRecord => eventRecord.Event)
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent)
.SelectIfMatches(
GearEvent.UserMessageSent,
(UserMessageSentEventData data) => (UserMessage)data.Value[0])
.FirstAsync(
userMessage => userMessage.Details.OptionFlag
&& userMessage.Details.Value.To.IsEqualTo(queuedMessageId),
cancellationToken)
.ConfigureAwait(false);

var replyPayload = replyMessage.Payload.Value.Value
.Select(@byte => @byte.Value)
.ToArray();

return (activatedProgramId, replyPayload);
}
}
60 changes: 10 additions & 50 deletions net/src/Sails.Remoting/RemotingViaNodeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,14 @@
using EnsureThat;
using Sails.Remoting.Abstractions;
using Substrate.Gear.Api.Generated;
using Substrate.Gear.Api.Generated.Model.frame_system;
using Substrate.Gear.Api.Generated.Model.gprimitives;
using Substrate.Gear.Api.Generated.Model.vara_runtime;
using Substrate.Gear.Api.Generated.Storage;
using Substrate.Gear.Client;
using Substrate.Gear.Client.Model.Types;
using Substrate.Gear.Client.Model.Types.Base;
using Substrate.NetApi.Model.Types;
using Substrate.NetApi.Model.Types.Base;
using Substrate.NetApi.Model.Types.Primitive;
using EnumGearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.EnumEvent;
using GasUnit = Substrate.NetApi.Model.Types.Primitive.U64;
using GearEvent = Substrate.Gear.Api.Generated.Model.pallet_gear.pallet.Event;
using MessageQueuedGearEventData = Substrate.NetApi.Model.Types.Base.BaseTuple<
Substrate.Gear.Api.Generated.Model.gprimitives.MessageId,
Substrate.Gear.Api.Generated.Model.sp_core.crypto.AccountId32,
Substrate.Gear.Api.Generated.Model.gprimitives.ActorId,
Substrate.Gear.Api.Generated.Model.gear_common.@event.EnumMessageEntry>;
using ValueUnit = Substrate.NetApi.Model.Types.Primitive.U128;

namespace Sails.Remoting;
Expand Down Expand Up @@ -56,7 +46,7 @@ public RemotingViaNodeClient(
private readonly Account signingAccount;

/// <inheritdoc/>
public async Task<Task<(ActorId ProgramId, byte[] EncodedReply)>> ActivateAsync(
public async Task<ActivationResult> ActivateAsync(
CodeId codeId,
IReadOnlyCollection<byte> salt,
IReadOnlyCollection<byte> encodedPayload,
Expand Down Expand Up @@ -87,46 +77,16 @@ public RemotingViaNodeClient(
value,
keep_alive: new Bool(true));

var (blockHash, extrinsicHash, extrinsicIdx) = await nodeClient.ExecuteExtrinsicAsync(
this.signingAccount,
createProgram,
DefaultExtrinsicTtlInBlocks,
cancellationToken)
.ConfigureAwait(false);

// It can be moved inside the task to return.
var blockEvents = await nodeClient.ListBlockEventsAsync(
blockHash,
return await ActivationResultViaNodeClient.FromExecutionAsync(
nodeClient,
async nodeClient => await nodeClient.ExecuteExtrinsicAsync(
this.signingAccount,
createProgram,
DefaultExtrinsicTtlInBlocks,
cancellationToken)
.ConfigureAwait(false),
cancellationToken)
.ConfigureAwait(false);

var messageQueuedGearEventData = blockEvents
.Where(
blockEvent =>
blockEvent.Phase.Matches(
Phase.ApplyExtrinsic,
(U32 blockExtrinsicIdx) => blockExtrinsicIdx.Value == extrinsicIdx))
.Select(
blockEvents =>
blockEvents.Event)
.SelectIfMatches(
RuntimeEvent.Gear,
(EnumGearEvent gearEvent) => gearEvent)
.SelectIfMatches(
GearEvent.MessageQueued,
(MessageQueuedGearEventData data) => data)
.SingleOrDefault()
?? throw new Exception("TODO: Custom exception. Something terrible happened.");

var programId = (ActorId)messageQueuedGearEventData.Value[2];

static Task<(ActorId ProgramId, byte[] EncodedPayload)> ReceiveReply(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
throw new NotImplementedException();
}

return ReceiveReply(cancellationToken);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -158,7 +118,7 @@ public async Task<Task<byte[]>> MessageAsync(
value,
keep_alive: new Bool(true));

var (blockHash, extrinsicHash, extrinsicIdx) = await nodeClient.ExecuteExtrinsicAsync(
var extrinsicInfo = await nodeClient.ExecuteExtrinsicAsync(
this.signingAccount,
sendMessage,
DefaultExtrinsicTtlInBlocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using EnsureThat;
using Substrate.Gear.Api.Generated;
using Substrate.NetApi;
using Substrate.NetApi.Model.Rpc;

namespace Substrate.Gear.Client;

public sealed class BlockHeadersStream : IAsyncDisposable
public sealed class BlocksStream : IAsyncDisposable
{
internal static async Task<BlockHeadersStream> CreateAsync(
SubstrateClientExt nodeClient,
Func<SubstrateClientExt, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClientExt, string, Task> unsubscribe)
internal static async Task<BlocksStream> CreateAsync(
SubstrateClient nodeClient,
Func<SubstrateClient, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClient, string, Task> unsubscribe)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
EnsureArg.IsNotNull(subscribe, nameof(subscribe));
Expand All @@ -32,12 +32,12 @@ internal static async Task<BlockHeadersStream> CreateAsync(
(_, blockHeader) => channel.Writer.TryWrite(blockHeader))
.ConfigureAwait(false);

return new BlockHeadersStream(
return new BlocksStream(
channel,
() => unsubscribe(nodeClient, subscriptionId));
}

private BlockHeadersStream(Channel<Header> channel, Func<Task> unsubscribe)
private BlocksStream(Channel<Header> channel, Func<Task> unsubscribe)
{
this.channel = channel;
this.unsubscribe = unsubscribe;
Expand All @@ -57,12 +57,12 @@ public async ValueTask DisposeAsync()
}

/// <summary>
/// Returns all finalized block headers since the stream was created.
/// Returns all block headers since the stream was created or the last call to this method.
/// Only one read operation is allowed at a time.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public IAsyncEnumerable<Header> ReadAllAsync(CancellationToken cancellationToken)
public IAsyncEnumerable<Header> ReadAllHeadersAsync(CancellationToken cancellationToken)
{
return Interlocked.CompareExchange(ref this.isReadInProgress, 1, 0) == 0
? ReadAllImpl(cancellationToken)
Expand Down
19 changes: 19 additions & 0 deletions net/src/Substrate.Gear.Client/ExtrinsicInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Substrate.NetApi.Model.Types.Base;

namespace Substrate.Gear.Client;

public sealed record ExtrinsicInfo
{
/// <summary>
/// Hash of block in which extrinsic was included.
/// </summary>
public required Hash BlockHash { get; init; }
/// <summary>
/// Index of extrinsic in block.
/// </summary>
public uint IndexInBlock { get; init; }
/// <summary>
/// Hash of extrinsic itself.
/// </summary>
public required Hash Hash { get; init; }
}
Loading

0 comments on commit 6c11b8e

Please sign in to comment.