Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: support WaitWithCancel for unsuccessful WaitForBlock API calls #5814

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions daemon/algod/api/server/v2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"io"
"math"
"net/http"
Expand All @@ -31,6 +30,7 @@ import (
"time"

"github.com/labstack/echo/v4"
"golang.org/x/sync/semaphore"

"github.com/algorand/avm-abi/apps"
"github.com/algorand/go-codec/codec"
Expand Down Expand Up @@ -96,6 +96,7 @@ type LedgerForAPI interface {
LatestTotals() (basics.Round, ledgercore.AccountTotals, error)
BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error)
Wait(r basics.Round) chan struct{}
WaitWithCancel(r basics.Round) (chan struct{}, func())
GetCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error)
EncodedBlockCert(rnd basics.Round) (blk []byte, cert []byte, err error)
Block(rnd basics.Round) (blk bookkeeping.Block, err error)
Expand Down Expand Up @@ -940,11 +941,15 @@ func (v2 *Handlers) WaitForBlock(ctx echo.Context, round uint64) error {
}

// Wait
ledgerWaitCh, cancelLedgerWait := ledger.WaitWithCancel(basics.Round(round + 1))
defer cancelLedgerWait()
select {
case <-v2.Shutdown:
return internalError(ctx, err, errServiceShuttingDown, v2.Log)
case <-ctx.Request().Context().Done():
return ctx.NoContent(http.StatusRequestTimeout)
case <-time.After(WaitForBlockTimeout):
case <-ledger.Wait(basics.Round(round + 1)):
case <-ledgerWaitCh:
}

// Return status after the wait
Expand Down
6 changes: 5 additions & 1 deletion daemon/algod/api/server/v2/test/handlers_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package test
import (
"encoding/json"
"fmt"
"github.com/algorand/go-algorand/data/transactions/logic"
"net/http"
"net/http/httptest"
"testing"

"github.com/algorand/go-algorand/data/transactions/logic"

"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -135,6 +136,9 @@ func (l *mockLedger) BlockHdr(rnd basics.Round) (bookkeeping.BlockHeader, error)
func (l *mockLedger) Wait(r basics.Round) chan struct{} {
panic("not implemented")
}
func (l *mockLedger) WaitWithCancel(r basics.Round) (chan struct{}, func()) {
panic("not implemented")
}
func (l *mockLedger) GetCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (c basics.Address, ok bool, err error) {
panic("not implemented")
}
Expand Down
48 changes: 27 additions & 21 deletions ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package ledger

import (
"context"
"sync/atomic"

"github.com/algorand/go-deadlock"

Expand All @@ -28,29 +27,17 @@ import (
"github.com/algorand/go-algorand/ledger/store/trackerdb"
)

// notifier is a struct that encapsulates a single-shot channel; it will only be signaled once.
// notifier is a struct that encapsulates a single-shot channel; it should only be signaled once.
type notifier struct {
signal chan struct{}
notified *atomic.Uint32
}

// makeNotifier constructs a notifier that has not been signaled.
func makeNotifier() notifier {
return notifier{signal: make(chan struct{}), notified: &atomic.Uint32{}}
}

// notify signals the channel if it hasn't already done so
func (notifier *notifier) notify() {
if notifier.notified.CompareAndSwap(0, 1) {
close(notifier.signal)
}
signal chan struct{}
count int
}

// bulletin provides an easy way to wait on a round to be written to the ledger.
// To use it, call <-Wait(round).
type bulletin struct {
mu deadlock.Mutex
pendingNotificationRequests map[basics.Round]notifier
pendingNotificationRequests map[basics.Round]*notifier
latestRound basics.Round
}

Expand All @@ -62,7 +49,7 @@ type bulletinMem struct {

func makeBulletin() *bulletin {
b := new(bulletin)
b.pendingNotificationRequests = make(map[basics.Round]notifier)
b.pendingNotificationRequests = make(map[basics.Round]*notifier)
return b
}

Expand All @@ -80,14 +67,32 @@ func (b *bulletin) Wait(round basics.Round) chan struct{} {

signal, exists := b.pendingNotificationRequests[round]
if !exists {
signal = makeNotifier()
signal = &notifier{signal: make(chan struct{})}
b.pendingNotificationRequests[round] = signal
}
// Increment count of waiters, to support canceling.
signal.count++

return signal.signal
}

// CancelWait removes a wait for a particular round. If no one else is waiting, the
// notifier channel for that round is removed.
func (b *bulletin) CancelWait(round basics.Round) {
b.mu.Lock()
defer b.mu.Unlock()

signal, exists := b.pendingNotificationRequests[round]
if exists {
signal.count--
if signal.count <= 0 {
delete(b.pendingNotificationRequests, round)
}
}
}
jannotti marked this conversation as resolved.
Show resolved Hide resolved

func (b *bulletin) loadFromDisk(l ledgerForTracker, _ basics.Round) error {
b.pendingNotificationRequests = make(map[basics.Round]notifier)
b.pendingNotificationRequests = make(map[basics.Round]*notifier)
b.latestRound = l.Latest()
return nil
}
Expand All @@ -105,7 +110,8 @@ func (b *bulletin) notifyRound(rnd basics.Round) {
}

delete(b.pendingNotificationRequests, pending)
signal.notify()
// signal the channel by closing it; this is under lock and will only happen once
close(signal.signal)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

b.latestRound = rnd
Expand Down
108 changes: 108 additions & 0 deletions ledger/bulletin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"testing"
"time"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

const epsilon = 5 * time.Millisecond
Expand Down Expand Up @@ -100,3 +102,109 @@ func TestBulletin(t *testing.T) {
t.Errorf("<-Wait(10) finished late")
}
}

func TestCancelWait(t *testing.T) {
partitiontest.PartitionTest(t)

bul := makeBulletin()

// Calling Wait before CancelWait
waitCh := bul.Wait(5)
bul.CancelWait(5)
bul.committedUpTo(5)
select {
case <-waitCh:
t.Errorf("<-Wait(5) should have been cancelled")
case <-time.After(epsilon):
// Correct
}
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(5))

// Calling CancelWait before Wait
bul.CancelWait(6)
select {
case <-bul.Wait(6):
t.Errorf("<-Wait(6) should have been cancelled")
case <-time.After(epsilon):
// Correct
}
require.Contains(t, bul.pendingNotificationRequests, basics.Round(6))
require.Equal(t, bul.pendingNotificationRequests[basics.Round(6)].count, 1)
bul.CancelWait(6)
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(6))

// Two Waits, one cancelled
waitCh1 := bul.Wait(7)
waitCh2 := bul.Wait(7)
require.Equal(t, waitCh1, waitCh2)
bul.CancelWait(7)
select {
case <-waitCh1:
t.Errorf("<-Wait(7) should not be notified yet")
case <-time.After(epsilon):
// Correct
}
// Still one waiter
require.Contains(t, bul.pendingNotificationRequests, basics.Round(7))
require.Equal(t, bul.pendingNotificationRequests[basics.Round(7)].count, 1)

bul.committedUpTo(7)
select {
case <-waitCh1:
// Correct
case <-time.After(epsilon):
t.Errorf("<-Wait(7) should have been notified")
}
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(7))

