Skip to content

Commit

Permalink
Merge pull request #265 from CortexFoundation/dev
Browse files Browse the repository at this point in the history
torrentfs/monitor/storage | useful err check
  • Loading branch information
ucwong authored Nov 30, 2019
2 parents 237e5a2 + 108e65f commit 312fef7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 38 deletions.
8 changes: 4 additions & 4 deletions torrentfs/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewMonitor(flag *Config) (m *Monitor, e error) {
terminated: 0,
lastNumber: uint64(0),
dirty: false,
taskCh: make(chan *Block, batch*4),
taskCh: make(chan *Block, batch),
}
m.blockCache, _ = lru.New(6)
m.healthPeers, _ = lru.New(50)
Expand Down Expand Up @@ -1020,9 +1020,9 @@ func (m *Monitor) syncLastBlock() uint64 {
//return m.lastNumber - minNumber
return 0
}
/*if err := m.deal(rpcBlock); err != nil {
return 0
}*/
//if err := m.deal(rpcBlock); err != nil {
// return 0
//}
}
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
m.lastNumber = maxNumber
Expand Down
88 changes: 54 additions & 34 deletions torrentfs/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package torrentfs

import (
"encoding/json"
"github.com/CortexFoundation/CortexTheseus/params"
"errors"
//"fmt"
"os"
Expand Down Expand Up @@ -54,8 +55,8 @@ func (mc *MutexCounter) IsZero() bool {

type FileStorage struct {
filesContractAddr map[common.Address]*FileInfo
files []*FileInfo
blocks []*Block
files []*FileInfo //only storage init files from local storage
blocks []*Block //only storage init ckp blocks from local storage
db *bolt.DB
version string

Expand Down Expand Up @@ -112,13 +113,21 @@ func NewFileStorage(config *Config) (*FileStorage, error) {

fs.version = "1"

fs.initBlockNumber()
fs.initCheckPoint()
fs.initBlocks()
//fs.readLastFileIndex()
fs.initFiles()
fs.initMerkleTree()
//tmpCache, _ := lru.New(120)
if err := fs.initBlockNumber(); err != nil {
return nil, err
}
if err := fs.initCheckPoint(); err != nil {
return nil, err
}
if err := fs.initBlocks(); err != nil {
return nil, err
}
if err := fs.initFiles(); err != nil {
return nil, err
}
if err := fs.initMerkleTree(); err != nil {
return nil, err
}

return fs, nil
}
Expand Down Expand Up @@ -167,14 +176,16 @@ func (fs *FileStorage) NewFileInfo(Meta *FileMeta) *FileInfo {
//}

func (fs *FileStorage) initMerkleTree() error {
fs.leaves = append(fs.leaves, BlockContent{x: "0x21d6ce908e2d1464bd74bbdbf7249845493cc1ba10460758169b978e187762c1"})
fs.leaves = append(fs.leaves, BlockContent{x: params.MainnetGenesisHash.String()})//"0x21d6ce908e2d1464bd74bbdbf7249845493cc1ba10460758169b978e187762c1"})
tr, err := NewTree(fs.leaves)
if err != nil {
return err
}
fs.tree = tr
for _, block := range fs.blocks {
fs.addLeaf(block)
if err := fs.addLeaf(block); err != nil {
panic("Storage merkletree construct failed")
}
}

log.Info("Storage merkletree initialization", "root", common.ToHex(fs.tree.MerkleRoot()))
Expand All @@ -184,11 +195,17 @@ func (fs *FileStorage) initMerkleTree() error {

func (fs *FileStorage) addLeaf(block *Block) error {
fs.leaves = append(fs.leaves, BlockContent{x: block.Hash.String()})
fs.tree.RebuildTreeWith(fs.leaves)
fs.writeRoot(block.Number, fs.tree.MerkleRoot())
log.Debug("Add a new leaf", "number", block.Number, "root", common.ToHex(fs.tree.MerkleRoot())) //, "version", common.ToHex(version)) //MerkleRoot())
if err := fs.tree.RebuildTreeWith(fs.leaves); err == nil {
if err := fs.writeRoot(block.Number, fs.tree.MerkleRoot()); err != nil {
return err
}

return nil
log.Debug("Add a new leaf", "number", block.Number, "root", common.ToHex(fs.tree.MerkleRoot())) //, "version", common.ToHex(version)) //MerkleRoot())

return nil
} else {
return err
}
}

func (fs *FileStorage) Root() common.Hash {
Expand Down Expand Up @@ -466,9 +483,9 @@ func (fs *FileStorage) Version() string {
}

func (fs *FileStorage) initBlocks() error {
return fs.db.View(func(tx *bolt.Tx) error {
if buk := tx.Bucket([]byte("blocks_" + fs.version)); buk == nil {
return ErrReadDataFromBoltDB
return fs.db.Update(func(tx *bolt.Tx) error {
if buk, err := tx.CreateBucketIfNotExists([]byte("blocks_" + fs.version)); err != nil {
return err
} else {
c := buk.Cursor()

Expand All @@ -491,9 +508,9 @@ func (fs *FileStorage) initBlocks() error {
}

func (fs *FileStorage) initFiles() error {
return fs.db.View(func(tx *bolt.Tx) error {
if buk := tx.Bucket([]byte("files_" + fs.version)); buk == nil {
return ErrReadDataFromBoltDB
return fs.db.Update(func(tx *bolt.Tx) error {
if buk, err := tx.CreateBucketIfNotExists([]byte("files_" + fs.version)); buk == nil || err != nil {
return err
} else {
c := buk.Cursor()

Expand Down Expand Up @@ -552,16 +569,17 @@ func (fs *FileStorage) initFiles() error {
})
}*/
func (fs *FileStorage) initCheckPoint() error {
return fs.db.View(func(tx *bolt.Tx) error {
buk := tx.Bucket([]byte("checkpoint_" + fs.version))
if buk == nil {
return ErrReadDataFromBoltDB
return fs.db.Update(func(tx *bolt.Tx) error {
buk, err := tx.CreateBucketIfNotExists([]byte("checkpoint_" + fs.version))
if err != nil {
return err
}

v := buk.Get([]byte("key"))

if v == nil {
return ErrReadDataFromBoltDB
//return ErrReadDataFromBoltDB
return nil
}

number, err := strconv.ParseUint(string(v), 16, 64)
Expand All @@ -576,16 +594,18 @@ func (fs *FileStorage) initCheckPoint() error {
}

func (fs *FileStorage) initBlockNumber() error {
return fs.db.View(func(tx *bolt.Tx) error {
buk := tx.Bucket([]byte("currentBlockNumber_" + fs.version))
if buk == nil {
return ErrReadDataFromBoltDB
return fs.db.Update(func(tx *bolt.Tx) error {
//buk := tx.Bucket([]byte("currentBlockNumber_" + fs.version))
buk, err := tx.CreateBucketIfNotExists([]byte("currentBlockNumber_" + fs.version))
if err != nil {
return err
}

v := buk.Get([]byte("key"))

if v == nil {
return ErrReadDataFromBoltDB
//return ErrReadDataFromBoltDB
return nil
}

number, err := strconv.ParseUint(string(v), 16, 64)
Expand Down Expand Up @@ -636,9 +656,9 @@ func (fs *FileStorage) writeRoot(number uint64, root []byte) error {

func (fs *FileStorage) GetRootByNumber(number uint64) (root []byte) {
cb := func(tx *bolt.Tx) error {
buk := tx.Bucket([]byte("version_" + fs.version))
if buk == nil {
return ErrReadDataFromBoltDB
buk, err := tx.CreateBucketIfNotExists([]byte("version_" + fs.version))
if err != nil {
return err
}

v := buk.Get([]byte(strconv.FormatUint(number, 16)))
Expand Down

0 comments on commit 312fef7

Please sign in to comment.