Skip to content
This repository has been archived by the owner on Oct 20, 2023. It is now read-only.

Commit

Permalink
Implement Websocket subscriptions for Geth (Ethereum)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Weichhold committed Oct 17, 2021
1 parent 089f16d commit b9dcc74
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 22 deletions.
115 changes: 102 additions & 13 deletions src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
using System.Linq;
using System.Numerics;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Miningcore.Blockchain.Bitcoin;
using Miningcore.Blockchain.Ethereum.Configuration;
using Miningcore.Blockchain.Ethereum.DaemonResponses;
using Miningcore.Configuration;
Expand All @@ -25,6 +28,8 @@
using Contract = Miningcore.Contracts.Contract;
using EC = Miningcore.Blockchain.Ethereum.EthCommands;
using static Miningcore.Util.ActionUtils;
using System.Reactive;
using System.Reactive.Concurrency;

namespace Miningcore.Blockchain.Ethereum
{
Expand Down Expand Up @@ -65,24 +70,24 @@ public EthereumJobManager(
private EthereumPoolConfigExtra extraPoolConfig;
private readonly JsonSerializer serializer;

protected async Task<bool> UpdateJobAsync(CancellationToken ct)
protected async Task<bool> UpdateJob(CancellationToken ct, string via = null)
{
logger.LogInvoke();

try
{
return UpdateJob(await GetBlockTemplateAsync(ct));
return UpdateJob(await GetBlockTemplateAsync(ct), via);
}

catch(Exception ex)
{
logger.Error(ex, () => $"Error during {nameof(UpdateJobAsync)}");
logger.Error(ex, () => $"Error during {nameof(UpdateJob)}");
}

return false;
}

protected bool UpdateJob(EthereumBlockTemplate blockTemplate)
protected bool UpdateJob(EthereumBlockTemplate blockTemplate, string via = null)
{
logger.LogInvoke();

Expand Down Expand Up @@ -121,6 +126,8 @@ protected bool UpdateJob(EthereumBlockTemplate blockTemplate)

currentJob = job;

logger.Info(() => $"New work at height {currentJob.BlockTemplate.Height} and header {currentJob.BlockTemplate.Header} via [{(via ?? "Unknown")}]");

// update stats
BlockchainStats.LastNetworkBlockTime = clock.Now;
BlockchainStats.BlockHeight = job.BlockTemplate.Height;
Expand Down Expand Up @@ -400,6 +407,8 @@ public async ValueTask<Share> SubmitShareAsync(StratumConnection worker, string[
if(share.IsBlockCandidate)
{
logger.Info(() => $"Daemon accepted block {share.BlockHeight} submitted by {context.Miner}");

OnBlockFound();
}
}

Expand Down Expand Up @@ -528,7 +537,7 @@ protected override async Task PostStartInitAsync(CancellationToken ct)

ConfigureRewards();

SetupJobUpdates(ct);
await SetupJobUpdates(ct);
}

private void ConfigureRewards()
Expand All @@ -550,18 +559,98 @@ private void ConfigureRewards()
}
}

protected virtual void SetupJobUpdates(CancellationToken cancellationToken)
protected virtual async Task SetupJobUpdates(CancellationToken ct)
{
var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000;

Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval))
.Select(_ => Observable.FromAsync(UpdateJobAsync))
.Concat()
.Do(isNew =>
var blockSubmission = blockFoundSubject.Synchronize();
var pollTimerRestart = blockFoundSubject.Synchronize();

var triggers = new List<IObservable<(string Via, string Data)>>
{
blockSubmission.Select(x => (JobRefreshBy.BlockFound, (string) null))
};

var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true;

var streamingEndpoint = daemonEndpoints
.Where(x => x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>() != null)
.Select(x=> Tuple.Create(x, x.Extra.SafeExtensionDataAs<EthereumDaemonEndpointConfigExtra>()))
.FirstOrDefault();

if(enableStreaming && streamingEndpoint.Item2?.PortWs.HasValue != true)
{
logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling.");
enableStreaming = false;
}

if(enableStreaming)
{
var (endpointConfig, extra) = streamingEndpoint;

var wsEndpointConfig = new DaemonEndpointConfig
{
Host = endpointConfig.Host,
Port = extra.PortWs.Value,
HttpPath = extra.HttpPathWs,
Ssl = extra.SslWs
};

logger.Info(() => $"Subscribing to WebSocket {(wsEndpointConfig.Ssl ? "wss" : "ws")}://{wsEndpointConfig.Host}:{wsEndpointConfig.Port}");

var wsSubscription = "newHeads";
var isRetry = false;
retry:

// stream work updates
var getWorkObs = rpcClient.WebsocketSubscribe(logger, ct, wsEndpointConfig, EC.Subscribe, new[] { wsSubscription })
.Publish()
.RefCount();

// test subscription
var subcriptionResponse = await getWorkObs
.Take(1)
.Select(x => JsonConvert.DeserializeObject<JsonRpcResponse<string>>(Encoding.UTF8.GetString(x)))
.ToTask(ct);

if(subcriptionResponse.Error != null)
{
if(isNew)
logger.Info(() => $"New work at height {currentJob.BlockTemplate.Height} and header {currentJob.BlockTemplate.Header} detected [{JobRefreshBy.Poll}]");
})
// older versions of geth only support subscriptions to "newBlocks"
if(!isRetry && subcriptionResponse.Error.Code == (int) BitcoinRPCErrorCode.RPC_METHOD_NOT_FOUND)
{
wsSubscription = "newBlocks";

isRetry = true;
goto retry;
}

logger.ThrowLogPoolStartupException($"Unable to subscribe to geth websocket '{wsSubscription}': {subcriptionResponse.Error.Message} [{subcriptionResponse.Error.Code}]");
}

