refactor(store): synchronize Store writes #241

Draft · wants to merge 1 commit into base: main
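In short, this draft moves write batching out of flushLoop and into the Append path: Append verifies headers, prepareWrite accumulates them into the mutex-guarded writesPending batch, and only once the batch reaches Params.WriteBatchSize is it handed to flushLoop over the buffered writesCh channel; Stop hands over whatever is left pending and closes the channel. A minimal, self-contained Go sketch of that mutex-plus-channel handoff follows; the names pendingQueue and add are illustrative only and not part of this PR:

package main

import (
	"fmt"
	"sync"
)

// pendingQueue is an illustrative stand-in for the write path in this PR:
// producers collect items into a mutex-guarded pending slice (writesPending
// in the diff) and hand full batches over a channel (writesCh) to a single
// consumer goroutine playing the role of flushLoop.
type pendingQueue struct {
	mu        sync.Mutex
	pending   []int
	batchSize int
}

// add appends items under the lock and returns a full batch ready to be
// queued, or nil while the pending batch has not grown enough yet, mirroring
// prepareWrite returning (nil, nil) below WriteBatchSize.
func (q *pendingQueue) add(items ...int) []int {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, items...)
	if len(q.pending) < q.batchSize {
		return nil
	}
	full := q.pending
	q.pending = make([]int, 0, q.batchSize)
	return full
}

func main() {
	q := &pendingQueue{batchSize: 3}
	batches := make(chan []int, 4) // buffered, like writesCh in the diff
	done := make(chan struct{})

	go func() { // the flushLoop role: drain full batches one by one
		defer close(done)
		for b := range batches {
			fmt.Println("flushing batch:", b)
		}
	}()

	for i := 1; i <= 7; i++ {
		if full := q.add(i); full != nil {
			batches <- full // Append queues the prepared batch
		}
	}
	close(batches) // item 7 stays pending, as leftovers do until Stop
	<-done
}

As in the diff, a batch below the threshold stays pending until a final handoff, which is what Stop performs before closing the channel.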
156 changes: 87 additions & 69 deletions store/store.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
@@ -45,14 +45,13 @@ type Store[H header.Header[H]] struct {

// writing to datastore
//
// queue of headers to be written
writes chan []H
writesMu sync.Mutex
// writesPending keeps headers pending to be written in one batch
writesPending *batch[H]
// queue of batches to be written
writesCh chan *batch[H]
// signals when writes are finished
writesDn chan struct{}
// writeHead maintains the current write head
writeHead atomic.Pointer[H]
// pending keeps headers pending to be written in one batch
pending *batch[H]

Params Parameters
}
@@ -99,15 +98,15 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
}

return &Store[H]{
ds: wrappedStore,
cache: cache,
metrics: metrics,
heightIndex: index,
heightSub: newHeightSub[H](),
writes: make(chan []H, 16),
writesDn: make(chan struct{}),
pending: newBatch[H](params.WriteBatchSize),
Params: params,
ds: wrappedStore,
cache: cache,
metrics: metrics,
heightIndex: index,
heightSub: newHeightSub[H](),
writesCh: make(chan *batch[H], 4),
writesDn: make(chan struct{}),
writesPending: newBatch[H](params.WriteBatchSize),
Params: params,
}, nil
}

@@ -126,31 +125,41 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
return nil
}

// Start starts or restarts the Store.
func (s *Store[H]) Start(context.Context) error {
// a closed s.writesDn means the Store was stopped before, so recreate the write channels and pending batch.
select {
case <-s.writesDn:
s.writesCh = make(chan *batch[H], 4)
s.writesDn = make(chan struct{})
s.writesPending = newBatch[H](s.Params.WriteBatchSize)
default:
}

go s.flushLoop()
return nil
}

// Stop stops the store and cleans up resources.
// Canceling context while stopping may leave the store in an inconsistent state.
func (s *Store[H]) Stop(ctx context.Context) error {
s.writesMu.Lock()
defer s.writesMu.Unlock()
// check if store was already stopped
select {
case <-s.writesDn:
return errStoppedStore
default:
}
// signal to prevent further writes to Store
// write the pending leftover
select {
case s.writes <- nil:
case s.writesCh <- s.writesPending:
// signal closing to flushLoop
close(s.writesCh)
case <-ctx.Done():
return ctx.Err()
}
// wait till it is done writing
// wait till flushLoop is done writing
select {
case <-s.writesDn:
case <-ctx.Done():
@@ -193,7 +202,7 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
return v, nil
}
// check if the requested header is not yet written on disk
if h := s.pending.Get(hash); !h.IsZero() {
if h := s.writesPending.Get(hash); !h.IsZero() {
return h, nil
}

@@ -227,7 +236,8 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
// which means the requested 'height' should be present
//
// check if the requested header is not yet written on disk
if h := s.pending.GetByHeight(height); !h.IsZero() {
// TODO: Synchronize with prepareWrite?
if h := s.writesPending.GetByHeight(height); !h.IsZero() {
return h, nil
}

@@ -287,7 +297,7 @@ func (s *Store[H]) Has(ctx context.Context, hash header.Hash) (bool, error) {
return ok, nil
}
// check if the requested header is not yet written on disk
if ok := s.pending.Has(hash); ok {
if ok := s.writesPending.Has(hash); ok {
return ok, nil
}

@@ -304,23 +314,15 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
return nil
}

var err error
// take current write head to verify headers against
var head H
headPtr := s.writeHead.Load()
if headPtr == nil {
head, err = s.Head(ctx)
if err != nil {
return err
}
} else {
head = *headPtr
head, err := s.Head(ctx)
if err != nil {
return err
}

// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {

err = head.Verify(h)
if err != nil {
var verErr *header.VerifyError
@@ -344,27 +346,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = h
}

onWrite := func() {
newHead := verified[len(verified)-1]
s.writeHead.Store(&newHead)
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
s.metrics.newHead(newHead.Height())
// prepare headers to be written
toWrite, err := s.prepareWrite(verified)
switch {
case err != nil:
return err
case toWrite == nil:
return nil
}

// queue headers to be written on disk
select {
case s.writes <- verified:
case s.writesCh <- toWrite:
// we return an error here after writing,
// as there might be an invalid header within the given range
onWrite()
return err
default:
s.metrics.writesQueueBlocked(ctx)
}
// if the writes queue is full, we block until it is not
// if the writesCh queue is full, we block anyway
select {
case s.writes <- verified:
onWrite()
case s.writesCh <- toWrite:
return err
case <-s.writesDn:
return errStoppedStore
@@ -373,28 +375,50 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
}
}

func (s *Store[H]) prepareWrite(headers []H) (*batch[H], error) {
s.writesMu.Lock()
defer s.writesMu.Unlock()
// check if store was stopped
select {
case <-s.writesDn:
return nil, errStoppedStore
default:
}

// keep verified headers as pending writes and ensure they are accessible for reads
s.writesPending.Append(headers...)
// notify waiters if any
// it is important to do Pub after updating pending
// so pending is consistent with atomic Height counter on the heightSub
s.heightSub.Pub(headers...)

// TODO: Head advancing
// announce our new head
newHead := headers[len(headers)-1]
s.metrics.newHead(newHead.Height())
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())

// don't flush and continue if the pending write batch has not grown enough
if s.writesPending.Len() < s.Params.WriteBatchSize {
return nil, nil
}

toWrite := s.writesPending
s.writesPending = newBatch[H](s.Params.WriteBatchSize)
return toWrite, nil
}

// flushLoop performs writing task to the underlying datastore in a separate routine
// This way writes are controlled and manageable from one place allowing
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
// (2) Batching header writes
func (s *Store[H]) flushLoop() {
defer close(s.writesDn)
ctx := context.Background()
for headers := range s.writes {
// add headers to the pending and ensure they are accessible
s.pending.Append(headers...)
// and notify waiters if any + increase current read head height
// it is important to do Pub after updating pending
// so pending is consistent with atomic Height counter on the heightSub
s.heightSub.Pub(headers...)
// don't flush and continue if pending batch is not grown enough,
// and Store is not stopping(headers == nil)
if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
continue
}

for headers := range s.writesCh {
startTime := time.Now()
toFlush := s.pending.GetAll()
toFlush := headers.GetAll()

for i := 0; ; i++ {
err := s.flush(ctx, toFlush...)
@@ -404,25 +428,19 @@ func (s *Store[H]) flushLoop() {

from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
log.Errorw("writing header batch", "try", i+1, "from", from, "to", to, "err", err)
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
s.metrics.flush(ctx, time.Since(startTime), s.writesPending.Len(), true)

const maxRetrySleep = time.Second
sleep := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
time.Sleep(sleep)
}

s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
// reset pending
s.pending.Reset()

if headers == nil {
// a signal to stop
return
}
s.metrics.flush(ctx, time.Since(startTime), s.writesPending.Len(), false)
headers.Reset()
}
}

// flush writes the given batch to datastore.
func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
ln := len(headers)
if ln == 0 {
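On flush failure, the retry loop inside flushLoop above sleeps with a linear backoff capped at one second: min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep). A small standalone sketch of that schedule, assuming Go 1.21+ for the built-in min:

package main

import (
	"fmt"
	"time"
)

// retrySleep mirrors the backoff flushLoop uses between failed flush attempts:
// 10ms, 20ms, 30ms, ... capped at maxRetrySleep (one second).
func retrySleep(attempt int) time.Duration {
	const maxRetrySleep = time.Second
	return min(10*time.Duration(attempt+1)*time.Millisecond, maxRetrySleep)
}

func main() {
	for i := 0; i < 4; i++ {
		fmt.Printf("attempt %d: sleep %v\n", i+1, retrySleep(i))
	}
	fmt.Printf("attempt %d: sleep %v\n", 200, retrySleep(199)) // hits the 1s cap
}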
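The shutdown side of the refactor is a close-and-drain handshake: Stop pushes the leftover writesPending batch into writesCh, closes the channel so flushLoop exits its range loop, and then waits for flushLoop to close writesDn. A simplified, self-contained sketch of that pattern follows; drainer and its fields are illustrative stand-ins, not the Store API:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// drainer sketches the Stop/flushLoop handshake: stop hands over the leftover
// pending batch, closes the batch channel, and waits until the consumer
// signals completion by closing done (writesDn in the diff).
type drainer struct {
	mu      sync.Mutex
	pending []string
	batches chan []string
	done    chan struct{}
}

func (d *drainer) flushLoop() {
	defer close(d.done)
	for b := range d.batches {
		fmt.Println("flushed:", b)
	}
}

func (d *drainer) stop(ctx context.Context) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	select {
	case <-d.done: // already stopped
		return errors.New("store already stopped")
	default:
	}
	select {
	case d.batches <- d.pending: // write the pending leftover
		close(d.batches) // signal closing to the consumer
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case <-d.done: // wait till the consumer is done writing
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	d := &drainer{
		pending: []string{"h100", "h101"},
		batches: make(chan []string, 4),
		done:    make(chan struct{}),
	}
	go d.flushLoop()
	fmt.Println("stop:", d.stop(context.Background()))
}

Because the batch channel is buffered (capacity 4 in the diff), the final handoff usually completes without waiting on in-flight flushes, while the ctx branches keep Stop from blocking forever if the consumer is wedged.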