Skip to content

Commit

Permalink
Merge pull request #2 from uttom-akash/feat/ua/background-process
Browse files Browse the repository at this point in the history
Feat/ua/background process
  • Loading branch information
uttom-akash authored Dec 12, 2023
2 parents d53fa1c + 5b7a764 commit e6d8887
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 77 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
storage
storage
*.pprof
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# lsm-storage
5 changes: 4 additions & 1 deletion docs/TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
- Invoke cmd to access in memory db
- Replace with bloom filters (fix length for block but think about length for sstable)
- what about using btree instead of sstable
- how rocksdb opens hundreds of tables at once
- transaction : keep snapshot of memtable
- locking issue
- locking issue
- merge sorts while doing compaction
- don't make large array or map while doing compaction
1 change: 1 addition & 0 deletions internal/core/PriorityQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package core
147 changes: 133 additions & 14 deletions internal/storageengine/backgroundprocess/compaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backgroundprocess

import (
"container/heap"
"pkvstore/internal/storageengine/channels"
"pkvstore/internal/storageengine/configs"
"pkvstore/internal/storageengine/lsmtree"
Expand All @@ -15,27 +16,43 @@ type Compaction struct {
}

func NewCompaction(lsmtree *lsmtree.LSMTree) *Compaction {
return &Compaction{
compaction := &Compaction{
lsmTree: lsmtree,
sharedChan: channels.GetSharedChannel(),
}

go compaction.listenFlushMemtable()
go compaction.listenToCompact()

return compaction
}

func (compaction *Compaction) ListenToCompact() {
for event := range compaction.sharedChan.NewMutationEventChannel {
if event < 1 || !compaction.lsmTree.MemTable.ShouldFlush() {
// listenToCompact blocks on the compaction event channel and triggers a
// compaction check for every positive event. Non-positive events are
// treated as no-ops.
//
// This loop terminates only when CompactionEvent is closed.
func (compaction *Compaction) listenToCompact() {
	for event := range compaction.sharedChan.CompactionEvent {
		if event < 1 {
			continue
		}

		compaction.tryCompactionProcess()
	}
}

func (compaction *Compaction) flushMemtable() {
oldMemtable := compaction.lsmTree.MemTable.ReplaceNewTable()
newSSTable := createSSTableFromMemtable(oldMemtable)
compaction.lsmTree.SSTables[newSSTable.Header.Level] = append(compaction.lsmTree.SSTables[newSSTable.Header.Level], newSSTable)
func (compaction *Compaction) listenFlushMemtable() {

for event := range compaction.sharedChan.FlushMemtableEvent {

if event < 1 || compaction.lsmTree.MemTable.ReadOnlyTable == nil {
continue
}

newSSTable := createSSTableFromMemtable(compaction.lsmTree.MemTable.ReadOnlyTable)
compaction.lsmTree.SSTables[newSSTable.Header.Level] = append(compaction.lsmTree.SSTables[newSSTable.Header.Level], newSSTable)
compaction.lsmTree.MemTable.ClearReadOnlyMemtable()

compaction.sharedChan.CompactionEvent <- 1
}
}

func (compaction *Compaction) tryCompactionProcess() {
Expand All @@ -57,16 +74,16 @@ func (compaction *Compaction) tryCompactionProcess() {
}

// mergeAndReplaceSSTables merges every SSTable at currentLevel into a
// single table targeted at newLevel, empties currentLevel, and appends the
// merged table to newLevel.
func (compaction *Compaction) mergeAndReplaceSSTables(currentLevel, newLevel uint8) {
	mergedSSTable := mergeGetSSTables(compaction.lsmTree.SSTables[currentLevel], newLevel)
	compaction.lsmTree.SSTables[currentLevel] = make([]*sstable.SSTable, 0)
	compaction.lsmTree.SSTables[newLevel] = append(compaction.lsmTree.SSTables[newLevel], mergedSSTable)
}

func mergeSSTables(sstables []*sstable.SSTable, newLevel uint8) *sstable.SSTable {
func mergeSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstable.SSTable {
keyValues := make(map[string]*sstable.SSTableEntry)

// Deduplication
for _, ssTable := range sstables {
for _, ssTable := range sstablesInLevel {
for _, block := range ssTable.Blocks {
for _, entry := range block.Entries {
keyValues[entry.Key] = entry
Expand All @@ -87,13 +104,115 @@ func mergeSSTables(sstables []*sstable.SSTable, newLevel uint8) *sstable.SSTable
return sstable.CreateSSTable(mergedSSTableEntries, newLevel)
}

func createSSTableFromMemtable(memTable *memtable.FlushableMemTable) *sstable.SSTable {
func createSSTableFromMemtable(memTable map[string]*memtable.MemTableEntry) *sstable.SSTable {
sstableEntries := make([]*sstable.SSTableEntry, 0)
config := configs.GetStorageEngineConfig()

for k, v := range memTable.Table {
for k, v := range memTable {
sstableEntries = append(sstableEntries, sstable.NewSSTableEntry(k, v.Value, v.IsTombstone))
}

sort.Slice(sstableEntries, func(i, j int) bool {
return sstableEntries[i].Key < sstableEntries[j].Key
})

return sstable.CreateSSTable(sstableEntries, uint8(config.LSMTreeConfig.FirstLevel))
}

// MergeItem is one cursor position in the k-way SSTable merge: the entry
// currently under consideration plus the coordinates needed to advance the
// cursor and to break ties between tables.
type MergeItem struct {
	Entry     *sstable.SSTableEntry // entry at this cursor position
	SSTableID int                   // index of the source table within its level
	BlockID   int                   // block index inside the source table
	EntryID   int                   // entry index inside the block
	Index     int                   // heap position; maintained by MergePriorityQueue
}

// MergePriorityQueue is a min-heap of MergeItems ordered by entry key;
// equal keys break ties toward the higher SSTableID. It implements
// heap.Interface and is driven via heap.Push / heap.Pop.
type MergePriorityQueue []*MergeItem

// Len reports the number of queued items.
func (mpq MergePriorityQueue) Len() int {
	return len(mpq)
}

// Less orders items by ascending key. On equal keys the item with the
// higher SSTableID sorts first, so the more recently created table's
// entry is popped ahead of older duplicates.
func (mpq MergePriorityQueue) Less(i, j int) bool {
	if mpq[i].Entry.Key == mpq[j].Entry.Key {
		return mpq[i].SSTableID > mpq[j].SSTableID
	}
	return mpq[i].Entry.Key < mpq[j].Entry.Key
}

// Swap exchanges two items and keeps their Index fields current.
// (Receiver renamed from pq to mpq for consistency with Len/Less.)
func (mpq MergePriorityQueue) Swap(i, j int) {
	mpq[i], mpq[j] = mpq[j], mpq[i]
	mpq[i].Index = i
	mpq[j].Index = j
}

// Push appends x (a *MergeItem) to the underlying slice. Called by
// heap.Push; not intended for direct use.
func (mpq *MergePriorityQueue) Push(x any) {
	n := len(*mpq)
	item := x.(*MergeItem)
	item.Index = n
	*mpq = append(*mpq, item)
}

// Pop removes and returns the last slice element (the heap minimum after
// heap.Pop's internal swap). Called by heap.Pop; not intended for direct use.
func (mpq *MergePriorityQueue) Pop() any {
	old := *mpq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil  // avoid memory leak
	item.Index = -1 // for safety
	*mpq = old[0 : n-1]
	return item
}

// mergeGetSSTables k-way merges all SSTables of one level into a single
// SSTable for newLevel using a min-heap of per-table cursors.
//
// Deduplication: on key ties the heap pops the entry from the higher
// SSTableID first (see MergePriorityQueue.Less); only the first occurrence
// of each key is emitted, so the newest table's value wins.
func mergeGetSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstable.SSTable {
	frontier := make(MergePriorityQueue, 0, len(sstablesInLevel))
	numberEntries := uint(0)

	for sstableID, ssTable := range sstablesInLevel {
		numberEntries += ssTable.Header.NumberEntries

		// Guard against empty tables/blocks so the Blocks[0]/Entries[0]
		// seeds below cannot panic.
		if len(ssTable.Blocks) == 0 || len(ssTable.Blocks[0].Entries) == 0 {
			continue
		}

		firstBlock := ssTable.Blocks[0]
		heap.Push(&frontier, &MergeItem{
			Entry:     firstBlock.Entries[0],
			SSTableID: sstableID,
			BlockID:   0,
			EntryID:   0,
		})
	}

	newSSTable := sstable.OpenSSTable(newLevel, numberEntries)

	// Track the previously emitted key with an explicit flag rather than a
	// "" sentinel, so a genuine empty-string key is not silently dropped.
	lastKey := ""
	haveLast := false

	for len(frontier) > 0 {
		item := heap.Pop(&frontier).(*MergeItem)

		// Emit only the first (newest) occurrence of each key.
		if !haveLast || lastKey != item.Entry.Key {
			newSSTable.AddEntry(item.Entry)
			lastKey = item.Entry.Key
			haveLast = true
		}

		// Advance the cursor within its current block when possible...
		if item.EntryID+1 < len(sstablesInLevel[item.SSTableID].Blocks[item.BlockID].Entries) {
			block := sstablesInLevel[item.SSTableID].Blocks[item.BlockID]
			heap.Push(&frontier, &MergeItem{
				Entry:     block.Entries[item.EntryID+1],
				SSTableID: item.SSTableID,
				BlockID:   item.BlockID,
				EntryID:   item.EntryID + 1,
			})

			continue
		}

		// ...otherwise move to the first entry of the table's next block.
		if item.BlockID+1 < len(sstablesInLevel[item.SSTableID].Blocks) {
			block := sstablesInLevel[item.SSTableID].Blocks[item.BlockID+1]
			heap.Push(&frontier, &MergeItem{
				Entry:     block.Entries[0],
				SSTableID: item.SSTableID,
				BlockID:   item.BlockID + 1,
				EntryID:   0,
			})
		}
	}

	return newSSTable.CompleteSSTableCreation()
}
6 changes: 6 additions & 0 deletions internal/storageengine/channels/sharedchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import "sync"

// SharedChannel groups the buffered event channels used to coordinate the
// storage engine's background processes (mutation, memtable switch,
// memtable flush, and compaction signals).
type SharedChannel struct {
	NewMutationEventChannel chan int
	SwitchMemtableEvent     chan int
	FlushMemtableEvent      chan int
	CompactionEvent         chan int
}

// newSharedChannel constructs a SharedChannel with the engine's buffer
// sizes: large buffers for the high-frequency mutation/switch events,
// smaller ones for the rarer flush and compaction signals.
func newSharedChannel() *SharedChannel {
	sc := new(SharedChannel)
	sc.NewMutationEventChannel = make(chan int, 10000)
	sc.SwitchMemtableEvent = make(chan int, 10000)
	sc.FlushMemtableEvent = make(chan int, 100)
	sc.CompactionEvent = make(chan int, 10)
	return sc
}

Expand Down
2 changes: 1 addition & 1 deletion internal/storageengine/configs/storageengineconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewStorageEngineConfig() *StorageEngineConfig {
config.SSTableConfig.BlockCapacity = 2048 //2048
config.SSTableConfig.BlockFilterFalsePositive = 0.001 // 1 in 1000, 3.59KiB, hash function 10

config.MemTableConfig.MaxCapacity = 16384 //4096
config.MemTableConfig.MaxCapacity = 2 //4096

return config
}
Expand Down
78 changes: 47 additions & 31 deletions internal/storageengine/memtable/memtable.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package memtable

import (
"os"
"pkvstore/internal/models"
"pkvstore/internal/storageengine/channels"
"pkvstore/internal/storageengine/configs"
"sync"
)
Expand All @@ -19,35 +21,26 @@ func NewMemTableEntry(value string) *MemTableEntry {
}

type MemTable struct {
Table map[string]*MemTableEntry
size uint32
mutex sync.RWMutex
}

type FlushableMemTable struct {
Table map[string]*MemTableEntry
size uint32
Table map[string]*MemTableEntry
ReadOnlyTable map[string]*MemTableEntry
size uint32
mutex sync.RWMutex
sharedChannel *channels.SharedChannel
}

func NewMemTable() *MemTable {
return &MemTable{
Table: make(map[string]*MemTableEntry),
size: 0,
m := &MemTable{
Table: make(map[string]*MemTableEntry),
ReadOnlyTable: nil,
size: 0,
sharedChannel: channels.GetSharedChannel(),
}
}

func NewFlushableMemTable(table map[string]*MemTableEntry, size uint32) *FlushableMemTable {
return &FlushableMemTable{
Table: table,
size: size,
}
}
var wg sync.WaitGroup

func (m *MemTable) Put(key string, value string) {
m.mutex.Lock()
defer m.mutex.Unlock()
go m.ListenSwitchTableEvent(&wg)

m.Table[key] = NewMemTableEntry(value)
return m
}

func (m *MemTable) Get(key string) *models.Result {
Expand All @@ -66,6 +59,13 @@ func (m *MemTable) Get(key string) *models.Result {
return models.NewNotFoundResult()
}

// Put inserts or overwrites the entry for key in the active table, under
// the write lock.
func (m *MemTable) Put(key string, value string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.Table[key] = NewMemTableEntry(value)
}

func (m *MemTable) Delete(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -75,25 +75,41 @@ func (m *MemTable) Delete(key string) {
}

// Size returns the number of entries in the active table.
//
// Takes the read lock rather than the exclusive lock: Size only reads
// m.Table, and m.mutex is a sync.RWMutex, so concurrent readers need not
// serialize against each other.
func (m *MemTable) Size() int {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	return len(m.Table)
}

func (m *MemTable) ShouldFlush() bool {
func (m *MemTable) ListenSwitchTableEvent(wg *sync.WaitGroup) {

defer wg.Done()

exitSignal := make(chan os.Signal, 1)

config := configs.GetStorageEngineConfig()

return m.Size() >= config.MemTableConfig.MaxCapacity
for {
select {
case switchevent := <-m.sharedChannel.SwitchMemtableEvent:
if switchevent < 0 || m.Size() < config.MemTableConfig.MaxCapacity || m.ReadOnlyTable != nil {
continue
}

m.swtichMemtable()

m.sharedChannel.FlushMemtableEvent <- 1
case <-exitSignal:
return
}
}
}

func (m *MemTable) ReplaceNewTable() *FlushableMemTable {
func (m *MemTable) swtichMemtable() {
m.mutex.Lock()
defer m.mutex.Unlock()

oldTable := NewFlushableMemTable(m.Table, m.size)

m.ReadOnlyTable = m.Table
m.Table = make(map[string]*MemTableEntry)
m.size = 0
}

return oldTable
// ClearReadOnlyMemtable drops the flushed read-only snapshot so a new
// switch can occur.
//
// Takes the write lock: ReadOnlyTable is also written by the switch path
// (under lock) and read by the flush goroutine, so an unguarded write here
// would be a data race.
func (m *MemTable) ClearReadOnlyMemtable() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.ReadOnlyTable = nil
}
Loading

0 comments on commit e6d8887

Please sign in to comment.