Skip to content

Commit

Permalink
feat(SPV-1026): removed broadcast cronjob and updated transaction str…
Browse files Browse the repository at this point in the history
…ategies (#692)
  • Loading branch information
chris-4chain authored Sep 10, 2024
1 parent b8e53bc commit bf555b1
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 812 deletions.
2 changes: 0 additions & 2 deletions engine/action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
if syncTransaction, err = GetSyncTransactionByID(ctx, transaction.ID, c.DefaultModelOptions()...); err != nil {
return err
}
syncTransaction.BroadcastStatus = SyncStatusCanceled
syncTransaction.P2PStatus = SyncStatusCanceled
syncTransaction.SyncStatus = SyncStatusCanceled
if err = syncTransaction.Save(ctx); err != nil {
return err
Expand Down
12 changes: 0 additions & 12 deletions engine/action_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ func Test_RevertTransaction(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, uint64(100000), xpub.CurrentBalance) // 100000 was initial value

// check sync transaction was canceled
var syncTx *SyncTransaction
syncTx, err = GetSyncTransactionByID(ctx, transaction.ID, client.DefaultModelOptions()...)
require.NoError(t, err)
assert.Equal(t, SyncStatusCanceled, syncTx.BroadcastStatus)

// check utxos where reverted
var utxos []*Utxo
conditions := map[string]interface{}{
Expand Down Expand Up @@ -307,12 +301,6 @@ func initRevertTransactionData(t *testing.T, clientOpts ...ClientOps) (context.C
assert.Equal(t, transaction.ID, tx.ID)
assert.Equal(t, testXPubID, tx.XpubInIDs[0])

// check sync transaction
var syncTx *SyncTransaction
syncTx, err = GetSyncTransactionByID(ctx, transaction.ID, client.DefaultModelOptions()...)
require.NoError(t, err)
assert.Equal(t, SyncStatusReady, syncTx.BroadcastStatus)

var utxos []*Utxo
conditions := map[string]interface{}{
xPubIDField: transaction.XPubID,
Expand Down
5 changes: 0 additions & 5 deletions engine/cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func (c *Client) cronJobs() taskmanager.CronJobs {
60*time.Second,
taskCleanupDraftTransactions,
)
addJob(
CronJobNameSyncTransactionBroadcast,
2*time.Minute,
taskBroadcastTransactions,
)
addJob(
CronJobNameSyncTransactionSync,
5*time.Minute,
Expand Down
11 changes: 0 additions & 11 deletions engine/cron_job_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ func taskCleanupDraftTransactions(ctx context.Context, client *Client) error {
return nil
}

// taskBroadcastTransactions will broadcast any transactions
func taskBroadcastTransactions(ctx context.Context, client *Client) error {
client.Logger().Info().Msg("running broadcast transaction(s) task...")

err := processBroadcastTransactions(ctx, 1000, WithClient(client))
if err == nil || errors.Is(err, datastore.ErrNoResults) {
return nil
}
return err
}

// taskSyncTransactions will sync any transactions
func taskSyncTransactions(ctx context.Context, client *Client) error {
logClient := client.Logger()
Expand Down
1 change: 0 additions & 1 deletion engine/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ const (
metadataField = "metadata"
nextExternalNumField = "next_external_num"
nextInternalNumField = "next_internal_num"
p2pStatusField = "p2p_status"
satoshisField = "satoshis"
spendingTxIDField = "spending_tx_id"
statusField = "status"
Expand Down
38 changes: 20 additions & 18 deletions engine/model_sync_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/bitcoin-sv/spv-wallet/engine/datastore"
"github.com/bitcoin-sv/spv-wallet/engine/spverrors"
Expand Down Expand Up @@ -33,31 +34,17 @@ func newSyncTransaction(txID string, config *SyncConfig, opts ...ModelOps) *Sync
return nil
}

// Broadcasting
bs := SyncStatusReady
if !config.Broadcast {
bs = SyncStatusSkipped
}

// Notify Paymail P2P
ps := SyncStatusPending
if !config.PaymailP2P {
ps = SyncStatusSkipped
}

// Sync
ss := SyncStatusReady
if !config.SyncOnChain {
ss = SyncStatusSkipped
}

return &SyncTransaction{
BroadcastStatus: bs,
Configuration: *config,
ID: txID,
Model: *NewBaseModel(ModelSyncTransaction, opts...),
P2PStatus: ps,
SyncStatus: ss,
Configuration: *config,
ID: txID,
Model: *NewBaseModel(ModelSyncTransaction, opts...),
SyncStatus: ss,
}
}

Expand Down Expand Up @@ -139,3 +126,18 @@ func (m *SyncTransaction) Migrate(client datastore.ClientInterface) error {
err := client.IndexMetadata(client.GetTableName(tableSyncTransactions), metadataField)
return spverrors.Wrapf(err, "failed to index metadata column on model %s", m.GetModelName())
}

func (m *SyncTransaction) addSyncResult(ctx context.Context, action, provider, message string) {
m.Results.Results = append(m.Results.Results, &SyncResult{
Action: action,
ExecutedAt: time.Now().UTC(),
Provider: provider,
StatusMessage: message,
})

if m.IsNew() {
return // do not save if new record! caller should decide if want to save new record
}

_ = m.Save(ctx)
}
85 changes: 0 additions & 85 deletions engine/model_sync_transactions_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -24,90 +23,6 @@ func TestSyncTransaction_GetModelName(t *testing.T) {
})
}

func Test_areParentsBroadcast(t *testing.T) {
ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup())
defer deferMe()

opts := []ModelOps{WithClient(client)}

tx, err := txFromHex(testTxHex, append(opts, New())...)
require.NoError(t, err)

txErr := tx.Save(ctx)
require.NoError(t, txErr)

tx2, err := txFromHex(testTx2Hex, append(opts, New())...)
require.NoError(t, err)

txErr = tx2.Save(ctx)
require.NoError(t, txErr)

tx3, err := txFromHex(testTx3Hex, append(opts, New())...)
require.NoError(t, err)

txErr = tx3.Save(ctx)
require.NoError(t, txErr)

// input of testTxID
syncTx := newSyncTransaction("65bb8d2733298b2d3b441a871868d6323c5392facf0d3eced3a6c6a17dc84c10", &SyncConfig{SyncOnChain: false, Broadcast: false}, append(opts, New())...)
syncTx.BroadcastStatus = SyncStatusComplete
txErr = syncTx.Save(ctx)
require.NoError(t, txErr)

// input of testTxInID
syncTx = newSyncTransaction("89fbccca3a5e2bfc8a161bf7f54e8cb5898e296ae8c23b620b89ed570711f931", &SyncConfig{SyncOnChain: false, Broadcast: false}, append(opts, New())...)
txErr = syncTx.Save(ctx)
require.NoError(t, txErr)

type args struct {
tx *Transaction
opts []ModelOps
}
tests := []struct {
name string
args args
want bool
wantErr assert.ErrorAssertionFunc
}{
{
name: "no parents",
args: args{
tx: tx3,
opts: opts,
},
want: true,
wantErr: assert.NoError,
},
{
name: "parent not broadcast",
args: args{
tx: tx2,
opts: opts,
},
want: false,
wantErr: assert.NoError,
},
{
name: "parent broadcast",
args: args{
tx: tx,
opts: opts,
},
want: true,
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := _areParentsBroadcasted(ctx, tt.args.tx, tt.args.opts...)
if !tt.wantErr(t, err, fmt.Sprintf("areParentsBroadcast(%v, %v, %v)", ctx, tt.args.tx, tt.args.opts)) {
return
}
assert.Equalf(t, tt.want, got, "areParentsBroadcast(%v, %v, %v)", ctx, tt.args.tx, tt.args.opts)
})
}
}

func TestSyncTransaction_SaveHook(t *testing.T) {
t.Parallel()

Expand Down
1 change: 0 additions & 1 deletion engine/model_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var (
testTxID = "1b52eac9d1eb0adf3ce6a56dee1c4768780b8126e288aca65dd1db32f173b853"
testTxID2 = "104cc87da1c6a6d3ce3e0dcffa92533c32d66818871a443b2d8b2933278dbb65"
testTx2Hex = "020000000189fbccca3a5e2bfc8a161bf7f54e8cb5898e296ae8c23b620b89ed570711f931000000006a47304402204e94380ae4d27f8bb9b40dd9944b4fea532d5fe12cf62c1994a6a495c81490f202204aab42f8f1b15259a032e58a3810fbbfd691771b92317f8a12a0da84761a400641210382229c0295e4d63ee54c541eba40be2963f0e80489b7da34e022d513a723181fffffffff0259970400000000001976a914e069bd2e2fe3ea702c40d5e65b491b734c01686788ac00000000000000000c006a09446f7457616c6c657400000000"
testTx3Hex = "01000000012c1466b3f92c703033fd1d21c1ff3a8b4ab8ceb32debfa9f7c3b2eb21b97dabf010000006a47304402204212fbb123f339c75eb28d3d6254af4ff1b9aa7c163e8f6cbb187ed49e12aaa5022010713e5b4bac82aea2232529370c2b41d3f6d5bfae0e5fb5689d74a7d29b3e48412103f26186a9f5ee7efaaf614de6451f2ad67712e728d4e1ac705cc73550546817e7feffffff01d8aa191e000000001976a914010af176de3faac864f148461340be6a7bb9eff488ac00000000"
testTxInID = "9b0495704e23e4b3bef3682c6a5c40abccc32a3e6b7b01ae3295e93a9d3a0482"
testTxInScriptPubKey = "76a914e069bd2e2fe3ea702c40d5e65b491b734c01686788ac"
testTxScriptPubKey1 = "76a91413473d21dc9e1fb392f05a028b447b165a052d4d88ac"
Expand Down
9 changes: 0 additions & 9 deletions engine/paymail_service_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (p *PaymailDefaultServiceProvider) GetPaymailByAlias(
alias, domain string,
_ *server.RequestMetadata,
) (*paymail.AddressInformation, error) {

pm, err := getPaymailAddress(ctx, alias+"@"+domain, p.client.DefaultModelOptions()...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -152,12 +151,6 @@ func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context,
return nil, err //nolint:wrapcheck // returns our internal errors
}

rts.ForceBroadcast(true)

if p2pTx.Beef != "" {
rts.FailOnBroadcastError(true)
}

transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata))
if err != nil {
return nil, err
Expand Down Expand Up @@ -398,8 +391,6 @@ func saveBeefTransactionInput(ctx context.Context, c ClientInterface, input *bee
inputTx.Client().DefaultSyncConfig(),
inputTx.GetOptions(true)...,
)
sync.BroadcastStatus = SyncStatusSkipped
sync.P2PStatus = SyncStatusSkipped
sync.SyncStatus = SyncStatusReady

if bump != nil {
Expand Down
101 changes: 101 additions & 0 deletions engine/process_p2p_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package engine

import (
"context"
"fmt"
"time"

"github.com/bitcoin-sv/go-paymail"
)

// processP2PTransaction will process the sync transaction record, or save the failure
func processP2PTransaction(ctx context.Context, tx *Transaction) error {
// Successfully capture any panics, convert to readable string and log the error
defer recoverAndLog(tx.Client().Logger())

syncTx := tx.syncTransaction
// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
ctx, fmt.Sprintf(lockKeyProcessP2PTx, syncTx.GetID()), syncTx.Client().Cachestore(),
)
defer unlock()
if err != nil {
return err
}

// No draft?
if len(tx.DraftID) == 0 {
syncTx.addSyncResult(ctx, syncActionP2P, "all", "no draft found, cannot complete p2p")

return nil // TODO: why nil here??
}

// Notify any P2P paymail providers associated to the transaction
var results []*SyncResult
if results, err = _notifyPaymailProviders(ctx, tx); err != nil {
syncTx.addSyncResult(ctx, syncActionP2P, "", err.Error())
return err
}

// Update if we have some results
if len(results) > 0 {
syncTx.Results.Results = append(syncTx.Results.Results, results...)
}

// Update sync status to be ready now
if syncTx.SyncStatus == SyncStatusPending {
syncTx.SyncStatus = SyncStatusReady
}

if err = syncTx.Save(ctx); err != nil {
syncTx.addSyncResult(ctx, syncActionP2P, "internal", err.Error())
return err
}

// Done!
return nil
}

// _notifyPaymailProviders will notify any associated Paymail providers
func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*SyncResult, error) {
pm := transaction.Client().PaymailClient()
outputs := transaction.draftTransaction.Configuration.Outputs

notifiedReceivers := make([]string, 0)
results := make([]*SyncResult, len(outputs))

var payload *paymail.P2PTransactionPayload
var err error

for _, out := range outputs {
p4 := out.PaymailP4

if p4 == nil || p4.ResolutionType != ResolutionTypeP2P {
continue
}

receiver := fmt.Sprintf("%s@%s", p4.Alias, p4.Domain)
if contains(notifiedReceivers, func(x string) bool { return x == receiver }) {
continue // no need to send the same transaction to the same receiver second time
}

if payload, err = finalizeP2PTransaction(
ctx,
pm,
p4,
transaction,
); err != nil {
return nil, err
}

notifiedReceivers = append(notifiedReceivers, receiver)
results = append(results, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: p4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})

}
return results, nil
}
Loading

0 comments on commit bf555b1

Please sign in to comment.