From 8f61bb0a7248711b90528afebf1b6a8987d6dcce Mon Sep 17 00:00:00 2001 From: Sean Gilligan Date: Thu, 5 Oct 2023 18:28:34 -0700 Subject: [PATCH] Add ChainTipPublisher class, be a little lazier * Add ChainTipPublisher class (to deal with type erasure in DI, etc.) * RxBitcoinClient: don't always wait for connection in initChainTipService(), actively connect if useZmq, lazily wait if not useZmq * TxOutSetService: require ChainTipPublisher in constructor, this allows the service to be passive/lazy depending upon the ChainTipPublisher implementation. * ChainTipPublishers: static method never() returns do-nothing Publisher --- .../jsonrpc/PollingChainTipServiceImpl.java | 5 +-- .../bitcoin/rx/jsonrpc/RxBitcoinClient.java | 8 +++-- .../rx/jsonrpc/TxOutSetWatcherSample.java | 2 +- .../rx/jsonrpc/service/TxOutSetService.java | 17 +++++++--- .../jsonrpc/test/TestChainTipPublishers.java | 16 +++++++++ .../rx/zeromq/RxBitcoinZmqService.java | 5 +-- .../bitcoin/rx/ChainTipPublisher.java | 34 +++++++++++++++++-- .../bitcoin/rx/ChainTipService.java | 2 +- 8 files changed, 72 insertions(+), 17 deletions(-) create mode 100644 cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/test/TestChainTipPublishers.java diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java index 8221ce068..d6aad9325 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java @@ -9,6 +9,7 @@ import org.consensusj.bitcoin.json.pojo.ChainTip; import org.consensusj.bitcoin.jsonrpc.BitcoinClient; import org.consensusj.bitcoin.jsonrpc.ChainTipClient; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.ChainTipService; import org.consensusj.jsonrpc.JsonRpcStatusException; import org.consensusj.rx.jsonrpc.RxJsonRpcClient; @@ -63,9 +64,9 @@ public synchronized void start() { } @Override - public Publisher chainTipPublisher() { + public ChainTipPublisher chainTipPublisher() { start(); - return chainTipProcessor; + return ChainTipPublisher.of(chainTipProcessor); } /** diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/RxBitcoinClient.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/RxBitcoinClient.java index d4f2ae3a3..be343647a 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/RxBitcoinClient.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/RxBitcoinClient.java @@ -5,6 +5,7 @@ import org.consensusj.bitcoin.jsonrpc.BitcoinClient; import io.reactivex.rxjava3.core.Flowable; import org.consensusj.bitcoin.jsonrpc.BitcoinExtendedClient; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.ChainTipService; import org.consensusj.bitcoin.rx.zeromq.RxBitcoinZmqService; import org.consensusj.jsonrpc.JsonRpcTransport; @@ -51,8 +52,9 @@ public RxBitcoinClient(SSLContext sslContext, Network network, URI server, Strin private void initChainTipService(Duration timeout) { if (chainTipService == null) { - this.waitForConnected().orTimeout(timeout.toSeconds(), TimeUnit.SECONDS).join(); if (useZmq) { + log.warn("(useZmq enabled) Initiating server connection (with timeout of {} seconds)", timeout.toSeconds()); + this.connectToServer(timeout).join(); chainTipService = new RxBitcoinZmqService(this); } else { chainTipService = new PollingChainTipServiceImpl(this); @@ -77,8 +79,8 @@ public Publisher pollOnNewBlockAsync(Supplier * @return a publisher of Chain Tips */ @Override - public Publisher chainTipPublisher() { + public ChainTipPublisher chainTipPublisher() { initChainTipService(Duration.ofMinutes(60)); - return Flowable.fromPublisher(chainTipService.chainTipPublisher()); + return chainTipService.chainTipPublisher(); } } diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/TxOutSetWatcherSample.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/TxOutSetWatcherSample.java index 15b0c3a32..deecd10a9 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/TxOutSetWatcherSample.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/TxOutSetWatcherSample.java @@ -24,7 +24,7 @@ public static void main(String[] args) throws InterruptedException { boolean useZmq = true; try ( RxBitcoinClient client = new RxBitcoinClient(network, rpcUri, rpcUser, rpcPassword, useZmq); - TxOutSetService txOutSetService = new TxOutSetService(client) ) { + TxOutSetService txOutSetService = new TxOutSetService(client, client.chainTipPublisher()) ) { client.connectToServer(Duration.ofMinutes(5)).join(); // Subscribe to ChainTips diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/service/TxOutSetService.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/service/TxOutSetService.java index 0e4eeb61c..bcbb44bc1 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/service/TxOutSetService.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/service/TxOutSetService.java @@ -7,11 +7,10 @@ import org.bitcoinj.base.Sha256Hash; import org.consensusj.bitcoin.json.pojo.ChainTip; import org.consensusj.bitcoin.json.pojo.TxOutSetInfo; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient; import org.consensusj.jsonrpc.DefaultRpcClient; import org.consensusj.jsonrpc.AsyncSupport; -import org.consensusj.jsonrpc.DefaultRpcClient; -import org.consensusj.rx.jsonrpc.RxJsonRpcClient; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +30,7 @@ public class TxOutSetService implements Closeable { private static final Logger log = LoggerFactory.getLogger(TxOutSetService.class); private final RxBitcoinClient client; + private final ChainTipPublisher chainTipPublisher; private final FlowableProcessor txOutSetProcessor = BehaviorProcessor.create(); private final int CACHE_BLOCK_DEPTH = 1; private final int MAX_OUTSTANDING_CALLS = 2; @@ -38,15 +38,21 @@ public class TxOutSetService implements Closeable { // TODO: Combine lastCall with cache and with in-memory SPV blockchain private CompletableFuture lastCall; private final ConcurrentHashMap txOutSetCache = new ConcurrentHashMap<>(); - private Disposable txOutSetSubscription; + private volatile Disposable txOutSetSubscription; - public TxOutSetService(RxBitcoinClient client) { + // TODO: Use BitcoinClient not RxBitcoinClient + /** + * @param client Client for requesting {@link TxOutSetInfo} objects. + * @param chainTipPublisher Publisher telling us when new blocks are available. + */ + public TxOutSetService(RxBitcoinClient client, ChainTipPublisher chainTipPublisher) { this.client = client; + this.chainTipPublisher = chainTipPublisher; } private synchronized void start() { if (txOutSetSubscription == null) { - txOutSetSubscription = Flowable.fromPublisher(client.chainTipPublisher()) + txOutSetSubscription = Flowable.fromPublisher(chainTipPublisher) .doOnNext(this::onNewBlock) .flatMap(this::fetchCacheMaybe) .subscribe(txOutSetProcessor::onNext, txOutSetProcessor::onError, txOutSetProcessor::onComplete); @@ -76,6 +82,7 @@ private void onNewBlock(ChainTip tip) { (t) -> log.warn("Ignoring transient error: ", t) // Log if ignored ); + // TODO: Convert to using CompletableFuture to drop dependency on RxBitcoinClient and RxJava /** * Try to fetch a TxOutSetInfo from the cache or from the network. Returning an empty stream if a transient error occurs. * To use this you will typically call {@link Flowable#flatMap(io.reactivex.rxjava3.functions.Function)}. diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/test/TestChainTipPublishers.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/test/TestChainTipPublishers.java new file mode 100644 index 000000000..979401a1c --- /dev/null +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/test/TestChainTipPublishers.java @@ -0,0 +1,16 @@ +package org.consensusj.bitcoin.rx.jsonrpc.test; + +import io.reactivex.rxjava3.core.Flowable; +import org.consensusj.bitcoin.rx.ChainTipPublisher; + +/** + * Useful for testing. + */ +public class TestChainTipPublishers { + /** + * @return A {@link ChainTipPublisher} that never emits items and never closes. + */ + public static ChainTipPublisher never() { + return ChainTipPublisher.of(Flowable.never()); + } +} diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/zeromq/RxBitcoinZmqService.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/zeromq/RxBitcoinZmqService.java index 6699b34b6..8617da75a 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/zeromq/RxBitcoinZmqService.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/zeromq/RxBitcoinZmqService.java @@ -11,6 +11,7 @@ import org.bitcoinj.base.Sha256Hash; import org.bitcoinj.core.Transaction; import org.consensusj.bitcoin.jsonrpc.BitcoinClient; +import org.consensusj.bitcoin.rx.ChainTipPublisher; import org.consensusj.bitcoin.rx.ChainTipService; import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient; import org.consensusj.bitcoin.rx.RxBlockchainService; @@ -80,8 +81,8 @@ public Publisher blockHeightPublisher() { } @Override - public Publisher chainTipPublisher() { - return flowableChainTip; + public ChainTipPublisher chainTipPublisher() { + return ChainTipPublisher.of(flowableChainTip); } @Override diff --git a/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipPublisher.java b/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipPublisher.java index 5f3fe7703..6ca9d1a1b 100644 --- a/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipPublisher.java +++ b/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipPublisher.java @@ -1,5 +1,33 @@ -package org.consensusj.bitcoin.rx;/** - * +package org.consensusj.bitcoin.rx; + +import org.consensusj.bitcoin.json.pojo.ChainTip; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +/** + * Marker type for {@code Publisher}. In a future release this may use {@link java.util.concurrent.Flow.Publisher}. + * Because of type erasure in Java generics we need this to strongly type parameters that require a {@code Publisher}. */ -public class ChainTipPublisher { +public interface ChainTipPublisher extends Publisher { + /** + * Adapt a {@code Publisher publisher) { + return new Wrapper(publisher); + } + + class Wrapper implements ChainTipPublisher { + private final Publisher publisher; + + Wrapper(Publisher publisher) { + this.publisher = publisher; + } + + @Override + public void subscribe(Subscriber s) { + publisher.subscribe(s); + } + } } diff --git a/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipService.java b/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipService.java index 9c87b997b..3849bad05 100644 --- a/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipService.java +++ b/cj-btc-rx/src/main/java/org/consensusj/bitcoin/rx/ChainTipService.java @@ -12,5 +12,5 @@ public interface ChainTipService { * * @return A Publisher for the sequence */ - Publisher chainTipPublisher(); + ChainTipPublisher chainTipPublisher(); }