Skip to content

Commit

Permalink
initial new storage engine
Browse files Browse the repository at this point in the history
  • Loading branch information
baxiry committed Nov 15, 2024
1 parent 0a5c619 commit 3b66674
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
69 changes: 66 additions & 3 deletions engine/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"log"
"time"

"github.com/tidwall/gjson"
"go.etcd.io/bbolt"
Expand All @@ -17,7 +18,6 @@ type Store struct {
}

func NewDB(path string) *Store {

// Open a bbolt database
kv, err := bbolt.Open(path, 0600, nil)
if err != nil {
Expand Down Expand Up @@ -61,6 +61,7 @@ func NewDB(path string) *Store {
return db
}

// getData fitch for data
func (s *Store) getData(query gjson.Result) (data []string, err error) {
coll := query.Get("collection").Str
if coll == "" {
Expand All @@ -70,10 +71,10 @@ func (s *Store) getData(query gjson.Result) (data []string, err error) {
skip := query.Get("skip").Int()
limit := query.Get("limit").Int()
if limit == 0 {
limit = 1000 // what is default setting ?
// what default setting should be here ?
limit = 1000
}

// bbolt
err = s.db.View(func(tx *bbolt.Tx) error {

bucket := tx.Bucket([]byte(coll))
Expand Down Expand Up @@ -127,6 +128,68 @@ func (s *Store) Put(coll, val string) (err error) {
return err
}

// to work with writer
var (
oksChan = make(chan []bool, 1)
done = make(chan bool, 1)
objChan = make(chan Object, 1)
)

type Object struct {
id int
bucket string
key, val []byte
}

// Function to gather data and send after a duration
func writer(db *bbolt.DB, input chan Object) {
dataBatch := make(map[string][]Object)
ticker := time.NewTicker(100 * time.Millisecond)
var obj Object
oks := make([]bool, 0)
for {
select {
case obj = <-input:
dataBatch[obj.bucket] = append(dataBatch[obj.bucket], obj)
oks = append(oks, true)

case <-ticker.C:
if len(dataBatch) == 0 {
fmt.Println("nothing to write")
continue
}
err := db.Batch(func(tx *bbolt.Tx) error {
for bucketName, keyValues := range dataBatch {
bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
for _, v := range keyValues {
if err := bucket.Put(v.key, v.val); err != nil {
return err
}
}
}
return nil
})
if err != nil {
fmt.Println("error at db.batch", err)
}
oksChan <- oks
dataBatch = map[string][]Object{}
oks = []bool{}
case <-done:
break
}
}
}

func toByte(number int) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(number))
return buf
}

func (s *Store) Close() {
s.db.Sync()
s.db.Close()
Expand Down
5 changes: 1 addition & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import (
//go:embed static
var content embed.FS

// TODO: Close program gracefully.

func main() {

db := engine.NewDB("test.db")
if db == nil {
log.Fatal("no db")
Expand Down Expand Up @@ -120,6 +117,6 @@ func shell(w http.ResponseWriter, r *http.Request) {

// redirect to shell page temporary
func index(w http.ResponseWriter, r *http.Request) {
// TODO create index page
// TOD create index page
http.Redirect(w, r, "http://localhost:1111/shell", http.StatusSeeOther)
}

0 comments on commit 3b66674

Please sign in to comment.