diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 984272a417d6..75a1c67c1cdd 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -464,19 +464,11 @@ async def hold_invoice_callback(self, payment_hash: bytes) -> None: if key in self.swaps: swap = self.swaps[key] if swap.funding_txid is None: - password = self.wallet.get_unlocked_password() - for batch_rbf in [False]: - # FIXME: tx batching is disabled, because extra logic is needed to handle - # the case where the base tx gets mined. - tx = self.create_funding_tx(swap, None, password=password, batch_rbf=batch_rbf) - self.logger.info(f'adding funding_tx {tx.txid()}') - self.wallet.adb.add_transaction(tx) - try: - await self.broadcast_funding_tx(swap, tx) - except TxBroadcastError: - self.wallet.adb.remove_transaction(tx.txid()) - continue - break + output = self.create_funding_output(swap) + self.wallet.add_batch_payment(output) + swap.funding_txid = True + else: + self.logger.info(f'key not in swaps {key}') def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None): """ server method """ @@ -773,6 +765,9 @@ async def callback(payment_hash): await asyncio.sleep(0.1) return swap.funding_txid + def create_funding_output(self, swap): + return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount) + def create_funding_tx( self, swap: SwapData, @@ -785,7 +780,7 @@ def create_funding_tx( # note: rbf must not decrease payment # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output if tx is None: - funding_output = PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount) + funding_output = self.create_funding_output(swap) tx = self.wallet.create_transaction( outputs=[funding_output], rbf=True, diff --git a/electrum/wallet.py b/electrum/wallet.py index 43eae2a390a8..a35aab5dc1e5 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -59,6 +59,7 @@ WalletFileException, BitcoinException, InvalidPassword, format_time, timestamp_to_datetime, Satoshis, Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex) +from .util import log_exceptions from .simple_config import SimpleConfig, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE from .bitcoin import COIN, TYPE_ADDRESS from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold @@ -88,6 +89,7 @@ from .util import EventListener, event_listener from . import descriptor from .descriptor import Descriptor +from .network import TxBroadcastError if TYPE_CHECKING: from .network import Network @@ -459,6 +461,7 @@ async def main_loop(self): async with self.taskgroup as group: await group.spawn(asyncio.Event().wait) # run forever (until cancel) await group.spawn(self.do_synchronize_loop()) + await group.spawn(self.manage_batch_payments()) except Exception as e: self.logger.exception("taskgroup died.") finally: @@ -1813,21 +1816,26 @@ def make_unsigned_transaction( self, *, coins: Sequence[PartialTxInput], outputs: List[PartialTxOutput], + inputs: Optional[List[PartialTxInput]] = None, fee=None, change_addr: str = None, is_sweep: bool = False, # used by Wallet_2fa subclass rbf: bool = True, - batch_rbf: Optional[bool] = None, + base_tx = None, send_change_to_lightning: Optional[bool] = None, ) -> PartialTransaction: """Can raise NotEnoughFunds or NoDynamicFeeEstimates.""" + assert base_tx is None or inputs is None + if not coins: # any bitcoin tx must have at least 1 input by consensus raise NotEnoughFunds() if any([c.already_has_some_signatures() for c in coins]): raise Exception("Some inputs already contain signatures!") - if batch_rbf is None: - batch_rbf = self.config.WALLET_BATCH_RBF + if inputs is None: + inputs = [] + if base_tx is None and self.config.WALLET_BATCH_RBF: + base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins) if send_change_to_lightning is None: send_change_to_lightning = self.config.WALLET_SEND_CHANGE_TO_LIGHTNING @@ -1866,7 +1874,6 @@ def make_unsigned_transaction( # Let the coin chooser select the coins to spend coin_chooser = coinchooser.get_coin_chooser(self.config) # If there is an unconfirmed RBF tx, merge with it - base_tx = self.get_unconfirmed_base_tx_for_batching(outputs, coins) if batch_rbf else None if base_tx: # make sure we don't try to spend change from the tx-to-be-replaced: coins = [c for c in coins if c.prevout.txid.hex() != base_tx.txid()] @@ -1888,7 +1895,7 @@ def fee_estimator(size: Union[int, float, Decimal]) -> int: old_change_addrs = [o.address for o in base_tx.outputs() if self.is_change(o.address)] rbf_merge_txid = base_tx.txid() else: - txi = [] + txi = list(inputs) txo = list(outputs) old_change_addrs = [] # change address. if empty, coin_chooser will set it @@ -3065,7 +3072,8 @@ def create_transaction( password=None, locktime=None, tx_version: Optional[int] = None, - batch_rbf: Optional[bool] = None, + base_tx=None, + inputs=None, send_change_to_lightning: Optional[bool] = None, nonlocal_only: bool = False, ) -> PartialTransaction: @@ -3082,10 +3090,11 @@ def create_transaction( fee_estimator = fee tx = self.make_unsigned_transaction( coins=coins, + inputs=inputs, outputs=outputs, fee=fee_estimator, change_addr=change_addr, - batch_rbf=batch_rbf, + base_tx=base_tx, send_change_to_lightning=send_change_to_lightning, rbf=rbf, ) @@ -3292,6 +3301,115 @@ def unlock(self, password): def get_unlocked_password(self): return self._password_in_memory + def add_batch_payment(self, output: 'PartialTxOutput'): + # todo: raise InsufficientFunds if needed + self.batch_payments.append(output) + + def find_confirmed_base_tx(self): + for tx in self.batch_txs: + tx_mined_status = self.adb.get_tx_height(tx.txid()) + if tx_mined_status.conf > 0: + return tx + + @log_exceptions + async def manage_batch_payments(self): + # batch rbf, and add it to adb before we broadcast it + # TODO: we should keep track of the transactions that have been replaced (base_tx) + # if a replaced transaction gets mined, we should ensure the payment is broadcast in a new tx + # + # output1 : tx1(o1) ----- + # \ + # output 2: tx1'(o1,o2) ---> tx2(tx1|o2) ----- + # \ \ + # output 3: tx1''(o1,o2,o3) --> tx2'(tx1|o2,o3) ---> tx3(tx2|o3) + # tx3(tx1'|o3) (if tx1'cannot be replaced) + # + # self.batch_txs = [tx1, tx1', tx1''] + # + # if tx1 gets mined: + # - use its output, batch all remaining payments: tx2(mined, o2,o3) + # + # if tx1' gets mined: tx3(mined, o3) + # + # what if we cannot RBF? -> we must add a child tx + # if cannot_rbf(tx1) -> broadcast tx2(tx1,o2) and remove first row: neww base is now tx2(tx,o2) + # if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3) + # + # that's the same strategy as if it was mined + # + # + # TODO: make this reorg-safe. + # + # + # TODO: persist batch_payments and batch_txs in wallet file. + # Note that it is probably fine not to persist them, but it is dangerous + # to persist one and not the other, as it might result in a double send. + self.batch_payments = [] # list of payments we need to make + self.batch_txs = [] # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined. + + + while True: + await asyncio.sleep(1) + password = self.get_unlocked_password() + if self.has_keystore_encryption() and not password: + continue + tx = self.find_confirmed_base_tx() + if tx: + # one of the batch_txs has been confirmed + # find which outputs still need to be paid + to_pay = [x for x in self.batch_payments if x not in tx.outputs()] + self.logger.info(f'base tx confirmed. to_pay={to_pay}') + if to_pay: + await self.create_new_base_tx(tx, to_pay, password) + else: + self.batch_txs = [] + self.batch_payments = [] + else: + base_tx = self.batch_txs[-1] if self.batch_txs else None + base_tx_outputs = base_tx.outputs() if base_tx else [] + # check if all payments are in that tx + to_pay = [o for o in self.batch_payments if o not in base_tx_outputs] + if not to_pay: + continue + self.logger.info(f'manage_batch_payments {to_pay}') + tx = self.create_transaction( + outputs=to_pay, + rbf=True, + password=password, + base_tx=base_tx, + ) + try: + #self.adb.add_transaction(tx) + await self.network.broadcast_transaction(tx) + self.batch_txs.append(tx) + except TxBroadcastError: + # base_tx is not replaceable, probabaly because it has children + #self.adb.remove_transaction(tx.txid()) + await self.create_new_base_tx(base_tx, to_pay, password) + + async def create_new_base_tx(self, tx, to_pay, password): + inputs = [] + for o in tx.get_change_outputs(): + coins = self.adb.get_addr_utxo(o.address) + inputs += list(coins.values()) + self.logger.info(f'create_new_base_tx: inputs={inputs} outputs={to_pay}') + tx2 = self.create_transaction( + inputs=inputs, + outputs=to_pay, + password=password, + ) + #self.adb.add_transaction(tx2) + try: + await self.network.broadcast_transaction(tx2) + except TxBroadcastError: + # we will retry later, because we have not changed batch_payments + self.logger.info(f'create_new_base_tx: failed to broadcast') + return + + self.logger.info(f'create_new_base_tx: success {tx2.txid()}') + self.batch_txs = [tx2] + self.batch_payments = to_pay # this removes payments in the old tx + class Simple_Wallet(Abstract_Wallet): # wallet with a single keystore