// Wait followed by Cancel for a round that already completed
waitCh = bul.Wait(5)
bul.CancelWait(5)
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(5))
select {
case <-waitCh:
// Correct
case <-time.After(epsilon):
t.Errorf("<-Wait(5) should have been notified right away")
}

// Cancel Wait after Wait triggered
waitCh = bul.Wait(8)
require.Contains(t, bul.pendingNotificationRequests, basics.Round(8))
require.Equal(t, bul.pendingNotificationRequests[basics.Round(8)].count, 1)
bul.committedUpTo(8)
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(8))
select {
case <-waitCh:
// Correct
case <-time.After(epsilon):
t.Errorf("<-Wait(8) should have been notified")
}
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(8))
bul.CancelWait(8) // should do nothing

// Cancel Wait after Wait triggered, but before Wait returned
waitCh = bul.Wait(9)
require.Contains(t, bul.pendingNotificationRequests, basics.Round(9))
require.Equal(t, bul.pendingNotificationRequests[basics.Round(9)].count, 1)
bul.committedUpTo(9)
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(9))
bul.CancelWait(9) // should do nothing
select {
case <-waitCh:
// Correct
case <-time.After(epsilon):
t.Errorf("<-Wait(9) should have been notified")
}
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(9))

// Two waits, both cancelled
waitCh1 = bul.Wait(10)
waitCh2 = bul.Wait(10)
require.Equal(t, waitCh1, waitCh2)
bul.CancelWait(10)
require.Contains(t, bul.pendingNotificationRequests, basics.Round(10))
require.Equal(t, bul.pendingNotificationRequests[basics.Round(10)].count, 1)
bul.CancelWait(10)
require.NotContains(t, bul.pendingNotificationRequests, basics.Round(10))
}
10 changes: 10 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,16 @@ func (l *Ledger) Wait(r basics.Round) chan struct{} {
return l.bulletinDisk.Wait(r)
}

// WaitWithCancel returns a channel that closes once a given round is
// stored durably in the ledger. The returned function can be used to
// cancel the wait, which cleans up resources if no other Wait call is
// active for the same round.
func (l *Ledger) WaitWithCancel(r basics.Round) (chan struct{}, func()) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
return l.bulletinDisk.Wait(r), func() { l.bulletinDisk.CancelWait(r) }
}

// WaitMem returns a channel that closes once a given round is
// available in memory in the ledger, but might not be stored
// durably on disk yet.
Expand Down