Skip to content

Commit

Permalink
SDP-1374 Integrate Wallet Address in Processing Disbursement Instruct…
Browse files Browse the repository at this point in the history
…ions

# Conflicts:
#	internal/data/disbursements.go
#	internal/serve/httphandler/disbursement_handler.go
  • Loading branch information
marwen-abid committed Nov 4, 2024
1 parent bc1384f commit d1fb675
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 114 deletions.
74 changes: 65 additions & 9 deletions internal/data/disbursement_instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"slices"

"github.com/stellar/go/support/log"
"golang.org/x/exp/maps"

"github.com/stellar/stellar-disbursement-platform-backend/db"
Expand All @@ -17,6 +19,7 @@ type DisbursementInstruction struct {
Amount string `csv:"amount"`
VerificationValue string `csv:"verification"`
ExternalPaymentId string `csv:"paymentID"`
WalletAddress string `csv:"walletAddress"`
}

func (di *DisbursementInstruction) Contact() (string, error) {
Expand Down Expand Up @@ -63,7 +66,6 @@ var (
type DisbursementInstructionsOpts struct {
UserID string
Instructions []*DisbursementInstruction
ReceiverContactType ReceiverContactType
Disbursement *Disbursement
DisbursementUpdate *DisbursementUpdate
MaxNumberOfInstructions int
Expand Down Expand Up @@ -95,23 +97,30 @@ func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, opts Disb
// We need all the following logic to be executed in one transaction.
return db.RunInTransaction(ctx, di.dbConnectionPool, nil, func(dbTx db.DBTransaction) error {
// Step 1: Fetch all receivers by contact information (phone, email, etc.) and create missing ones
receiversByIDMap, err := di.reconcileExistingReceiversWithInstructions(ctx, dbTx, opts.Instructions, opts.ReceiverContactType)
registrationContactType := opts.Disbursement.RegistrationContactType
receiversByIDMap, err := di.reconcileExistingReceiversWithInstructions(ctx, dbTx, opts.Instructions, registrationContactType.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receivers: %w", err)
}

// Step 2: Fetch all receiver verifications and create missing ones.
err = di.processReceiverVerifications(ctx, dbTx, receiversByIDMap, opts.Instructions, opts.Disbursement, opts.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receiver verifications: %w", err)
}

// Step 3: Fetch all receiver wallets and create missing ones
// Step 2: Fetch all receiver wallets and create missing ones
receiverIDToReceiverWalletIDMap, err := di.processReceiverWallets(ctx, dbTx, receiversByIDMap, opts.Disbursement)
if err != nil {
return fmt.Errorf("processing receiver wallets: %w", err)
}

// Step 3: Register supplied wallets or process receiver verifications based on the registration contact type
if registrationContactType.IncludesWalletAddress {
if err = di.registerSuppliedWallets(ctx, dbTx, opts.Instructions, receiversByIDMap, receiverIDToReceiverWalletIDMap); err != nil {
return fmt.Errorf("registering supplied wallets: %w", err)
}
} else {
err = di.processReceiverVerifications(ctx, dbTx, receiversByIDMap, opts.Instructions, opts.Disbursement, registrationContactType.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receiver verifications: %w", err)
}
}

// Step 4: Delete all pre-existing payments tied to this disbursement for each receiver in one call
if err = di.paymentModel.DeleteAllForDisbursement(ctx, dbTx, opts.Disbursement.ID); err != nil {
return fmt.Errorf("deleting payments: %w", err)
Expand All @@ -136,6 +145,53 @@ func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, opts Disb
})
}

func (di DisbursementInstructionModel) registerSuppliedWallets(ctx context.Context, dbTx db.DBTransaction, instructions []*DisbursementInstruction, receiversByIDMap map[string]*Receiver, receiverIDToReceiverWalletIDMap map[string]string) error {
// Construct a map of receiverWalletID to receiverWallet
receiverWalletsByIDMap, err := di.getReceiverWalletsByIDMap(ctx, dbTx, maps.Values(receiverIDToReceiverWalletIDMap))
if err != nil {
return fmt.Errorf("building receiver wallets lookup map: %w", err)
}

// Mark receiver wallets as registered
for _, instruction := range instructions {
receiver := findReceiverByInstruction(receiversByIDMap, instruction)
if receiver == nil {
return fmt.Errorf("receiver not found for instruction with ID %s", instruction.ID)
}
receiverWalletID, exists := receiverIDToReceiverWalletIDMap[receiver.ID]
if !exists {
return fmt.Errorf("receiver wallet not found for receiver with ID %s", receiver.ID)
}
receiverWallet := receiverWalletsByIDMap[receiverWalletID]

if slices.Contains([]ReceiversWalletStatus{RegisteredReceiversWalletStatus, FlaggedReceiversWalletStatus}, receiverWallet.Status) {
log.Ctx(ctx).Infof("receiver wallet with ID %s is %s, skipping registration", receiverWallet.ID, receiverWallet.Status)
continue
}

receiverWalletUpdate := ReceiverWalletUpdate{
Status: RegisteredReceiversWalletStatus,
StellarAddress: instruction.WalletAddress,
}
if updateErr := di.receiverWalletModel.Update(ctx, receiverWalletID, receiverWalletUpdate, dbTx); updateErr != nil {
return fmt.Errorf("marking receiver wallet as registered: %w", updateErr)
}
}
return nil
}

func (di DisbursementInstructionModel) getReceiverWalletsByIDMap(ctx context.Context, dbTx db.DBTransaction, receiverWalletIDs []string) (map[string]ReceiverWallet, error) {
receiverWallets, err := di.receiverWalletModel.GetByIDs(ctx, dbTx, receiverWalletIDs...)
if err != nil {
return nil, fmt.Errorf("fetching receiver wallets: %w", err)
}
receiverWalletsByIDMap := make(map[string]ReceiverWallet, len(receiverWallets))
for _, receiverWallet := range receiverWallets {
receiverWalletsByIDMap[receiverWallet.ID] = receiverWallet
}
return receiverWalletsByIDMap, nil
}

// reconcileExistingReceiversWithInstructions fetches all existing receivers by their contact information and creates missing ones.
func (di DisbursementInstructionModel) reconcileExistingReceiversWithInstructions(ctx context.Context, dbTx db.DBTransaction, instructions []*DisbursementInstruction, contactType ReceiverContactType) (map[string]*Receiver, error) {
// Step 1: Fetch existing receivers
Expand Down
22 changes: 10 additions & 12 deletions internal/data/disbursement_instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Wallet: wallet,
})

emailDisbursement := CreateDraftDisbursementFixture(t, ctx, dbConnectionPool, &DisbursementModel{dbConnectionPool: dbConnectionPool}, Disbursement{
Name: "disbursement2",
Asset: asset,
Country: country,

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / check

unknown field Country in struct literal of type Disbursement

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / check

undefined: country (typecheck)

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / test

unknown field Country in struct literal of type Disbursement

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: country

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / test

unknown field Country in struct literal of type Disbursement

Check failure on line 38 in internal/data/disbursement_instructions_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: country
Wallet: wallet,
RegistrationContactType: RegistrationContactTypeEmail,
})

di := NewDisbursementInstructionModel(dbConnectionPool)

smsInstruction1 := DisbursementInstruction{
Expand Down Expand Up @@ -106,7 +114,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -152,9 +159,8 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: emailInstructions,
Disbursement: disbursement,
Disbursement: emailDisbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -176,9 +182,8 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: smsInstructions,
Disbursement: disbursement,
Disbursement: emailDisbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.ErrorContains(t, err, "has no contact information for contact type EMAIL")
Expand All @@ -202,7 +207,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: emailAndSMSInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
errorMsg := "processing receivers: resolving contact information for instruction with ID %s: phone and email are both provided"
Expand All @@ -218,7 +222,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -229,7 +232,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -298,7 +300,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: newInstructions,
Disbursement: readyDisbursement,
DisbursementUpdate: readyDisbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -342,7 +343,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: newInstructions,
Disbursement: readyDisbursement,
DisbursementUpdate: readyDisbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -383,7 +383,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -406,7 +405,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/data/disbursements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,9 @@ func Test_DisbursementModel_Update(t *testing.T) {
})

disbursementFileContent := CreateInstructionsFixture(t, []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{"0987654321", "", "3", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
{Phone: "0987654321", ID: "3", Amount: "321", VerificationValue: "1974-07-19"},
})

t.Run("update instructions", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions internal/data/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ func CreateDraftDisbursementFixture(t *testing.T, ctx context.Context, sqlExec d
insert.RegistrationContactType = RegistrationContactTypePhone
}

if utils.IsEmpty(insert.RegistrationContactType) {
insert.RegistrationContactType = RegistrationContactTypePhone
}

id, err := model.Insert(ctx, &insert)
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions internal/data/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func Test_Fixtures_CreateInstructionsFixture(t *testing.T) {

t.Run("writes records correctly", func(t *testing.T) {
instructions := []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
}
buf := CreateInstructionsFixture(t, instructions)
lines := strings.Split(string(buf), "\n")
Expand All @@ -117,9 +117,9 @@ func Test_Fixtures_UpdateDisbursementInstructionsFixture(t *testing.T) {
})

instructions := []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{"0987654321", "", "3", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
{Phone: "0987654321", ID: "3", Amount: "321", VerificationValue: "1974-07-19"},
}

t.Run("update instructions", func(t *testing.T) {
Expand Down
86 changes: 41 additions & 45 deletions internal/data/receivers_wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,44 @@ func (rw *ReceiverWalletModel) GetWithReceiverIds(ctx context.Context, sqlExec d
return receiverWallets, nil
}

const selectReceiverWalletQuery = `
SELECT
rw.id,
rw.receiver_id as "receiver.id",
rw.status,
COALESCE(rw.anchor_platform_transaction_id, '') as anchor_platform_transaction_id,
COALESCE(rw.stellar_address, '') as stellar_address,
COALESCE(rw.stellar_memo, '') as stellar_memo,
COALESCE(rw.stellar_memo_type, '') as stellar_memo_type,
COALESCE(rw.otp, '') as otp,
rw.otp_created_at,
rw.otp_confirmed_at,
COALESCE(rw.otp_confirmed_with, '') as otp_confirmed_with,
w.id as "wallet.id",
w.name as "wallet.name",
w.sep_10_client_domain as "wallet.sep_10_client_domain"
FROM
receiver_wallets rw
JOIN
wallets w ON rw.wallet_id = w.id
`

// GetByIDs returns a receiver wallet by IDs
func (rw *ReceiverWalletModel) GetByIDs(ctx context.Context, sqlExec db.SQLExecuter, ids ...string) ([]ReceiverWallet, error) {
if len(ids) == 0 {
return nil, fmt.Errorf("no receiver wallet IDs provided")
}

query := fmt.Sprintf("%s WHERE rw.id = ANY($1)", selectReceiverWalletQuery)

receiverWallets := make([]ReceiverWallet, 0)
err := sqlExec.SelectContext(ctx, &receiverWallets, query, pq.Array(ids))
if err != nil {
return nil, fmt.Errorf("querying receiver wallet: %w", err)
}
return receiverWallets, nil
}

// GetByReceiverIDsAndWalletID returns a list of receiver wallets by receiver IDs and wallet ID.
func (rw *ReceiverWalletModel) GetByReceiverIDsAndWalletID(ctx context.Context, sqlExec db.SQLExecuter, receiverIds []string, walletId string) ([]*ReceiverWallet, error) {
receiverWallets := []*ReceiverWallet{}
Expand Down Expand Up @@ -350,33 +388,9 @@ func (rw *ReceiverWalletModel) Insert(ctx context.Context, sqlExec db.SQLExecute

// GetByReceiverIDAndWalletDomain returns a receiver wallet that match the receiver ID and wallet domain.
func (rw *ReceiverWalletModel) GetByReceiverIDAndWalletDomain(ctx context.Context, receiverId string, walletDomain string, sqlExec db.SQLExecuter) (*ReceiverWallet, error) {
var receiverWallet ReceiverWallet
query := `
SELECT
rw.id,
rw.receiver_id as "receiver.id",
rw.status,
COALESCE(rw.anchor_platform_transaction_id, '') as anchor_platform_transaction_id,
COALESCE(rw.stellar_address, '') as stellar_address,
COALESCE(rw.stellar_memo, '') as stellar_memo,
COALESCE(rw.stellar_memo_type, '') as stellar_memo_type,
COALESCE(rw.otp, '') as otp,
rw.otp_created_at,
rw.otp_confirmed_at,
COALESCE(rw.otp_confirmed_with, '') as otp_confirmed_with,
w.id as "wallet.id",
w.name as "wallet.name",
w.sep_10_client_domain as "wallet.sep_10_client_domain"
FROM
receiver_wallets rw
JOIN
wallets w ON rw.wallet_id = w.id
WHERE
rw.receiver_id = $1
AND
w.sep_10_client_domain = $2
`
query := fmt.Sprintf("%s %s", selectReceiverWalletQuery, "WHERE rw.receiver_id = $1 AND w.sep_10_client_domain = $2")

var receiverWallet ReceiverWallet
err := sqlExec.GetContext(ctx, &receiverWallet, query, receiverId, walletDomain)
if err != nil {
return nil, fmt.Errorf("error querying receiver wallet: %w", err)
Expand Down Expand Up @@ -448,25 +462,7 @@ func (rw *ReceiverWalletModel) UpdateStatusByDisbursementID(ctx context.Context,
func (rw *ReceiverWalletModel) GetByStellarAccountAndMemo(ctx context.Context, stellarAccount, stellarMemo, clientDomain string) (*ReceiverWallet, error) {
// build query
var receiverWallets ReceiverWallet
query := `
SELECT
rw.id,
rw.receiver_id as "receiver.id",
rw.status,
COALESCE(rw.anchor_platform_transaction_id, '') as anchor_platform_transaction_id,
COALESCE(rw.stellar_address, '') as stellar_address,
COALESCE(rw.stellar_memo, '') as stellar_memo,
COALESCE(rw.stellar_memo_type, '') as stellar_memo_type,
COALESCE(rw.otp, '') as otp,
rw.otp_created_at,
COALESCE(rw.otp_confirmed_with, '') as otp_confirmed_with,
w.id as "wallet.id",
w.name as "wallet.name",
w.homepage as "wallet.homepage"
FROM receiver_wallets rw
JOIN wallets w ON rw.wallet_id = w.id
WHERE rw.stellar_address = ?
`
query := fmt.Sprintf("%s %s", selectReceiverWalletQuery, "WHERE rw.stellar_address = ?")

// append memo to query if it is not empty
args := []interface{}{stellarAccount}
Expand Down
Loading

0 comments on commit d1fb675

Please sign in to comment.