Skip to content

Commit

Permalink
Merge pull request #13 from octu0/v1.4.0
Browse files Browse the repository at this point in the history
v1.4.0
  • Loading branch information
octu0 authored Jul 5, 2022
2 parents 8405558 + 78aa57b commit e6c58cb
Show file tree
Hide file tree
Showing 30 changed files with 1,462 additions and 734 deletions.
58 changes: 25 additions & 33 deletions bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Bitcask struct {
opt *option
path string
curr datafile.Datafile
datafiles map[int32]datafile.Datafile
datafiles map[datafile.FileID]datafile.Datafile
trie art.Tree
indexer indexer.Indexer
ttlIndexer indexer.Indexer
Expand Down Expand Up @@ -621,12 +621,11 @@ func (b *Bitcask) maybeRotate() error {
return nil
}

currentFileID, err := b.closeCurrentFile()
if err != nil {
if _, err := b.closeCurrentFile(); err != nil {
return errors.WithStack(err)
}

if err := b.openWritableFile(currentFileID + 1); err != nil {
if err := b.openWritableFile(datafile.NextFileID()); err != nil {
return errors.WithStack(err)
}

Expand All @@ -638,37 +637,33 @@ func (b *Bitcask) maybeRotate() error {
}

// closeCurrentFile closes current datafile and makes it read only.
func (b *Bitcask) closeCurrentFile() (int32, error) {
func (b *Bitcask) closeCurrentFile() (datafile.FileID, error) {
id := b.curr.FileID()

if err := b.curr.Close(); err != nil {
return 0, errors.WithStack(err)
return datafile.FileID{}, errors.WithStack(err)
}

df, err := datafile.OpenReadonly(
df, err := datafile.OpenReadonly(id, b.path,
datafile.RuntimeContext(b.opt.RuntimeContext),
datafile.Path(b.path),
datafile.FileID(id),
datafile.TempDir(b.opt.TempDir),
datafile.CopyTempThreshold(b.opt.CopyTempThreshold),
)
if err != nil {
if err2 := b.openWritableFile(id); err2 != nil {
return 0, errors.Wrapf(err2, "failed reopen datafile(writable) %s(%d) cause:%+v", b.path, id, err)
return datafile.FileID{}, errors.Wrapf(err2, "failed reopen datafile(writable) %s(%d) cause:%+v", b.path, id, err)
}
return 0, errors.Wrapf(err, "failed to open datafile(readonly) %s(%d)", b.path, id)
return datafile.FileID{}, errors.Wrapf(err, "failed to open datafile(readonly) %s(%d)", b.path, id)
}

b.datafiles[id] = df
return id, nil
}

// openWritableFile opens new datafile for writing data
func (b *Bitcask) openWritableFile(fileID int32) error {
curr, err := datafile.Open(
func (b *Bitcask) openWritableFile(fileID datafile.FileID) error {
curr, err := datafile.Open(fileID, b.path,
datafile.RuntimeContext(b.opt.RuntimeContext),
datafile.Path(b.path),
datafile.FileID(fileID),
datafile.FileMode(b.opt.FileFileModeBeforeUmask),
datafile.TempDir(b.opt.TempDir),
datafile.CopyTempThreshold(b.opt.CopyTempThreshold),
Expand All @@ -677,6 +672,7 @@ func (b *Bitcask) openWritableFile(fileID int32) error {
return errors.WithStack(err)
}
b.curr = curr
b.repliEmit.EmitCurrentFileID(fileID)
return nil
}

Expand All @@ -691,19 +687,17 @@ func (b *Bitcask) Reopen() error {
// reopen reloads a bitcask object with index and datafiles
// caller of this method should take care of locking
func (b *Bitcask) reopenLocked() error {
datafiles, lastID, err := loadDatafiles(b.opt, b.path)
datafiles, lastFileID, err := loadDatafiles(b.opt, b.path)
if err != nil {
return errors.WithStack(err)
}
t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
t, ttlIndex, err := loadIndexes(b, datafiles, lastFileID)
if err != nil {
return errors.WithStack(err)
}

curr, err := datafile.Open(
curr, err := datafile.Open(lastFileID, b.path,
datafile.RuntimeContext(b.opt.RuntimeContext),
datafile.Path(b.path),
datafile.FileID(lastID),
datafile.FileMode(b.opt.FileFileModeBeforeUmask),
datafile.TempDir(b.opt.TempDir),
datafile.CopyTempThreshold(b.opt.CopyTempThreshold),
Expand Down Expand Up @@ -785,28 +779,26 @@ func isExpiredFromTime(expiry time.Time) bool {
return expiry.Before(time.Now().UTC())
}

func loadDatafiles(opt *option, path string) (map[int32]datafile.Datafile, int32, error) {
ids, err := datafile.ParseIdsFromDatafiles(path)
func loadDatafiles(opt *option, path string) (map[datafile.FileID]datafile.Datafile, datafile.FileID, error) {
ids, err := datafile.GrepFileIdsFromDatafilePath(path)
if err != nil {
return nil, 0, err
return nil, datafile.FileID{}, err
}

datafiles := make(map[int32]datafile.Datafile, len(ids))
datafiles := make(map[datafile.FileID]datafile.Datafile, len(ids))
for _, id := range ids {
d, err := datafile.OpenReadonly(
d, err := datafile.OpenReadonly(id, path,
datafile.RuntimeContext(opt.RuntimeContext),
datafile.Path(path),
datafile.FileID(id),
datafile.TempDir(opt.TempDir),
datafile.CopyTempThreshold(opt.CopyTempThreshold),
)
if err != nil {
return nil, 0, errors.WithStack(err)
return nil, datafile.FileID{}, errors.WithStack(err)
}
datafiles[id] = d
}
if len(ids) < 1 {
return datafiles, 0, nil
return datafiles, datafile.FileID{}, nil
}

lastID := ids[len(ids)-1]
Expand All @@ -816,7 +808,7 @@ func loadDatafiles(opt *option, path string) (map[int32]datafile.Datafile, int32
// loadIndexes loads index from disk to memory. If index is not available or partially available (last bitcask process crashed)
// then it iterates over last datafile and construct index
// we construct ttl_index here also along with normal index
func loadIndexes(b *Bitcask, datafiles map[int32]datafile.Datafile, lastID int32) (art.Tree, art.Tree, error) {
func loadIndexes(b *Bitcask, datafiles map[datafile.FileID]datafile.Datafile, lastID datafile.FileID) (art.Tree, art.Tree, error) {
t, found, err := b.indexer.Load(filepath.Join(b.path, filerIndexFile))
if err != nil {
return nil, nil, errors.WithStack(err)
Expand All @@ -835,12 +827,12 @@ func loadIndexes(b *Bitcask, datafiles map[int32]datafile.Datafile, lastID int32
return t, ttlIndex, nil
}

fileIds := make([]int32, 0, len(datafiles))
fileIds := make([]datafile.FileID, 0, len(datafiles))
for _, df := range datafiles {
fileIds = append(fileIds, df.FileID())
}
sort.Slice(fileIds, func(i, j int) bool {
return fileIds[i] < fileIds[j]
return fileIds[i].Newer(fileIds[j])
})

for _, fileID := range fileIds {
Expand Down Expand Up @@ -1008,7 +1000,7 @@ func Open(path string, funcs ...OptionFunc) (*Bitcask, error) {
return nil, errors.WithStack(err)
}
if ok != true {
return nil, ErrDatabaseLocked
return nil, errors.WithStack(ErrDatabaseLocked)
}

if err := bitcask.Reopen(); err != nil {
Expand Down
14 changes: 9 additions & 5 deletions bitcask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func TestOpenErrors(t *testing.T) {
assert.Error(err)
})

t.Run("LoadDatafilesError", func(t *testing.T) {
t.Run("LoadDatafiles", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
Expand All @@ -1233,12 +1233,16 @@ func TestOpenErrors(t *testing.T) {
assert.NoError(err)

// Simulate some horrible that happened to the datafiles!
err = os.Rename(filepath.Join(testdir, "000000000.data"), filepath.Join(testdir, "000000000xxx.data"))
err = os.Rename(filepath.Join(testdir, db.curr.FileID().String()+".data"), filepath.Join(testdir, db.curr.FileID().String()+"xxx.data"))
assert.NoError(err)

_, err = Open(testdir)
assert.Error(err)
assert.Equal("strconv.ParseInt: parsing \"000000000xxx\": invalid syntax", err.Error())
db2, err := Open(testdir)
if err != nil {
t.Errorf("no error! %+v", err)
}
if db.curr.FileID().Newer(db2.curr.FileID()) != true {
t.Errorf("open new file: old:%s new:%s", db.curr.FileID(), db2.curr.FileID())
}
})
}

Expand Down
Loading

0 comments on commit e6c58cb

Please sign in to comment.