var blockNotify = getWorkObs.Where(x => x != null)
.Do(x => Console.WriteLine("** WS"))
.Publish()
.RefCount();

pollTimerRestart = Observable.Merge(
blockSubmission,
blockNotify.Select(_ => Unit.Default))
.Publish()
.RefCount();

// Websocket
triggers.Add(blockNotify
.Select(_ => (JobRefreshBy.WebSocket, (string) null)));
}

triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(pollingInterval))
.TakeUntil(pollTimerRestart)
.Select(_ => (JobRefreshBy.Poll, (string) null))
.Repeat());

Jobs = Observable.Merge(triggers)
.Select(x => Observable.FromAsync(() => UpdateJob(ct, x.Via)))
.Concat()
.Where(isNew => isNew)
.Select(_ => GetJobParamsForStratum(true))
.Publish()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Miningcore.Blockchain.Ethereum
{
public class EthereumStratumMethods
public static class EthereumStratumMethods
{
/// <summary>
/// Used to subscribe to work from a server, required before all other communication.
Expand Down
15 changes: 7 additions & 8 deletions src/Miningcore/JsonRpc/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,15 @@ public async Task<RpcResponse<JToken>[]> ExecuteBatchAsync(ILogger logger, Cance
}
}

public IObservable<byte[]> WebsocketSubscribe(ILogger logger, CancellationToken ct, Dictionary<DaemonEndpointConfig,
(int Port, string HttpPath, bool Ssl)> portMap, string method, object payload = null,
public IObservable<byte[]> WebsocketSubscribe(ILogger logger, CancellationToken ct, DaemonEndpointConfig endPoint,
string method, object payload = null,
JsonSerializerSettings payloadJsonSerializerSettings = null)
{
Contract.Requires<ArgumentException>(!string.IsNullOrEmpty(method), $"{nameof(method)} must not be empty");

logger.LogInvoke(new object[] { method });

return Observable.Merge(portMap.Keys
.Select(endPoint => WebsocketSubscribeEndpoint(logger, ct, endPoint, portMap[endPoint], method, payload, payloadJsonSerializerSettings)))
return WebsocketSubscribeEndpoint(logger, ct, endPoint, method, payload, payloadJsonSerializerSettings)
.Publish()
.RefCount();
}
Expand Down Expand Up @@ -260,8 +259,8 @@ protected string GetRequestId()
return rpcRequestId;
}

private IObservable<byte[]> WebsocketSubscribeEndpoint(ILogger logger, CancellationToken ct, NetworkEndpointConfig endPoint,
(int Port, string HttpPath, bool Ssl) conf, string method, object payload = null,
private IObservable<byte[]> WebsocketSubscribeEndpoint(ILogger logger, CancellationToken ct,
DaemonEndpointConfig endPoint, string method, object payload = null,
JsonSerializerSettings payloadJsonSerializerSettings = null)
{
return Observable.Defer(() => Observable.Create<byte[]>(obs =>
Expand All @@ -281,8 +280,8 @@ private IObservable<byte[]> WebsocketSubscribeEndpoint(ILogger logger, Cancellat
using(var client = new ClientWebSocket())
{
// connect
var protocol = conf.Ssl ? "wss" : "ws";
var uri = new Uri($"{protocol}://{endPoint.Host}:{conf.Port}{conf.HttpPath}");
var protocol = endPoint.Ssl ? "wss" : "ws";
var uri = new Uri($"{protocol}://{endPoint.Host}:{endPoint.Port}{endPoint.HttpPath}");
client.Options.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true;

logger.Debug(() => $"Establishing WebSocket connection to {uri}");
Expand Down

0 comments on commit b9dcc74

Please sign in to comment.