Skip to content

Commit

Permalink
Read from readonly memtable and refactor the priority queue for merge
Browse files Browse the repository at this point in the history
  • Loading branch information
uttom-akash committed Dec 20, 2023
1 parent 5b7a764 commit 8a95b2d
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 87 deletions.
44 changes: 44 additions & 0 deletions internal/core/PriorityQueue.go
Original file line number Diff line number Diff line change
@@ -1 +1,45 @@
package core

// Item is one candidate in a merge. It carries the key the queue orders by
// plus the table/block/entry coordinates of the entry it stands for, so the
// queue does not need to hold the entry itself.
type Item struct {
	// SortKey is the key the queue orders by.
	SortKey string
	// SSTableID identifies the source table and breaks ties between equal SortKeys.
	SSTableID int
	// BlockID is the block position inside the source table.
	BlockID int
	// EntryID is the entry position inside the block.
	EntryID int
	// Index is the item's current slot in the queue. It is maintained by
	// Swap and Push, and set to -1 by Pop.
	Index int
}

// PriorityQueue is a slice of items providing the Len, Less, Swap, Push and
// Pop methods expected by container/heap. Push and Pop are meant to be driven
// through heap.Push and heap.Pop; calling them directly only appends to or
// removes from the end of the slice.
type PriorityQueue []*Item

// Len reports the number of items currently in the queue.
func (pq PriorityQueue) Len() int {
	return len(pq)
}

// Less orders items by ascending SortKey. When two items share a SortKey the
// one with the larger SSTableID sorts first.
func (pq PriorityQueue) Less(i, j int) bool {
	a, b := pq[i], pq[j]
	if a.SortKey != b.SortKey {
		return a.SortKey < b.SortKey
	}
	return a.SSTableID > b.SSTableID
}

// Swap exchanges the items at i and j and updates their Index fields to match
// their new positions.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}

// Push appends x, which must be an *Item, to the end of the queue and records
// its position in x.Index. It panics if x is not an *Item.
func (pq *PriorityQueue) Push(x any) {
	item := x.(*Item)
	item.Index = len(*pq)
	*pq = append(*pq, item)
}

// Pop removes and returns the last item of the queue as an *Item. The
// returned item's Index is set to -1. It panics on an empty queue.
func (pq *PriorityQueue) Pop() any {
	old := *pq
	last := len(old) - 1
	item := old[last]
	old[last] = nil // drop the reference so the backing array does not keep the item alive
	item.Index = -1 // mark the item as no longer queued
	*pq = old[:last]
	return item
}
94 changes: 14 additions & 80 deletions internal/storageengine/backgroundprocess/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backgroundprocess

