Skip to content

Commit

Permalink
SDP-1374 Integrate Wallet Address in Processing Disbursement Instruct…
Browse files Browse the repository at this point in the history
…ions (#453)
  • Loading branch information
marwen-abid authored Nov 5, 2024
1 parent bc1384f commit 52e0a4d
Show file tree
Hide file tree
Showing 11 changed files with 552 additions and 179 deletions.
77 changes: 67 additions & 10 deletions internal/data/disbursement_instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"slices"

"github.com/stellar/go/support/log"
"golang.org/x/exp/maps"

"github.com/stellar/stellar-disbursement-platform-backend/db"
Expand All @@ -17,6 +19,7 @@ type DisbursementInstruction struct {
Amount string `csv:"amount"`
VerificationValue string `csv:"verification"`
ExternalPaymentId string `csv:"paymentID"`
WalletAddress string `csv:"walletAddress"`
}

func (di *DisbursementInstruction) Contact() (string, error) {
Expand Down Expand Up @@ -63,7 +66,6 @@ var (
type DisbursementInstructionsOpts struct {
UserID string
Instructions []*DisbursementInstruction
ReceiverContactType ReceiverContactType
Disbursement *Disbursement
DisbursementUpdate *DisbursementUpdate
MaxNumberOfInstructions int
Expand All @@ -82,9 +84,10 @@ type DisbursementInstructionsOpts struct {
// | | | | | |--- If the verification value does not match and the verification is confirmed, return an error.
// | | | | | |--- If the verification value does not match and the verification is not confirmed, update the verification value.
// | | | | | |--- If the verification value matches, continue.
// | | |--- Check if the receiver wallet exists.
// | | |--- [!ReceiverContactType.IncludesWalletAddress] Check if the receiver wallet exists.
// | | | |--- If the receiver wallet does not exist, create one.
// | | | |--- If the receiver wallet exists and it's not REGISTERED, retry the invitation SMS.
// | | |--- [ReceiverContactType.IncludesWalletAddress] Register the supplied wallet address
// | | |--- Delete all previously existing payments tied to this disbursement.
// | | |--- Create all payments passed in the instructions.
func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, opts DisbursementInstructionsOpts) error {
Expand All @@ -95,23 +98,30 @@ func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, opts Disb
// We need all the following logic to be executed in one transaction.
return db.RunInTransaction(ctx, di.dbConnectionPool, nil, func(dbTx db.DBTransaction) error {
// Step 1: Fetch all receivers by contact information (phone, email, etc.) and create missing ones
receiversByIDMap, err := di.reconcileExistingReceiversWithInstructions(ctx, dbTx, opts.Instructions, opts.ReceiverContactType)
registrationContactType := opts.Disbursement.RegistrationContactType
receiversByIDMap, err := di.reconcileExistingReceiversWithInstructions(ctx, dbTx, opts.Instructions, registrationContactType.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receivers: %w", err)
}

// Step 2: Fetch all receiver verifications and create missing ones.
err = di.processReceiverVerifications(ctx, dbTx, receiversByIDMap, opts.Instructions, opts.Disbursement, opts.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receiver verifications: %w", err)
}

// Step 3: Fetch all receiver wallets and create missing ones
// Step 2: Fetch all receiver wallets and create missing ones
receiverIDToReceiverWalletIDMap, err := di.processReceiverWallets(ctx, dbTx, receiversByIDMap, opts.Disbursement)
if err != nil {
return fmt.Errorf("processing receiver wallets: %w", err)
}

// Step 3: Register supplied wallets or process receiver verifications based on the registration contact type
if registrationContactType.IncludesWalletAddress {
if err = di.registerSuppliedWallets(ctx, dbTx, opts.Instructions, receiversByIDMap, receiverIDToReceiverWalletIDMap); err != nil {
return fmt.Errorf("registering supplied wallets: %w", err)
}
} else {
err = di.processReceiverVerifications(ctx, dbTx, receiversByIDMap, opts.Instructions, opts.Disbursement, registrationContactType.ReceiverContactType)
if err != nil {
return fmt.Errorf("processing receiver verifications: %w", err)
}
}

// Step 4: Delete all pre-existing payments tied to this disbursement for each receiver in one call
if err = di.paymentModel.DeleteAllForDisbursement(ctx, dbTx, opts.Disbursement.ID); err != nil {
return fmt.Errorf("deleting payments: %w", err)
Expand All @@ -136,6 +146,53 @@ func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, opts Disb
})
}

func (di DisbursementInstructionModel) registerSuppliedWallets(ctx context.Context, dbTx db.DBTransaction, instructions []*DisbursementInstruction, receiversByIDMap map[string]*Receiver, receiverIDToReceiverWalletIDMap map[string]string) error {
// Construct a map of receiverWalletID to receiverWallet
receiverWalletsByIDMap, err := di.getReceiverWalletsByIDMap(ctx, dbTx, maps.Values(receiverIDToReceiverWalletIDMap))
if err != nil {
return fmt.Errorf("building receiver wallets lookup map: %w", err)
}

// Mark receiver wallets as registered
for _, instruction := range instructions {
receiver := findReceiverByInstruction(receiversByIDMap, instruction)
if receiver == nil {
return fmt.Errorf("receiver not found for instruction with ID %s", instruction.ID)
}
receiverWalletID, exists := receiverIDToReceiverWalletIDMap[receiver.ID]
if !exists {
return fmt.Errorf("receiver wallet not found for receiver with ID %s", receiver.ID)
}
receiverWallet := receiverWalletsByIDMap[receiverWalletID]

if slices.Contains([]ReceiversWalletStatus{RegisteredReceiversWalletStatus, FlaggedReceiversWalletStatus}, receiverWallet.Status) {
log.Ctx(ctx).Infof("receiver wallet with ID %s is %s, skipping registration", receiverWallet.ID, receiverWallet.Status)
continue
}

receiverWalletUpdate := ReceiverWalletUpdate{
Status: RegisteredReceiversWalletStatus,
StellarAddress: instruction.WalletAddress,
}
if updateErr := di.receiverWalletModel.Update(ctx, receiverWalletID, receiverWalletUpdate, dbTx); updateErr != nil {
return fmt.Errorf("marking receiver wallet as registered: %w", updateErr)
}
}
return nil
}

func (di DisbursementInstructionModel) getReceiverWalletsByIDMap(ctx context.Context, dbTx db.DBTransaction, receiverWalletIDs []string) (map[string]ReceiverWallet, error) {
receiverWallets, err := di.receiverWalletModel.GetByIDs(ctx, dbTx, receiverWalletIDs...)
if err != nil {
return nil, fmt.Errorf("fetching receiver wallets: %w", err)
}
receiverWalletsByIDMap := make(map[string]ReceiverWallet, len(receiverWallets))
for _, receiverWallet := range receiverWallets {
receiverWalletsByIDMap[receiverWallet.ID] = receiverWallet
}
return receiverWalletsByIDMap, nil
}

// reconcileExistingReceiversWithInstructions fetches all existing receivers by their contact information and creates missing ones.
func (di DisbursementInstructionModel) reconcileExistingReceiversWithInstructions(ctx context.Context, dbTx db.DBTransaction, instructions []*DisbursementInstruction, contactType ReceiverContactType) (map[string]*Receiver, error) {
// Step 1: Fetch existing receivers
Expand Down
126 changes: 114 additions & 12 deletions internal/data/disbursement_instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Wallet: wallet,
})

emailDisbursement := CreateDraftDisbursementFixture(t, ctx, dbConnectionPool, &DisbursementModel{dbConnectionPool: dbConnectionPool}, Disbursement{
Name: "disbursement2",
Asset: asset,
Wallet: wallet,
RegistrationContactType: RegistrationContactTypeEmail,
})

di := NewDisbursementInstructionModel(dbConnectionPool)

smsInstruction1 := DisbursementInstruction{
Expand Down Expand Up @@ -91,13 +98,118 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
FileContent: CreateInstructionsFixture(t, smsInstructions),
}

knownWalletDisbursement := CreateDraftDisbursementFixture(t, ctx, dbConnectionPool, &DisbursementModel{dbConnectionPool: dbConnectionPool}, Disbursement{
Name: "disbursement with provided receiver wallets",
Asset: asset,
Wallet: wallet,
RegistrationContactType: RegistrationContactTypePhoneAndWalletAddress,
})

knownWalletDisbursementUpdate := func(instructions []*DisbursementInstruction) *DisbursementUpdate {
return &DisbursementUpdate{
ID: knownWalletDisbursement.ID,
FileName: "instructions.csv",
FileContent: CreateInstructionsFixture(t, instructions),
}
}

cleanup := func() {
DeleteAllPaymentsFixtures(t, ctx, dbConnectionPool)
DeleteAllReceiverVerificationFixtures(t, ctx, dbConnectionPool)
DeleteAllReceiverWalletsFixtures(t, ctx, dbConnectionPool)
DeleteAllReceiversFixtures(t, ctx, dbConnectionPool)
}

t.Run("failure - invalid wallet address for known wallet address instructions", func(t *testing.T) {
defer cleanup()

instructions := []*DisbursementInstruction{
{
WalletAddress: "GCVL44LFV3BFI627ABY3YRITFBRJVXUQVPLXQ3LISMI5UVKS5LHWTPT6",
Amount: "100.01",
ID: "1",
Phone: "+380-12-345-679",
},
}

err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: instructions,
Disbursement: knownWalletDisbursement,
DisbursementUpdate: knownWalletDisbursementUpdate(instructions),
MaxNumberOfInstructions: 10,
})
assert.ErrorContains(t, err, "validating receiver wallet update: invalid stellar address")
})

t.Run("success - known wallet address instructions", func(t *testing.T) {
defer cleanup()

instructions := []*DisbursementInstruction{
{
WalletAddress: "GCVL44LFV3BFI627ABY3YRITFBRJVXUQVPLXQ3LISMI5UVKS5LHWTPT7",
Amount: "100.01",
ID: "1",
Phone: "+380-12-345-671",
},
{
WalletAddress: "GC524YE6Z6ISMNLHWFYXQZRR5DTF2A75DYE5TE6G7UMZJ6KZRNVHPOQS",
Amount: "100.02",
ID: "2",
Phone: "+380-12-345-672",
},
}

update := knownWalletDisbursementUpdate(instructions)
err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: instructions,
Disbursement: knownWalletDisbursement,
DisbursementUpdate: update,
MaxNumberOfInstructions: 10,
})
require.NoError(t, err)

// Verify Receivers
receivers, err := di.receiverModel.GetByContacts(ctx, dbConnectionPool, instructions[0].Phone, instructions[1].Phone)
require.NoError(t, err)
assertEqualReceivers(t, []string{instructions[0].Phone, instructions[1].Phone}, []string{"1", "2"}, receivers)

// Verify Receiver Verifications
receiver1Verifications, err := di.receiverVerificationModel.GetAllByReceiverId(ctx, dbConnectionPool, receivers[0].ID)
require.NoError(t, err)
assert.Len(t, receiver1Verifications, 0)
receiver2Verifications, err := di.receiverVerificationModel.GetAllByReceiverId(ctx, dbConnectionPool, receivers[1].ID)
require.NoError(t, err)
assert.Len(t, receiver2Verifications, 0)

// Verify Receiver Wallets
receiverWallets, err := di.receiverWalletModel.GetWithReceiverIds(ctx, dbConnectionPool, []string{receivers[0].ID, receivers[1].ID})
require.NoError(t, err)
assert.Len(t, receiverWallets, 2)
for _, receiverWallet := range receiverWallets {
assert.Equal(t, wallet.ID, receiverWallet.Wallet.ID)
assert.Contains(t, []string{instructions[0].WalletAddress, instructions[1].WalletAddress}, receiverWallet.StellarAddress)
assert.Equal(t, RegisteredReceiversWalletStatus, receiverWallet.Status)
}

// Verify Payments
actualPayments := GetPaymentsByDisbursementID(t, ctx, dbConnectionPool, knownWalletDisbursement.ID)
assert.Len(t, actualPayments, 2)
assert.Contains(t, actualPayments, instructions[0].Amount)
assert.Contains(t, actualPayments, instructions[1].Amount)

actualExternalPaymentIDs := GetExternalPaymentIDsByDisbursementID(t, ctx, dbConnectionPool, knownWalletDisbursement.ID)
assert.Len(t, actualExternalPaymentIDs, 0)

// Verify Disbursement
actualDisbursement, err := di.disbursementModel.Get(ctx, dbConnectionPool, knownWalletDisbursement.ID)
require.NoError(t, err)
assert.Equal(t, ReadyDisbursementStatus, actualDisbursement.Status)
assert.Equal(t, update.FileContent, actualDisbursement.FileContent)
assert.Equal(t, update.FileName, actualDisbursement.FileName)
})

