Skip to content

Commit

Permalink
Merge pull request filecoin-project#6754 from filecoin-project/feat/s…
Browse files Browse the repository at this point in the history
…plitstore-refactor

Splitstore: Some small fixes
  • Loading branch information
Stebalien authored Jul 14, 2021
2 parents f0c9e4e + 3f3a12b commit 44d0171
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 52 deletions.
30 changes: 13 additions & 17 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Blockstore struct {
var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil)
var _ blockstore.BlockstoreIterator = (*Blockstore)(nil)
var _ blockstore.BlockstoreGC = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil)

// Open creates a new badger-backed blockstore, with the supplied options.
Expand Down Expand Up @@ -167,34 +168,29 @@ func (b *Blockstore) CollectGarbage() error {
}
defer b.viewers.Done()

var err error
// compact first to gather the necessary statistics for GC
nworkers := runtime.NumCPU() / 2
if nworkers < 2 {
nworkers = 2
}

err := b.DB.Flatten(nworkers)
if err != nil {
return err
}

for err == nil {
err = b.DB.RunValueLogGC(0.125)
}

if err == badger.ErrNoRewrite {
// not really an error in this case
// not really an error in this case, it signals the end of GC
return nil
}

return err
}

// Compact runs a synchronous compaction
func (b *Blockstore) Compact() error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

nworkers := runtime.NumCPU() / 2
if nworkers < 2 {
nworkers = 2
}

return b.DB.Flatten(nworkers)
}

// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
Expand Down
5 changes: 5 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type BlockstoreIterator interface {
ForEachKey(func(cid.Cid) error) error
}

// BlockstoreGC is a trait for blockstores that support online garbage collection
type BlockstoreGC interface {
CollectGarbage() error
}

// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity
Expand Down
82 changes: 47 additions & 35 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ var (
// this is first computed at warmup and updated in every compaction
markSetSizeKey = dstore.NewKey("/splitstore/markSetSize")

// compactionIndexKey stores the compaction index (serial number)
compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex")

log = logging.Logger("splitstore")

// used to signal end of walk
Expand Down Expand Up @@ -140,6 +143,8 @@ type SplitStore struct {
markSetEnv MarkSetEnv
markSetSize int64

compactionIndex int64

ctx context.Context
cancel func()

Expand Down Expand Up @@ -480,6 +485,9 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
s.chain = chain
curTs := chain.GetHeaviestTipSet()

// should we warmup
warmup := false

// load base epoch from metadata ds
// if none, then use current epoch because it's a fresh start
bs, err := s.ds.Get(baseEpochKey)
Expand Down Expand Up @@ -509,11 +517,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
s.warmupEpoch = bytesToEpoch(bs)

case dstore.ErrNotFound:
// the hotstore hasn't warmed up, start a concurrent warm up
err = s.warmup(curTs)
if err != nil {
return xerrors.Errorf("error warming up: %w", err)
}
warmup = true

default:
return xerrors.Errorf("error loading warmup epoch: %w", err)
Expand All @@ -530,8 +534,29 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return xerrors.Errorf("error loading mark set size: %w", err)
}

// load compactionIndex from metadata ds to provide a hint as to when to perform moving gc
bs, err = s.ds.Get(compactionIndexKey)
switch err {
case nil:
s.compactionIndex = bytesToInt64(bs)

case dstore.ErrNotFound:
// this is potentially an upgrade from splitstore v0; schedule a warmup as v0 has
// some issues with hot references leaking into the coldstore.
warmup = true
default:
return xerrors.Errorf("error loading compaction index: %w", err)
}

log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)

if warmup {
err = s.warmup(curTs)
if err != nil {
return xerrors.Errorf("error starting warmup: %w", err)
}
}

// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)

Expand Down Expand Up @@ -653,7 +678,7 @@ func (s *SplitStore) viewDone() {

s.txnViews--
if s.txnViews == 0 && s.txnViewsWaiting {
s.txnViewsCond.Signal()
s.txnViewsCond.Broadcast()
}
}

Expand Down Expand Up @@ -717,7 +742,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
quiet = true
log.Warnf("error checking markset: %s", err)
}
continue
// track it anyways
}

if mark {
Expand Down Expand Up @@ -943,6 +968,12 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
s.warmupEpoch = epoch
s.mx.Unlock()

// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}

return nil
}

Expand Down Expand Up @@ -977,7 +1008,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary

log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch)
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex)

markSet, err := s.markSetEnv.Create("live", s.markSetSize)
if err != nil {
Expand All @@ -994,7 +1025,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
s.beginTxnMarking(markSet)

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
// and messages until the boundary epoch.nn
log.Info("marking reachable objects")
startMark := time.Now()

Expand Down Expand Up @@ -1147,6 +1178,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error saving mark set size: %w", err)
}

s.compactionIndex++
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}

return nil
}

Expand Down Expand Up @@ -1402,8 +1439,7 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) {

func (s *SplitStore) checkClosing() error {
if atomic.LoadInt32(&s.closing) == 1 {
log.Info("splitstore is closing; aborting compaction")
return xerrors.Errorf("compaction aborted")
return xerrors.Errorf("splitstore is closing")
}

return nil
Expand Down Expand Up @@ -1697,30 +1733,6 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
}
}

func (s *SplitStore) gcHotstore() {
if compact, ok := s.hot.(interface{ Compact() error }); ok {
log.Infof("compacting hotstore")
startCompact := time.Now()
err := compact.Compact()
if err != nil {
log.Warnf("error compacting hotstore: %s", err)
return
}
log.Infow("hotstore compaction done", "took", time.Since(startCompact))
}

if gc, ok := s.hot.(interface{ CollectGarbage() error }); ok {
log.Infof("garbage collecting hotstore")
startGC := time.Now()
err := gc.CollectGarbage()
if err != nil {
log.Warnf("error garbage collecting hotstore: %s", err)
return
}
log.Infow("hotstore garbage collection done", "took", time.Since(startGC))
}
}

func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
s.baseEpoch = epoch
return s.ds.Put(baseEpochKey, epochToBytes(epoch))
Expand Down
30 changes: 30 additions & 0 deletions blockstore/splitstore/splitstore_gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package splitstore

import (
"fmt"
"time"

bstore "github.com/filecoin-project/lotus/blockstore"
)

func (s *SplitStore) gcHotstore() {
if err := s.gcBlockstoreOnline(s.hot); err != nil {
log.Warnf("error garbage collecting hostore: %s", err)
}
}

func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
if gc, ok := b.(bstore.BlockstoreGC); ok {
log.Info("garbage collecting blockstore")
startGC := time.Now()

if err := gc.CollectGarbage(); err != nil {
return err
}

log.Infow("garbage collecting hotstore done", "took", time.Since(startGC))
return nil
}

return fmt.Errorf("blockstore doesn't support online gc: %T", b)
}

0 comments on commit 44d0171

Please sign in to comment.