diff --git a/store/store.go b/store/store.go
index 83303fd1..8e81bf30 100644
--- a/store/store.go
+++ b/store/store.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	lru "github.com/hashicorp/golang-lru/v2"
@@ -45,14 +45,13 @@ type Store[H header.Header[H]] struct {
 
 	// writing to datastore
 	//
-	// queue of headers to be written
-	writes chan []H
+	writesMu sync.Mutex
+	// writesPending keeps headers pending to be written in one batch
+	writesPending *batch[H]
+	// queue of batches to be written
+	writesCh chan *batch[H]
 	// signals when writes are finished
 	writesDn chan struct{}
-	// writeHead maintains the current write head
-	writeHead atomic.Pointer[H]
-	// pending keeps headers pending to be written in one batch
-	pending *batch[H]
 
 	Params Parameters
 }
@@ -99,15 +98,15 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
 	}
 
 	return &Store[H]{
-		ds:          wrappedStore,
-		cache:       cache,
-		metrics:     metrics,
-		heightIndex: index,
-		heightSub:   newHeightSub[H](),
-		writes:      make(chan []H, 16),
-		writesDn:    make(chan struct{}),
-		pending:     newBatch[H](params.WriteBatchSize),
-		Params:      params,
+		ds:            wrappedStore,
+		cache:         cache,
+		metrics:       metrics,
+		heightIndex:   index,
+		heightSub:     newHeightSub[H](),
+		writesCh:      make(chan *batch[H], 4),
+		writesDn:      make(chan struct{}),
+		writesPending: newBatch[H](params.WriteBatchSize),
+		Params:        params,
 	}, nil
 }
@@ -126,11 +125,14 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
 	return nil
 }
 
+// Start starts or restarts the Store.
 func (s *Store[H]) Start(context.Context) error {
 	// closed s.writesDn means that store was stopped before, recreate chan.
 	select {
 	case <-s.writesDn:
+		s.writesCh = make(chan *batch[H], 4)
 		s.writesDn = make(chan struct{})
+		s.writesPending = newBatch[H](s.Params.WriteBatchSize)
 	default:
 	}
 
@@ -138,19 +140,26 @@ func (s *Store[H]) Start(context.Context) error {
 	return nil
 }
 
+// Stop stops the store and cleans up resources.
+// Canceling context while stopping may leave the store in an inconsistent state.
 func (s *Store[H]) Stop(ctx context.Context) error {
+	s.writesMu.Lock()
+	defer s.writesMu.Unlock()
+	// check if store was already stopped
 	select {
 	case <-s.writesDn:
 		return errStoppedStore
 	default:
 	}
-	// signal to prevent further writes to Store
+	// write the pending leftover
 	select {
-	case s.writes <- nil:
+	case s.writesCh <- s.writesPending:
+		// signal closing to flushLoop
+		close(s.writesCh)
 	case <-ctx.Done():
 		return ctx.Err()
 	}
-	// wait till it is done writing
+	// wait till flushLoop is done writing
 	select {
 	case <-s.writesDn:
 	case <-ctx.Done():
@@ -193,7 +202,7 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
 		return v, nil
 	}
 	// check if the requested header is not yet written on disk
-	if h := s.pending.Get(hash); !h.IsZero() {
+	if h := s.writesPending.Get(hash); !h.IsZero() {
		return h, nil
 	}
 
@@ -227,7 +236,8 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
 	// which means the requested 'height' should be present
 	//
 	// check if the requested header is not yet written on disk
-	if h := s.pending.GetByHeight(height); !h.IsZero() {
+	// TODO: Synchronize with prepareWrite?
+	if h := s.writesPending.GetByHeight(height); !h.IsZero() {
 		return h, nil
 	}
 
@@ -287,7 +297,7 @@ func (s *Store[H]) Has(ctx context.Context, hash header.Hash) (bool, error) {
 		return ok, nil
 	}
 	// check if the requested header is not yet written on disk
-	if ok := s.pending.Has(hash); ok {
+	if ok := s.writesPending.Has(hash); ok {
 		return ok, nil
 	}
 
@@ -304,23 +314,16 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 		return nil
 	}
 
-	var err error
 	// take current write head to verify headers against
-	var head H
-	headPtr := s.writeHead.Load()
-	if headPtr == nil {
-		head, err = s.Head(ctx)
-		if err != nil {
-			return err
-		}
-	} else {
-		head = *headPtr
+	head, err := s.Head(ctx)
+	if err != nil {
+		return err
 	}
 
 	// collect valid headers
 	verified := make([]H, 0, lh)
 	for i, h := range headers {
 		err = head.Verify(h)
 		if err != nil {
 			var verErr *header.VerifyError
 			if errors.As(err, &verErr) {
@@ -344,27 +347,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 		head = h
 	}
 
-	onWrite := func() {
-		newHead := verified[len(verified)-1]
-		s.writeHead.Store(&newHead)
-		log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
-		s.metrics.newHead(newHead.Height())
+	// prepare headers to be written
+	toWrite, err := s.prepareWrite(verified)
+	switch {
+	case err != nil:
+		return err
+	case toWrite == nil:
+		return nil
 	}
 
 	// queue headers to be written on disk
 	select {
-	case s.writes <- verified:
+	case s.writesCh <- toWrite:
 		// we return an error here after writing,
 		// as there might be an invalid header in between of a given range
-		onWrite()
 		return err
 	default:
 		s.metrics.writesQueueBlocked(ctx)
 	}
-	// if the writes queue is full, we block until it is not
+	// if the writesCh queue is full, block until it has space or the Store stops
 	select {
-	case s.writes <- verified:
-		onWrite()
+	case s.writesCh <- toWrite:
 		return err
 	case <-s.writesDn:
 		return errStoppedStore
@@ -373,28 +376,50 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 	}
 }
 
+func (s *Store[H]) prepareWrite(headers []H) (*batch[H], error) {
+	s.writesMu.Lock()
+	defer s.writesMu.Unlock()
+	// check if store was stopped
+	select {
+	case <-s.writesDn:
+		return nil, errStoppedStore
+	default:
+	}
+
+	// keep verified headers as pending writes and ensure they are accessible for reads
+	s.writesPending.Append(headers...)
+	// notify waiters if any
+	// it is important to do Pub after updating pending
+	// so pending is consistent with atomic Height counter on the heightSub
+	s.heightSub.Pub(headers...)
+
+	// TODO: Head advancing
+	// announce our new head
+	newHead := headers[len(headers)-1]
+	s.metrics.newHead(newHead.Height())
+	log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
+
+	// don't flush if the pending write batch hasn't grown enough; keep accumulating
+	if s.writesPending.Len() < s.Params.WriteBatchSize {
+		return nil, nil
+	}
+
+	toWrite := s.writesPending
+	s.writesPending = newBatch[H](s.Params.WriteBatchSize)
+	return toWrite, nil
+}
+
 // flushLoop performs writing task to the underlying datastore in a separate routine
 // This way writes are controlled and manageable from one place allowing
 // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
 // (2) Batching header writes
 func (s *Store[H]) flushLoop() {
 	defer close(s.writesDn)
 	ctx := context.Background()
-	for headers := range s.writes {
-		// add headers to the pending and ensure they are accessible
-		s.pending.Append(headers...)
-		// and notify waiters if any + increase current read head height
-		// it is important to do Pub after updating pending
-		// so pending is consistent with atomic Height counter on the heightSub
-		s.heightSub.Pub(headers...)
-		// don't flush and continue if pending batch is not grown enough,
-		// and Store is not stopping(headers == nil)
-		if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
-			continue
-		}
+	for headers := range s.writesCh {
 
 		startTime := time.Now()
-		toFlush := s.pending.GetAll()
+		toFlush := headers.GetAll()
 
 		for i := 0; ; i++ {
 			err := s.flush(ctx, toFlush...)
@@ -404,25 +429,19 @@ func (s *Store[H]) flushLoop() {
 
 			from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
 			log.Errorw("writing header batch", "try", i+1, "from", from, "to", to, "err", err)
-			s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
+			s.metrics.flush(ctx, time.Since(startTime), headers.Len(), true)
 
 			const maxRetrySleep = time.Second
 			sleep := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
 			time.Sleep(sleep)
 		}
 
-		s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
-		// reset pending
-		s.pending.Reset()
-
-		if headers == nil {
-			// a signal to stop
-			return
-		}
+		s.metrics.flush(ctx, time.Since(startTime), headers.Len(), false)
+		headers.Reset()
 	}
 }
 
 // flush writes the given batch to datastore.
 func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
 	ln := len(headers)
 	if ln == 0 {
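
For orientation, below is a minimal, self-contained sketch of the write path this diff introduces: writers accumulate into a mutex-guarded pending batch, full batches travel over a channel to a single flush goroutine, and Stop hands over the leftover batch and closes the channel. All names here (pendingWriter, flushCh, and so on) are hypothetical stand-ins, with a plain []int in place of batch[H]; this illustrates the pattern only and is not go-header's actual API.

package main

import (
	"fmt"
	"sync"
)

// pendingWriter mirrors Store's new write path in miniature.
type pendingWriter struct {
	mu      sync.Mutex
	pending []int         // stand-in for writesPending *batch[H]
	batchSz int           // stand-in for Params.WriteBatchSize
	flushCh chan []int    // stand-in for writesCh chan *batch[H]
	done    chan struct{} // stand-in for writesDn
}

func newPendingWriter(batchSz int) *pendingWriter {
	pw := &pendingWriter{
		batchSz: batchSz,
		flushCh: make(chan []int, 4),
		done:    make(chan struct{}),
	}
	go pw.flushLoop()
	return pw
}

// prepare accumulates items under the lock and hands back a full batch
// once the pending buffer reaches batchSz, like Store.prepareWrite.
func (pw *pendingWriter) prepare(items ...int) []int {
	pw.mu.Lock()
	defer pw.mu.Unlock()
	pw.pending = append(pw.pending, items...)
	if len(pw.pending) < pw.batchSz {
		return nil // not grown enough yet; keep accumulating
	}
	toWrite := pw.pending
	pw.pending = make([]int, 0, pw.batchSz)
	return toWrite
}

// write queues a full batch for flushing, like Store.Append.
func (pw *pendingWriter) write(items ...int) {
	if toWrite := pw.prepare(items...); toWrite != nil {
		pw.flushCh <- toWrite
	}
}

// stop sends the pending leftover, signals closing to the flush loop,
// and waits until it is done writing, like Store.Stop.
func (pw *pendingWriter) stop() {
	pw.mu.Lock()
	pw.flushCh <- pw.pending
	close(pw.flushCh)
	pw.mu.Unlock()
	<-pw.done
}

// flushLoop drains batches one at a time, like Store.flushLoop.
func (pw *pendingWriter) flushLoop() {
	defer close(pw.done)
	for batch := range pw.flushCh {
		fmt.Println("flushing batch:", batch) // disk IO would happen here
	}
}

func main() {
	pw := newPendingWriter(3)
	for i := 1; i <= 7; i++ {
		pw.write(i) // flushes [1 2 3] and [4 5 6] as they fill up
	}
	pw.stop() // flushes the leftover [7]
}

The point of this shape is that callers of write only ever touch the in-memory pending batch; the lone flush goroutine absorbs slow disk writes and retries, so appends stay cheap and pending headers remain visible to reads until flushed.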