Skip to content

Commit

Permalink
try to fix bug in dmap merger
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Dec 13, 2020
1 parent 25a6195 commit 46e6e5b
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 30 deletions.
2 changes: 1 addition & 1 deletion dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (db *Olric) createDMap(part *partition, name string) (*dmap, error) {
if !ok {
return nil, fmt.Errorf("storage engine could not be found: %s", nm.config.storageEngine)
}
nm.storage, err = engine.Fork()
nm.storage, err = engine.Fork(nil)
if err != nil {
return nil, err
}
Expand Down
26 changes: 9 additions & 17 deletions dmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,33 +121,25 @@ func newDB(c *config.Config, peers ...*Olric) (*Olric, error) {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
c.Started = func() {
defer cancel()
}

db, err := New(c)
if err != nil {
return nil, err
}

db.wg.Add(1)
go db.callStartedCallback()

db.wg.Add(1)
go func() {
defer db.wg.Done()
err = db.server.ListenAndServe()
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to run TCP server")
serr := db.Start()
if serr != nil {
db.log.V(2).Printf("[ERROR] Failed to start Olric node: %s", serr)
}
}()
<-db.server.StartedCtx.Done()
db.passCheckpoint()

err = db.startDiscovery()
if err != nil {
return nil, err
}
// Wait some time for goroutines
<-time.After(100 * time.Millisecond)
db.passCheckpoint()

<-ctx.Done()
return db, nil
}

Expand Down
28 changes: 19 additions & 9 deletions internal/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package kvstore

import (
"errors"
"regexp"

"github.com/buraksezer/olric/pkg/storage"
Expand Down Expand Up @@ -48,16 +49,24 @@ func (kv *KVStore) SetConfig(c *storage.Config) {
kv.config = c
}

func (kv *KVStore) Start() error { return nil }
func (kv *KVStore) Start() error {
if kv.config == nil {
return errors.New("config cannot be nil")
}
return nil
}

// Fork creates a new KVStore instance.
func (kv *KVStore) Fork() (storage.Engine, error) {
size, err := kv.config.Get("tableSize")
func (kv *KVStore) Fork(c *storage.Config) (storage.Engine, error) {
if c == nil {
c = kv.config.Copy()
}
size, err := c.Get("tableSize")
if err != nil {
return nil, err
}
child := &KVStore{
config: kv.config,
config: c,
}
t := newTable(size.(int))
child.tables = append(kv.tables, t)
Expand Down Expand Up @@ -305,19 +314,20 @@ func (kv *KVStore) Import(data []byte) (storage.Engine, error) {
return nil, err
}

options := kv.config.Copy()
options.Add("tableSize", tr.Allocated)
fresh, err := kv.Fork()
c := kv.config.Copy()
c.Add("tableSize", tr.Allocated)

child, err := kv.Fork(c)
if err != nil {
return nil, err
}
t := fresh.(*KVStore).tables[0]
t := child.(*KVStore).tables[0]
t.hkeys = tr.HKeys
t.offset = tr.Offset
t.inuse = tr.Inuse
t.garbage = tr.Garbage
copy(t.memory, tr.Memory)
return fresh, nil
return child, nil
}

// Stats is a function which provides memory allocation and garbage ratio of a storage instance.
Expand Down
2 changes: 1 addition & 1 deletion internal/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func bval(i int) []byte {
func testKVStore() (storage.Engine, error) {
kv := &KVStore{}
kv.SetConfig(DefaultConfig())
return kv.Fork()
return kv.Fork(nil)
}

func Test_Put(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *Config) Copy() *Config {
m: make(map[string]interface{}),
}
for key, value := range c.m {
c.m[key] = value
n.m[key] = value
}
return n
}
2 changes: 1 addition & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Engine interface {
Start() error
NewEntry() Entry
Name() string
Fork() (Engine, error)
Fork(*Config) (Engine, error)
PutRaw(uint64, []byte) error
Put(uint64, Entry) error
GetRaw(uint64) ([]byte, error)
Expand Down
1 change: 1 addition & 0 deletions rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error {
mergeErr = err
return false
}
// TODO: Don't put the winner again if it comes from dm.storage
mergeErr = dm.storage.Put(hkey, winner)
if mergeErr == storage.ErrFragmented {
db.wg.Add(1)
Expand Down

0 comments on commit 46e6e5b

Please sign in to comment.