From 3b66674b057f074bad867d1371ee62239ad586b9 Mon Sep 17 00:00:00 2001 From: baxiry Date: Fri, 15 Nov 2024 18:18:00 +0300 Subject: [PATCH] initial new storage engine --- engine/store.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++--- main.go | 5 +--- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/engine/store.go b/engine/store.go index 3c4ed41..5c30852 100644 --- a/engine/store.go +++ b/engine/store.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "log" + "time" "github.com/tidwall/gjson" "go.etcd.io/bbolt" @@ -17,7 +18,6 @@ type Store struct { } func NewDB(path string) *Store { - // Open a bbolt database kv, err := bbolt.Open(path, 0600, nil) if err != nil { @@ -61,6 +61,7 @@ func NewDB(path string) *Store { return db } +// getData fitch for data func (s *Store) getData(query gjson.Result) (data []string, err error) { coll := query.Get("collection").Str if coll == "" { @@ -70,10 +71,10 @@ func (s *Store) getData(query gjson.Result) (data []string, err error) { skip := query.Get("skip").Int() limit := query.Get("limit").Int() if limit == 0 { - limit = 1000 // what is default setting ? + // what default setting should be here ? + limit = 1000 } - // bbolt err = s.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket([]byte(coll)) @@ -127,6 +128,68 @@ func (s *Store) Put(coll, val string) (err error) { return err } +// to work with writer +var ( + oksChan = make(chan []bool, 1) + done = make(chan bool, 1) + objChan = make(chan Object, 1) +) + +type Object struct { + id int + bucket string + key, val []byte +} + +// Function to gather data and send after a duration +func writer(db *bbolt.DB, input chan Object) { + dataBatch := make(map[string][]Object) + ticker := time.NewTicker(100 * time.Millisecond) + var obj Object + oks := make([]bool, 0) + for { + select { + case obj = <-input: + dataBatch[obj.bucket] = append(dataBatch[obj.bucket], obj) + oks = append(oks, true) + + case <-ticker.C: + if len(dataBatch) == 0 { + fmt.Println("nothing to write") + continue + } + err := db.Batch(func(tx *bbolt.Tx) error { + for bucketName, keyValues := range dataBatch { + bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + for _, v := range keyValues { + if err := bucket.Put(v.key, v.val); err != nil { + return err + } + } + } + return nil + }) + if err != nil { + fmt.Println("error at db.batch", err) + } + oksChan <- oks + dataBatch = map[string][]Object{} + oks = []bool{} + case <-done: + break + } + } +} + +func toByte(number int) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(number)) + return buf +} + func (s *Store) Close() { s.db.Sync() s.db.Close() diff --git a/main.go b/main.go index e76a49c..979c4a9 100644 --- a/main.go +++ b/main.go @@ -17,10 +17,7 @@ import ( //go:embed static var content embed.FS -// TODO: Close program gracefully. - func main() { - db := engine.NewDB("test.db") if db == nil { log.Fatal("no db") @@ -120,6 +117,6 @@ func shell(w http.ResponseWriter, r *http.Request) { // redirect to shell page temporary func index(w http.ResponseWriter, r *http.Request) { - // TODO create index page + // TOD create index page http.Redirect(w, r, "http://localhost:1111/shell", http.StatusSeeOther) }