Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[payment] prune reservation periods and ondemand payments #994

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Accountant struct {

// local accounting
// contains 3 bins; circular wrapping of indices
binRecords []BinRecord
usageLock sync.Mutex
cumulativePayment *big.Int
reservationPeriodRecords []ReservationPeriodRecord
usageLock sync.Mutex
cumulativePayment *big.Int

// number of bins in the circular accounting, restricted by minNumBins which is 3
// number of bins in the circular accounting, restricted by MinNumPeriods which is 3
numBins uint32
}

type BinRecord struct {
type ReservationPeriodRecord struct {
Index uint32
Usage uint64
}
Expand All @@ -43,20 +43,20 @@ func NewAccountant(accountID string, reservation *core.ReservedPayment, onDemand
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
// There's a subsequent PR that handles populating the accountant with on-chain state from the disperser
binRecords := make([]BinRecord, numBins)
for i := range binRecords {
binRecords[i] = BinRecord{Index: uint32(i), Usage: 0}
reservationPeriodRecords := make([]ReservationPeriodRecord, numBins)
for i := range reservationPeriodRecords {
reservationPeriodRecords[i] = ReservationPeriodRecord{Index: uint32(i), Usage: 0}
}
a := Accountant{
accountID: accountID,
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerSymbol: pricePerSymbol,
minNumSymbols: minNumSymbols,
binRecords: binRecords,
cumulativePayment: big.NewInt(0),
numBins: max(numBins, uint32(meterer.MinNumBins)),
accountID: accountID,
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerSymbol: pricePerSymbol,
minNumSymbols: minNumSymbols,
reservationPeriodRecords: reservationPeriodRecords,
cumulativePayment: big.NewInt(0),
numBins: max(numBins, uint32(meterer.MinNumPeriods)),
}
// TODO: add a routine to refresh the on-chain state occasionally?
return &a
Expand All @@ -67,39 +67,39 @@ func NewAccountant(accountID string, reservation *core.ReservedPayment, onDemand
// then on-demand if the reservation is not available. The returned values are
// reservation period for reservation payments and cumulative payment for on-demand payments,
// and both fields are used to create the payment header and signature
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quorumNumbers []uint8) (uint32, *big.Int, error) {
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
now := time.Now().Unix()
currentReservationPeriod := meterer.GetReservationPeriod(uint64(now), a.reservationWindow)
symbolUsage := uint64(a.SymbolsCharged(numSymbols))
symbolUsage := uint64(a.SymbolsCharged(uint32(numSymbols)))

a.usageLock.Lock()
defer a.usageLock.Unlock()
relativeBinRecord := a.GetRelativeBinRecord(currentReservationPeriod)
relativeBinRecord.Usage += symbolUsage
relativeReservationPeriodRecord := a.GetRelativeReservationPeriodRecord(currentReservationPeriod)
relativeReservationPeriodRecord.Usage += symbolUsage

// first attempt to use the active reservation
binLimit := a.reservation.SymbolsPerSecond * uint64(a.reservationWindow)
if relativeBinRecord.Usage <= binLimit {
if relativeReservationPeriodRecord.Usage <= binLimit {
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
return 0, big.NewInt(0), err
}
return currentReservationPeriod, big.NewInt(0), nil
}

overflowBinRecord := a.GetRelativeBinRecord(currentReservationPeriod + 2)
overflowReservationPeriodRecord := a.GetRelativeReservationPeriodRecord(currentReservationPeriod + 2)
// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
if overflowBinRecord.Usage == 0 && relativeBinRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
overflowBinRecord.Usage += relativeBinRecord.Usage - binLimit
if overflowReservationPeriodRecord.Usage == 0 && relativeReservationPeriodRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
overflowReservationPeriodRecord.Usage += relativeReservationPeriodRecord.Usage - binLimit
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
return 0, big.NewInt(0), err
}
return currentReservationPeriod, big.NewInt(0), nil
}

// reservation not available, rollback reservation records, attempt on-demand
//todo: rollback on-demand if disperser respond with some type of rejection?
relativeBinRecord.Usage -= symbolUsage
incrementRequired := big.NewInt(int64(a.PaymentCharged(numSymbols)))
// reservation not available, attempt on-demand
//todo: rollback later if disperser respond with some type of rejection?
relativeReservationPeriodRecord.Usage -= symbolUsage
incrementRequired := big.NewInt(int64(a.PaymentCharged(uint32(numSymbols))))
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
if err := QuorumCheck(quorumNumbers, requiredQuorums); err != nil {
Expand All @@ -111,7 +111,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo
}

// AccountBlob accountant provides and records payment information
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint32, quorums []uint8, salt uint32) (*core.PaymentMetadata, error) {
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8, salt uint32) (*core.PaymentMetadata, error) {
reservationPeriod, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
if err != nil {
return nil, err
Expand Down Expand Up @@ -143,16 +143,16 @@ func (a *Accountant) SymbolsCharged(numSymbols uint32) uint32 {
return uint32(core.RoundUpDivide(uint(numSymbols), uint(a.minNumSymbols))) * a.minNumSymbols
}

func (a *Accountant) GetRelativeBinRecord(index uint32) *BinRecord {
func (a *Accountant) GetRelativeReservationPeriodRecord(index uint32) *ReservationPeriodRecord {
relativeIndex := index % a.numBins
if a.binRecords[relativeIndex].Index != uint32(index) {
a.binRecords[relativeIndex] = BinRecord{
if a.reservationPeriodRecords[relativeIndex].Index != uint32(index) {
a.reservationPeriodRecords[relativeIndex] = ReservationPeriodRecord{
Index: uint32(index),
Usage: 0,
}
}

return &a.binRecords[relativeIndex]
return &a.reservationPeriodRecords[relativeIndex]
}

// SetPaymentState sets the accountant's state from the disperser's response
Expand Down Expand Up @@ -214,18 +214,18 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
}
}

binRecords := make([]BinRecord, len(paymentState.GetBinRecords()))
for i, record := range paymentState.GetBinRecords() {
reservationPeriodRecords := make([]ReservationPeriodRecord, len(paymentState.GetReservationPeriodRecords()))
for i, record := range paymentState.GetReservationPeriodRecords() {
if record == nil {
binRecords[i] = BinRecord{Index: 0, Usage: 0}
reservationPeriodRecords[i] = ReservationPeriodRecord{Index: 0, Usage: 0}
} else {
binRecords[i] = BinRecord{
reservationPeriodRecords[i] = ReservationPeriodRecord{
Index: record.Index,
Usage: record.Usage,
}
}
}
a.binRecords = binRecords
a.reservationPeriodRecords = reservationPeriodRecords
return nil
}

Expand Down
40 changes: 20 additions & 20 deletions api/clients/v2/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestNewAccountant(t *testing.T) {
assert.Equal(t, reservationWindow, accountant.reservationWindow)
assert.Equal(t, pricePerSymbol, accountant.pricePerSymbol)
assert.Equal(t, minNumSymbols, accountant.minNumSymbols)
assert.Equal(t, []BinRecord{{Index: 0, Usage: 0}, {Index: 1, Usage: 0}, {Index: 2, Usage: 0}}, accountant.binRecords)
assert.Equal(t, []ReservationPeriodRecord{{Index: 0, Usage: 0}, {Index: 1, Usage: 0}, {Index: 2, Usage: 0}}, accountant.reservationPeriodRecords)
assert.Equal(t, big.NewInt(0), accountant.cumulativePayment)
}

Expand All @@ -68,24 +68,24 @@ func TestAccountBlob_Reservation(t *testing.T) {
accountant := NewAccountant(accountId, reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, numBins)

ctx := context.Background()
symbolLength := uint32(500)
symbolLength := uint64(500)
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, symbolLength, quorums, salt)

assert.NoError(t, err)
assert.Equal(t, meterer.GetReservationPeriod(uint64(time.Now().Unix()), reservationWindow), header.ReservationPeriod)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

symbolLength = uint32(700)
symbolLength = uint64(700)

header, err = accountant.AccountBlob(ctx, symbolLength, quorums, salt)

assert.NoError(t, err)
assert.NotEqual(t, 0, header.ReservationPeriod)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// Second call should use on-demand payment
header, err = accountant.AccountBlob(ctx, 300, quorums, salt)
Expand Down Expand Up @@ -116,16 +116,16 @@ func TestAccountBlob_OnDemand(t *testing.T) {
accountant := NewAccountant(accountId, reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, numBins)

ctx := context.Background()
numSymbols := uint32(1500)
numSymbols := uint64(1500)
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, numSymbols, quorums, salt)
assert.NoError(t, err)

expectedPayment := big.NewInt(int64(numSymbols * pricePerSymbol))
expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol)))
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.Equal(t, expectedPayment, header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
}

Expand All @@ -144,7 +144,7 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) {
accountant := NewAccountant(accountId, reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, numBins)

ctx := context.Background()
numSymbols := uint32(2000)
numSymbols := uint64(2000)
quorums := []uint8{0, 1}

_, err = accountant.AccountBlob(ctx, numSymbols, quorums, salt)
Expand Down Expand Up @@ -225,20 +225,20 @@ func TestAccountBlob_BinRotation(t *testing.T) {
// First call
_, err = accountant.AccountBlob(ctx, 800, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// next reservation duration
time.Sleep(1000 * time.Millisecond)

// Second call
_, err = accountant.AccountBlob(ctx, 300, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 300, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{800, 300, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// Third call
_, err = accountant.AccountBlob(ctx, 500, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 800, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{800, 800, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)
}

func TestConcurrentBinRotationAndAccountBlob(t *testing.T) {
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) {
wg.Wait()

// Check final state
usages := mapRecordUsage(accountant.binRecords)
usages := mapRecordUsage(accountant.reservationPeriodRecords)
assert.Equal(t, uint64(1000), usages[0]+usages[1]+usages[2])
}

Expand Down Expand Up @@ -313,22 +313,22 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
assert.Equal(t, salt, header.Salt)
assert.Equal(t, meterer.GetReservationPeriod(uint64(now), reservationWindow), header.ReservationPeriod)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// Second call: Allow one overflow
header, err = accountant.AccountBlob(ctx, 500, quorums, salt+1)
assert.NoError(t, err)
assert.Equal(t, salt+1, header.Salt)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// Third call: Should use on-demand payment
header, err = accountant.AccountBlob(ctx, 200, quorums, salt+2)
assert.NoError(t, err)
assert.Equal(t, salt+2, header.Salt)
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.reservationPeriodRecords)), true)
}

func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
Expand Down Expand Up @@ -357,12 +357,12 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
// full reservation
_, err = accountant.AccountBlob(ctx, 1000, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)

// no overflow
header, err := accountant.AccountBlob(ctx, 500, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Wait for next reservation duration
Expand All @@ -371,7 +371,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
// Third call: Should use new bin and allow overflow again
_, err = accountant.AccountBlob(ctx, 500, quorums, salt)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 500, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, isRotation([]uint64{1000, 500, 0}, mapRecordUsage(accountant.reservationPeriodRecords)), true)
}

func TestQuorumCheck(t *testing.T) {
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestQuorumCheck(t *testing.T) {
}
}

func mapRecordUsage(records []BinRecord) []uint64 {
func mapRecordUsage(records []ReservationPeriodRecord) []uint64 {
return []uint64{records[0].Usage, records[1].Usage, records[2].Usage}
}

Expand Down
2 changes: 1 addition & 1 deletion api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *disperserClient) DisperseBlob(
}

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint32(symbolLength), quorums, salt)
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums, salt)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
Expand Down
Loading
Loading