diff --git a/example/bundle_test.go b/example/bundle_test.go index a222b59..74fa113 100644 --- a/example/bundle_test.go +++ b/example/bundle_test.go @@ -1,6 +1,7 @@ package example import ( + "context" "github.com/everFinance/everpay-go/sdk" "github.com/everFinance/goar" "github.com/everFinance/goar/types" @@ -70,7 +71,8 @@ func TestBundleToArweave(t *testing.T) { // send to arweave wal, err := goar.NewWalletFromPath("jwkKey.json", "https://arweave.net") - tx, err := wal.SendBundleTx(bundle.BundleBinary, []types.Tag{ + assert.NoError(t, err) + tx, err := wal.SendBundleTx(context.TODO(), 0, bundle.BundleBinary, []types.Tag{ {Name: "App", Value: "goar"}, }) assert.NoError(t, err) diff --git a/go.mod b/go.mod index c06756f..8600534 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/panjf2000/ants/v2 v2.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/crypto v0.3.0 // indirect golang.org/x/sys v0.2.0 // indirect diff --git a/types/const.go b/types/const.go index 91c7078..886bb1e 100644 --- a/types/const.go +++ b/types/const.go @@ -11,6 +11,9 @@ const ( NOTE_SIZE = 32 HASH_SIZE = 32 + // concurrent submit chunks min size + DEFAULT_CHUNK_CONCURRENT_NUM = 50 // default concurrent number + // number of bits in a big.Word WordBits = 32 << (uint64(^big.Word(0)) >> 63) // number of bytes in a big.Word diff --git a/uploader.go b/uploader.go index 6d96d1d..a6bba66 100644 --- a/uploader.go +++ b/uploader.go @@ -1,11 +1,14 @@ package goar import ( + "context" "errors" "fmt" + "github.com/panjf2000/ants/v2" "math" "math/rand" "strconv" + "sync" "time" "github.com/everFinance/goar/types" @@ -148,6 +151,80 @@ func (tt *TransactionUploader) PctComplete() float64 { return math.Trunc(fval * 100) } +func (tt *TransactionUploader) ConcurrentOnce(ctx context.Context, concurrentNum int) error { + // post tx info + if err := tt.postTransaction(); err != nil { + return err + } + + if tt.IsComplete() { + return nil + } + + var wg sync.WaitGroup + if concurrentNum <= 0 { + concurrentNum = types.DEFAULT_CHUNK_CONCURRENT_NUM + } + p, _ := ants.NewPoolWithFunc(concurrentNum, func(i interface{}) { + defer wg.Done() + // process submit chunk + idx := i.(int) + + select { + case <-ctx.Done(): + log.Warn("ctx.done", "chunkIdx", idx) + return + default: + } + chunk, err := utils.GetChunk(*tt.Transaction, idx, tt.Data) + if err != nil { + log.Error("GetChunk error", "err", err, "idx", idx) + return + } + body, statusCode, err := tt.Client.SubmitChunks(chunk) // always body is errMsg + if statusCode == 200 { + return + } + + log.Error("concurrent submitChunk failed", "chunkIdx", idx, "statusCode", statusCode, "gatewayErr", body, "httpErr", err) + // try again + retryCount := 0 + for { + select { + case <-ctx.Done(): + log.Warn("ctx.done", "chunkIdx", idx) + return + default: + } + + retryCount++ + if statusCode == 429 { + time.Sleep(1 * time.Second) + } else { + time.Sleep(200 * time.Millisecond) + } + + body, statusCode, err = tt.Client.SubmitChunks(chunk) + if statusCode == 200 { + return + } + log.Warn("retry submitChunk failed", "retryCount", retryCount, "chunkIdx", idx, "statusCode", statusCode, "gatewayErr", body, "httpErr", err) + } + }) + + defer p.Release() + for i := 0; i < len(tt.Transaction.Chunks.Chunks); i++ { + wg.Add(1) + if err := p.Invoke(i); err != nil { + log.Error("p.Invoke(i)", "err", err, "i", i) + return err + } + } + + wg.Wait() + return nil +} + /** * Uploads the next part of the Transaction. * On the first call this posts the Transaction diff --git a/wallet.go b/wallet.go index ac23d51..67f1ab6 100644 --- a/wallet.go +++ b/wallet.go @@ -1,6 +1,7 @@ package goar import ( + "context" "errors" "fmt" "io/ioutil" @@ -101,19 +102,28 @@ func (w *Wallet) SendDataSpeedUp(data []byte, tags []types.Tag, speedFactor int6 return w.SendTransaction(tx) } -// SendTransaction: if send success, should return pending -func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) { - anchor, err := w.Client.GetTransactionAnchor() +func (w *Wallet) SendDataConcurrentSpeedUp(ctx context.Context, concurrentNum int, data []byte, tags []types.Tag, speedFactor int64) (types.Transaction, error) { + reward, err := w.Client.GetTransactionPrice(data, nil) if err != nil { return types.Transaction{}, err } - tx.LastTx = anchor - tx.Owner = w.Owner() - if err = w.Signer.SignTx(tx); err != nil { - return types.Transaction{}, err + + tx := &types.Transaction{ + Format: 2, + Target: "", + Quantity: "0", + Tags: utils.TagsEncode(tags), + Data: utils.Base64Encode(data), + DataSize: fmt.Sprintf("%d", len(data)), + Reward: fmt.Sprintf("%d", reward*(100+speedFactor)/100), } - uploader, err := CreateUploader(w.Client, tx, nil) + return w.SendTransactionConcurrent(ctx, concurrentNum, tx) +} + +// SendTransaction: if send success, should return pending +func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, error) { + uploader, err := w.getUploader(tx) if err != nil { return types.Transaction{}, err } @@ -121,6 +131,28 @@ func (w *Wallet) SendTransaction(tx *types.Transaction) (types.Transaction, erro return *tx, err } +func (w *Wallet) SendTransactionConcurrent(ctx context.Context, concurrentNum int, tx *types.Transaction) (types.Transaction, error) { + uploader, err := w.getUploader(tx) + if err != nil { + return types.Transaction{}, err + } + err = uploader.ConcurrentOnce(ctx, concurrentNum) + return *tx, err +} + +func (w *Wallet) getUploader(tx *types.Transaction) (*TransactionUploader, error) { + anchor, err := w.Client.GetTransactionAnchor() + if err != nil { + return nil, err + } + tx.LastTx = anchor + tx.Owner = w.Owner() + if err = w.Signer.SignTx(tx); err != nil { + return nil, err + } + return CreateUploader(w.Client, tx, nil) +} + func (w *Wallet) SendPst(contractId string, target string, qty *big.Int, customTags []types.Tag, speedFactor int64) (types.Transaction, error) { maxQty := big.NewInt(9007199254740991) // swc support max js integer if qty.Cmp(maxQty) > 0 { diff --git a/wallet_bundle.go b/wallet_bundle.go index c3b9fe2..b0c9ceb 100644 --- a/wallet_bundle.go +++ b/wallet_bundle.go @@ -1,11 +1,12 @@ package goar import ( + "context" "errors" "github.com/everFinance/goar/types" ) -func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) { +func (w *Wallet) SendBundleTxSpeedUp(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag, txSpeed int64) (types.Transaction, error) { bundleTags := []types.Tag{ {Name: "Bundle-Format", Value: "binary"}, {Name: "Bundle-Version", Value: "2.0.0"}, @@ -23,9 +24,9 @@ func (w *Wallet) SendBundleTxSpeedUp(bundleBinary []byte, tags []types.Tag, txSp txTags := make([]types.Tag, 0) txTags = append(bundleTags, tags...) - return w.SendDataSpeedUp(bundleBinary, txTags, txSpeed) + return w.SendDataConcurrentSpeedUp(ctx, concurrentNum, bundleBinary, txTags, txSpeed) } -func (w *Wallet) SendBundleTx(bundleBinary []byte, tags []types.Tag) (types.Transaction, error) { - return w.SendBundleTxSpeedUp(bundleBinary, tags, 0) +func (w *Wallet) SendBundleTx(ctx context.Context, concurrentNum int, bundleBinary []byte, tags []types.Tag) (types.Transaction, error) { + return w.SendBundleTxSpeedUp(ctx, concurrentNum, bundleBinary, tags, 0) }