From b9dcc74cfbad89c1283a0dc7547a55f34c60effc Mon Sep 17 00:00:00 2001 From: Oliver Weichhold Date: Sun, 17 Oct 2021 14:33:54 +0200 Subject: [PATCH] Implement Websocket subscriptions for Geth (Ethereum) --- .../Blockchain/Ethereum/EthereumJobManager.cs | 115 ++++++++++++++++-- .../Ethereum/EthereumStratumMethods.cs | 2 +- src/Miningcore/JsonRpc/RpcClient.cs | 15 ++- 3 files changed, 110 insertions(+), 22 deletions(-) diff --git a/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs b/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs index ae983373c..1d9cdb8b4 100644 --- a/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs +++ b/src/Miningcore/Blockchain/Ethereum/EthereumJobManager.cs @@ -5,9 +5,12 @@ using System.Linq; using System.Numerics; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; +using System.Text; using System.Threading; using System.Threading.Tasks; using Autofac; +using Miningcore.Blockchain.Bitcoin; using Miningcore.Blockchain.Ethereum.Configuration; using Miningcore.Blockchain.Ethereum.DaemonResponses; using Miningcore.Configuration; @@ -25,6 +28,8 @@ using Contract = Miningcore.Contracts.Contract; using EC = Miningcore.Blockchain.Ethereum.EthCommands; using static Miningcore.Util.ActionUtils; +using System.Reactive; +using System.Reactive.Concurrency; namespace Miningcore.Blockchain.Ethereum { @@ -65,24 +70,24 @@ public EthereumJobManager( private EthereumPoolConfigExtra extraPoolConfig; private readonly JsonSerializer serializer; - protected async Task UpdateJobAsync(CancellationToken ct) + protected async Task UpdateJob(CancellationToken ct, string via = null) { logger.LogInvoke(); try { - return UpdateJob(await GetBlockTemplateAsync(ct)); + return UpdateJob(await GetBlockTemplateAsync(ct), via); } catch(Exception ex) { - logger.Error(ex, () => $"Error during {nameof(UpdateJobAsync)}"); + logger.Error(ex, () => $"Error during {nameof(UpdateJob)}"); } return false; } - protected bool UpdateJob(EthereumBlockTemplate blockTemplate) + protected bool UpdateJob(EthereumBlockTemplate blockTemplate, string via = null) { logger.LogInvoke(); @@ -121,6 +126,8 @@ protected bool UpdateJob(EthereumBlockTemplate blockTemplate) currentJob = job; + logger.Info(() => $"New work at height {currentJob.BlockTemplate.Height} and header {currentJob.BlockTemplate.Header} via [{(via ?? "Unknown")}]"); + // update stats BlockchainStats.LastNetworkBlockTime = clock.Now; BlockchainStats.BlockHeight = job.BlockTemplate.Height; @@ -400,6 +407,8 @@ public async ValueTask SubmitShareAsync(StratumConnection worker, string[ if(share.IsBlockCandidate) { logger.Info(() => $"Daemon accepted block {share.BlockHeight} submitted by {context.Miner}"); + + OnBlockFound(); } } @@ -528,7 +537,7 @@ protected override async Task PostStartInitAsync(CancellationToken ct) ConfigureRewards(); - SetupJobUpdates(ct); + await SetupJobUpdates(ct); } private void ConfigureRewards() @@ -550,18 +559,98 @@ private void ConfigureRewards() } } - protected virtual void SetupJobUpdates(CancellationToken cancellationToken) + protected virtual async Task SetupJobUpdates(CancellationToken ct) { var pollingInterval = poolConfig.BlockRefreshInterval > 0 ? poolConfig.BlockRefreshInterval : 1000; - Jobs = Observable.Interval(TimeSpan.FromMilliseconds(pollingInterval)) - .Select(_ => Observable.FromAsync(UpdateJobAsync)) - .Concat() - .Do(isNew => + var blockSubmission = blockFoundSubject.Synchronize(); + var pollTimerRestart = blockFoundSubject.Synchronize(); + + var triggers = new List> + { + blockSubmission.Select(x => (JobRefreshBy.BlockFound, (string) null)) + }; + + var enableStreaming = extraPoolConfig?.EnableDaemonWebsocketStreaming == true; + + var streamingEndpoint = daemonEndpoints + .Where(x => x.Extra.SafeExtensionDataAs() != null) + .Select(x=> Tuple.Create(x, x.Extra.SafeExtensionDataAs())) + .FirstOrDefault(); + + if(enableStreaming && streamingEndpoint.Item2?.PortWs.HasValue != true) + { + logger.Warn(() => $"'{nameof(EthereumPoolConfigExtra.EnableDaemonWebsocketStreaming).ToLowerCamelCase()}' enabled but not a single daemon found with a configured websocket port ('{nameof(EthereumDaemonEndpointConfigExtra.PortWs).ToLowerCamelCase()}'). Falling back to polling."); + enableStreaming = false; + } + + if(enableStreaming) + { + var (endpointConfig, extra) = streamingEndpoint; + + var wsEndpointConfig = new DaemonEndpointConfig + { + Host = endpointConfig.Host, + Port = extra.PortWs.Value, + HttpPath = extra.HttpPathWs, + Ssl = extra.SslWs + }; + + logger.Info(() => $"Subscribing to WebSocket {(wsEndpointConfig.Ssl ? "wss" : "ws")}://{wsEndpointConfig.Host}:{wsEndpointConfig.Port}"); + + var wsSubscription = "newHeads"; + var isRetry = false; + retry: + + // stream work updates + var getWorkObs = rpcClient.WebsocketSubscribe(logger, ct, wsEndpointConfig, EC.Subscribe, new[] { wsSubscription }) + .Publish() + .RefCount(); + + // test subscription + var subcriptionResponse = await getWorkObs + .Take(1) + .Select(x => JsonConvert.DeserializeObject>(Encoding.UTF8.GetString(x))) + .ToTask(ct); + + if(subcriptionResponse.Error != null) { - if(isNew) - logger.Info(() => $"New work at height {currentJob.BlockTemplate.Height} and header {currentJob.BlockTemplate.Header} detected [{JobRefreshBy.Poll}]"); - }) + // older versions of geth only support subscriptions to "newBlocks" + if(!isRetry && subcriptionResponse.Error.Code == (int) BitcoinRPCErrorCode.RPC_METHOD_NOT_FOUND) + { + wsSubscription = "newBlocks"; + + isRetry = true; + goto retry; + } + + logger.ThrowLogPoolStartupException($"Unable to subscribe to geth websocket '{wsSubscription}': {subcriptionResponse.Error.Message} [{subcriptionResponse.Error.Code}]"); + } + + var blockNotify = getWorkObs.Where(x => x != null) + .Do(x => Console.WriteLine("** WS")) + .Publish() + .RefCount(); + + pollTimerRestart = Observable.Merge( + blockSubmission, + blockNotify.Select(_ => Unit.Default)) + .Publish() + .RefCount(); + + // Websocket + triggers.Add(blockNotify + .Select(_ => (JobRefreshBy.WebSocket, (string) null))); + } + + triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(pollingInterval)) + .TakeUntil(pollTimerRestart) + .Select(_ => (JobRefreshBy.Poll, (string) null)) + .Repeat()); + + Jobs = Observable.Merge(triggers) + .Select(x => Observable.FromAsync(() => UpdateJob(ct, x.Via))) + .Concat() .Where(isNew => isNew) .Select(_ => GetJobParamsForStratum(true)) .Publish() diff --git a/src/Miningcore/Blockchain/Ethereum/EthereumStratumMethods.cs b/src/Miningcore/Blockchain/Ethereum/EthereumStratumMethods.cs index 79779aae0..afebdf5e9 100644 --- a/src/Miningcore/Blockchain/Ethereum/EthereumStratumMethods.cs +++ b/src/Miningcore/Blockchain/Ethereum/EthereumStratumMethods.cs @@ -1,6 +1,6 @@ namespace Miningcore.Blockchain.Ethereum { - public class EthereumStratumMethods + public static class EthereumStratumMethods { /// /// Used to subscribe to work from a server, required before all other communication. diff --git a/src/Miningcore/JsonRpc/RpcClient.cs b/src/Miningcore/JsonRpc/RpcClient.cs index 5a136f2d3..ca6e6e2cd 100644 --- a/src/Miningcore/JsonRpc/RpcClient.cs +++ b/src/Miningcore/JsonRpc/RpcClient.cs @@ -120,16 +120,15 @@ public async Task[]> ExecuteBatchAsync(ILogger logger, Cance } } - public IObservable WebsocketSubscribe(ILogger logger, CancellationToken ct, Dictionary portMap, string method, object payload = null, + public IObservable WebsocketSubscribe(ILogger logger, CancellationToken ct, DaemonEndpointConfig endPoint, + string method, object payload = null, JsonSerializerSettings payloadJsonSerializerSettings = null) { Contract.Requires(!string.IsNullOrEmpty(method), $"{nameof(method)} must not be empty"); logger.LogInvoke(new object[] { method }); - return Observable.Merge(portMap.Keys - .Select(endPoint => WebsocketSubscribeEndpoint(logger, ct, endPoint, portMap[endPoint], method, payload, payloadJsonSerializerSettings))) + return WebsocketSubscribeEndpoint(logger, ct, endPoint, method, payload, payloadJsonSerializerSettings) .Publish() .RefCount(); } @@ -260,8 +259,8 @@ protected string GetRequestId() return rpcRequestId; } - private IObservable WebsocketSubscribeEndpoint(ILogger logger, CancellationToken ct, NetworkEndpointConfig endPoint, - (int Port, string HttpPath, bool Ssl) conf, string method, object payload = null, + private IObservable WebsocketSubscribeEndpoint(ILogger logger, CancellationToken ct, + DaemonEndpointConfig endPoint, string method, object payload = null, JsonSerializerSettings payloadJsonSerializerSettings = null) { return Observable.Defer(() => Observable.Create(obs => @@ -281,8 +280,8 @@ private IObservable WebsocketSubscribeEndpoint(ILogger logger, Cancellat using(var client = new ClientWebSocket()) { // connect - var protocol = conf.Ssl ? "wss" : "ws"; - var uri = new Uri($"{protocol}://{endPoint.Host}:{conf.Port}{conf.HttpPath}"); + var protocol = endPoint.Ssl ? "wss" : "ws"; + var uri = new Uri($"{protocol}://{endPoint.Host}:{endPoint.Port}{endPoint.HttpPath}"); client.Options.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true; logger.Debug(() => $"Establishing WebSocket connection to {uri}");