import (
"container/heap"
"pkvstore/internal/core"
"pkvstore/internal/storageengine/channels"
"pkvstore/internal/storageengine/configs"
"pkvstore/internal/storageengine/lsmtree"
Expand Down Expand Up @@ -79,31 +80,6 @@ func (compaction *Compaction) mergeAndReplaceSSTables(currentLevel, newLevel uin
compaction.lsmTree.SSTables[newLevel] = append(compaction.lsmTree.SSTables[newLevel], mergedSSTable)
}

// mergeSSTables collapses every entry of the given tables into a single new
// SSTable created at newLevel.
//
// Entries are deduplicated by key: tables, blocks and entries are visited in
// slice order, and an entry seen later replaces an earlier one with the same
// key. The surviving entries are then sorted by ascending key before being
// handed to sstable.CreateSSTable.
func mergeSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstable.SSTable {
	entryByKey := map[string]*sstable.SSTableEntry{}

	// Later writes overwrite earlier ones, which is what removes duplicates.
	for tableIdx := range sstablesInLevel {
		for _, blk := range sstablesInLevel[tableIdx].Blocks {
			for _, e := range blk.Entries {
				entryByKey[e.Key] = e
			}
		}
	}

	// Flatten the map; iteration order is arbitrary, so sort afterwards.
	var uniqueEntries []*sstable.SSTableEntry
	for _, e := range entryByKey {
		uniqueEntries = append(uniqueEntries, e)
	}

	ascendingKey := func(a, b int) bool {
		return uniqueEntries[a].Key < uniqueEntries[b].Key
	}
	sort.Slice(uniqueEntries, ascendingKey)

	return sstable.CreateSSTable(uniqueEntries, newLevel)
}

func createSSTableFromMemtable(memTable map[string]*memtable.MemTableEntry) *sstable.SSTable {
sstableEntries := make([]*sstable.SSTableEntry, 0)
config := configs.GetStorageEngineConfig()
Expand All @@ -119,60 +95,16 @@ func createSSTableFromMemtable(memTable map[string]*memtable.MemTableEntry) *sst
return sstable.CreateSSTable(sstableEntries, uint8(config.LSMTreeConfig.FirstLevel))
}

// MergeItem is one candidate in a merge of several SSTables: the entry itself
// plus the table/block/entry coordinates it was taken from.
type MergeItem struct {
	// Entry is the SSTable entry this item stands for; its Key drives ordering.
	Entry *sstable.SSTableEntry
	// SSTableID identifies the source table and breaks ties between equal keys.
	SSTableID int
	// BlockID is the block position inside the source table.
	BlockID int
	// EntryID is the entry position inside the block.
	EntryID int
	// Index is the item's current slot in the queue. It is maintained by
	// Swap and Push, and set to -1 by Pop.
	Index int
}

// MergePriorityQueue is a slice of merge items providing the Len, Less, Swap,
// Push and Pop methods expected by container/heap.
type MergePriorityQueue []*MergeItem

// Len reports the number of items currently in the queue.
func (pq MergePriorityQueue) Len() int {
	return len(pq)
}

// Less orders items by ascending entry key. When two items share a key the
// one with the larger SSTableID sorts first.
func (pq MergePriorityQueue) Less(i, j int) bool {
	a, b := pq[i], pq[j]
	if a.Entry.Key != b.Entry.Key {
		return a.Entry.Key < b.Entry.Key
	}
	return a.SSTableID > b.SSTableID
}

// Swap exchanges the items at i and j and updates their Index fields to match
// their new positions.
func (pq MergePriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}

// Push appends x, which must be a *MergeItem, to the end of the queue and
// records its position in x.Index. It panics if x is not a *MergeItem.
func (pq *MergePriorityQueue) Push(x any) {
	item := x.(*MergeItem)
	item.Index = len(*pq)
	*pq = append(*pq, item)
}

// Pop removes and returns the last item of the queue as a *MergeItem. The
// returned item's Index is set to -1. It panics on an empty queue.
func (pq *MergePriorityQueue) Pop() any {
	old := *pq
	last := len(old) - 1
	item := old[last]
	old[last] = nil // drop the reference so the backing array does not keep the item alive
	item.Index = -1 // mark the item as no longer queued
	*pq = old[:last]
	return item
}

func mergeGetSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstable.SSTable {
frontier := make(MergePriorityQueue, 0)
frontier := make(core.PriorityQueue, 0)
numberEntries := uint(0)

for sstableID, ssTable := range sstablesInLevel {
numberEntries += ssTable.Header.NumberEntries

firstBlock := ssTable.Blocks[0]
heap.Push(&frontier, &MergeItem{
Entry: firstBlock.Entries[0],
heap.Push(&frontier, &core.Item{
SortKey: firstBlock.Entries[0].Key,
SSTableID: sstableID,
BlockID: 0,
EntryID: 0,
Expand All @@ -183,18 +115,20 @@ func mergeGetSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstab
lastKey := ""

for len(frontier) > 0 {
item := heap.Pop(&frontier).(*MergeItem)
item := heap.Pop(&frontier).(*core.Item)

// Deduplication
if lastKey != item.Entry.Key {
newSSTable.AddEntry(item.Entry)
lastKey = item.Entry.Key
entry := sstablesInLevel[item.SSTableID].Blocks[item.BlockID].Entries[item.EntryID]

if lastKey != item.SortKey {
newSSTable.AddEntry(entry)
lastKey = entry.Key
}

if item.EntryID+1 < len(sstablesInLevel[item.SSTableID].Blocks[item.BlockID].Entries) {
block := sstablesInLevel[item.SSTableID].Blocks[item.BlockID]
heap.Push(&frontier, &MergeItem{
Entry: block.Entries[item.EntryID+1],
heap.Push(&frontier, &core.Item{
SortKey: block.Entries[item.EntryID+1].Key,
SSTableID: item.SSTableID,
BlockID: item.BlockID,
EntryID: item.EntryID + 1,
Expand All @@ -205,8 +139,8 @@ func mergeGetSSTables(sstablesInLevel []*sstable.SSTable, newLevel uint8) *sstab

if item.BlockID+1 < len(sstablesInLevel[item.SSTableID].Blocks) {
block := sstablesInLevel[item.SSTableID].Blocks[item.BlockID+1]
heap.Push(&frontier, &MergeItem{
Entry: block.Entries[0],
heap.Push(&frontier, &core.Item{
SortKey: block.Entries[0].Key,
SSTableID: item.SSTableID,
BlockID: item.BlockID + 1,
EntryID: 0,
Expand Down
15 changes: 12 additions & 3 deletions internal/storageengine/memtable/memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (m *MemTable) Get(key string) *models.Result {

val, exists := m.Table[key]

if !exists {
val, exists = m.ReadOnlyTable[key]
}

if exists && val.IsTombstone {
return models.NewDeletedResult()
}
Expand Down Expand Up @@ -84,12 +88,10 @@ func (m *MemTable) ListenSwitchTableEvent(wg *sync.WaitGroup) {

exitSignal := make(chan os.Signal, 1)

config := configs.GetStorageEngineConfig()

for {
select {
case switchevent := <-m.sharedChannel.SwitchMemtableEvent:
if switchevent < 0 || m.Size() < config.MemTableConfig.MaxCapacity || m.ReadOnlyTable != nil {
if switchevent < 0 {
continue
}

Expand All @@ -103,9 +105,16 @@ func (m *MemTable) ListenSwitchTableEvent(wg *sync.WaitGroup) {
}

// swtichMemtable moves the current table into ReadOnlyTable and installs a
// fresh empty map as the writable table.
//
// It does nothing when the table size is still below the configured
// MemTableConfig.MaxCapacity, or when a ReadOnlyTable is already set. Both the
// check and the swap run while m.mutex is held.
//
// NOTE(review): Size is called with m.mutex held — confirm that Size does not
// acquire the same mutex itself.
func (m *MemTable) swtichMemtable() {
	// Read the limit before taking the lock, as the configuration lookup
	// does not depend on memtable state.
	maxCapacity := configs.GetStorageEngineConfig().MemTableConfig.MaxCapacity

	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.Size() < maxCapacity {
		// Not full yet; nothing to switch.
		return
	}
	if m.ReadOnlyTable != nil {
		// A read-only table is already in place; leave it untouched.
		return
	}

	m.ReadOnlyTable, m.Table = m.Table, make(map[string]*MemTableEntry)
}
Expand Down
20 changes: 16 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@ func main() {

store.Put("address1", "dhanmandi")

store.Put("name2", "akash")
store.Put("name2", "bkash")

store.Put("address2", "dhanmandi")

store.Put("name3", "akash")
store.Put("name3", "dkash")

store.Put("address3", "dhanmandi")

store.Put("name", "ckash")

fmt.Println("name: ", store.Get("name")) // Output: one
store.Delete("name")

newUser(store, 10)

fmt.Println("name1: ", store.Get("name1")) // Output: one
fmt.Println("name2: ", store.Get("name2")) // Output: one
fmt.Println("name3: ", store.Get("name3")) // Output: one
fmt.Println("name: ", store.Get("name")) // Output: one

}

Expand All @@ -41,13 +48,16 @@ func generateAndPutData(current int, lsm *store.Store, wg *sync.WaitGroup) {

var key string

for i := int64(0); i < 10; i++ {
for i := int64(0); i < 100; i++ {
randomNumber := rand.Int63n(1<<63 - 1)
key = strconv.FormatInt(randomNumber, 10)
value := strconv.FormatInt(randomNumber, 10)

lsm.Put(key, value)

// fmt.Println("get:", lsm.Get(key))
}

fmt.Printf("Goroutine %d completed\n", current)
}

Expand All @@ -60,6 +70,8 @@ func newUser(store *store.Store, i int) {

for j := 0; j < numGoroutines; j++ {
go generateAndPutData(j, store, &wg)

time.Sleep(3 * time.Second)
}

wg.Wait()
Expand Down

0 comments on commit 8a95b2d

Please sign in to comment.