t.Run("success - sms instructions", func(t *testing.T) {
defer cleanup()

Expand All @@ -106,7 +218,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -152,9 +263,8 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: emailInstructions,
Disbursement: disbursement,
Disbursement: emailDisbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -176,9 +286,8 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
err := di.ProcessAll(ctx, DisbursementInstructionsOpts{
UserID: "user-id",
Instructions: smsInstructions,
Disbursement: disbursement,
Disbursement: emailDisbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.ErrorContains(t, err, "has no contact information for contact type EMAIL")
Expand All @@ -202,7 +311,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: emailAndSMSInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeEmail,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
errorMsg := "processing receivers: resolving contact information for instruction with ID %s: phone and email are both provided"
Expand All @@ -218,7 +326,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -229,7 +336,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -298,7 +404,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: newInstructions,
Disbursement: readyDisbursement,
DisbursementUpdate: readyDisbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -342,7 +447,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: newInstructions,
Disbursement: readyDisbursement,
DisbursementUpdate: readyDisbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand Down Expand Up @@ -383,7 +487,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.NoError(t, err)
Expand All @@ -406,7 +509,6 @@ func Test_DisbursementInstructionModel_ProcessAll(t *testing.T) {
Instructions: smsInstructions,
Disbursement: disbursement,
DisbursementUpdate: disbursementUpdate,
ReceiverContactType: ReceiverContactTypeSMS,
MaxNumberOfInstructions: MaxInstructionsPerDisbursement,
})
require.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/data/disbursements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,9 @@ func Test_DisbursementModel_Update(t *testing.T) {
})

disbursementFileContent := CreateInstructionsFixture(t, []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{"0987654321", "", "3", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
{Phone: "0987654321", ID: "3", Amount: "321", VerificationValue: "1974-07-19"},
})

t.Run("update instructions", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions internal/data/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ func CreateDraftDisbursementFixture(t *testing.T, ctx context.Context, sqlExec d
insert.RegistrationContactType = RegistrationContactTypePhone
}

if utils.IsEmpty(insert.RegistrationContactType) {
insert.RegistrationContactType = RegistrationContactTypePhone
}

id, err := model.Insert(ctx, &insert)
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions internal/data/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func Test_Fixtures_CreateInstructionsFixture(t *testing.T) {

t.Run("writes records correctly", func(t *testing.T) {
instructions := []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
}
buf := CreateInstructionsFixture(t, instructions)
lines := strings.Split(string(buf), "\n")
Expand All @@ -117,9 +117,9 @@ func Test_Fixtures_UpdateDisbursementInstructionsFixture(t *testing.T) {
})

instructions := []*DisbursementInstruction{
{"1234567890", "", "1", "123.12", "1995-02-20", ""},
{"0987654321", "", "2", "321", "1974-07-19", ""},
{"0987654321", "", "3", "321", "1974-07-19", ""},
{Phone: "1234567890", ID: "1", Amount: "123.12", VerificationValue: "1995-02-20"},
{Phone: "0987654321", ID: "2", Amount: "321", VerificationValue: "1974-07-19"},
{Phone: "0987654321", ID: "3", Amount: "321", VerificationValue: "1974-07-19"},
}

t.Run("update instructions", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 52e0a4d

Please sign in to comment.