Skip to content

Commit

Permalink
feat(storers): Add get/set multilevel support and rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Jordan committed Apr 29, 2024
1 parent 43bab16 commit 92c02be
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 107 deletions.
13 changes: 7 additions & 6 deletions pkg/api/souin.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ func (s *SouinAPI) listKeys(search string) []string {
}

var storageToInfiniteTTLMap = map[string]time.Duration{
"BADGER": 365 * 24 * time.Hour,
"ETCD": 365 * 24 * time.Hour,
"NUTS": 0,
"OLRIC": 365 * 24 * time.Hour,
"OTTER": 365 * 24 * time.Hour,
"REDIS": 0,
"BADGER": 365 * 24 * time.Hour,
"ETCD": 365 * 24 * time.Hour,
"NUTS": 0,
"NUTS_MEMCACHED": 0,
"OLRIC": 365 * 24 * time.Hour,
"OTTER": 365 * 24 * time.Hour,
"REDIS": 0,
}

func (s *SouinAPI) purgeMapping() {
Expand Down
257 changes: 156 additions & 101 deletions pkg/storage/nutsMemcachedProvider.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package storage

import (
"bufio"
"bytes"
"encoding/json"
"errors"
Expand All @@ -10,17 +9,31 @@ import (
"strings"
"time"

"github.com/darkweak/souin/configurationtypes"
t "github.com/darkweak/souin/configurationtypes"
"github.com/darkweak/souin/pkg/rfc"
"github.com/darkweak/souin/pkg/storage/types"
"github.com/dgraph-io/ristretto"
"github.com/imdario/mergo"
"github.com/nutsdb/nutsdb"
lz4 "github.com/pierrec/lz4/v4"
"go.uber.org/zap"
)

var nutsMemcachedInstanceMap = map[string]*nutsdb.DB{}

// Why NutsMemcached?
// ---
// The NutsMemcached storage backend is composed of two different storage backends:
// 1. NutsDB: for the cache key index (i.e., IDX_ keys).
// 2. Memcached: for the cache content.
// There are two storage backends because:
// 1. is a "non forgetting" storage backend (NutsDB, for the index). Keys will be kept until their TTL expires.
// → if it was handled by a storage backend that can preemptively evict, you might evict IDX_ keys, which you wouldn't want.
// You need to make sure index and content stays in sync.
// 2. is "forgetting" storage backend (Memcached, for the data). Cache data will be pre-emptively evicted (i.e., before TTL is reached).
// → it makes it possible to put limits on total RAM/disk usage.

// NutsMemcached provider type
type NutsMemcached struct {
*nutsdb.DB
Expand All @@ -30,51 +43,52 @@ type NutsMemcached struct {
ristrettoCache *ristretto.Cache
}

// const (
// bucket = "souin-bucket"
// nutsLimit = 1 << 16
// )

// func sanitizeProperties(m map[string]interface{}) map[string]interface{} {
// iotas := []string{"RWMode", "StartFileLoadingMode"}
// for _, i := range iotas {
// if v := m[i]; v != nil {
// currentMode := nutsdb.FileIO
// switch v {
// case 1:
// currentMode = nutsdb.MMap
// }
// m[i] = currentMode
// }
// }

// for _, i := range []string{"SegmentSize", "NodeNum", "MaxFdNumsInCache"} {
// if v := m[i]; v != nil {
// m[i], _ = v.(int64)
// }
// }

// if v := m["EntryIdxMode"]; v != nil {
// m["EntryIdxMode"] = nutsdb.HintKeyValAndRAMIdxMode
// switch v {
// case 1:
// m["EntryIdxMode"] = nutsdb.HintKeyAndRAMIdxMode
// }
// }

// if v := m["SyncEnable"]; v != nil {
// m["SyncEnable"] = true
// if b, ok := v.(bool); ok {
// m["SyncEnable"] = b
// } else if s, ok := v.(string); ok {
// m["SyncEnable"], _ = strconv.ParseBool(s)
// }
// }

// return m
// }

// NutsConnectionFactory function create new Nuts instance
// Below is already defined in the original Nuts provider.
/* const (
bucket = "souin-bucket"
nutsLimit = 1 << 16
)
func sanitizeProperties(m map[string]interface{}) map[string]interface{} {
iotas := []string{"RWMode", "StartFileLoadingMode"}
for _, i := range iotas {
if v := m[i]; v != nil {
currentMode := nutsdb.FileIO
switch v {
case 1:
currentMode = nutsdb.MMap
}
m[i] = currentMode
}
}
for _, i := range []string{"SegmentSize", "NodeNum", "MaxFdNumsInCache"} {
if v := m[i]; v != nil {
m[i], _ = v.(int64)
}
}
if v := m["EntryIdxMode"]; v != nil {
m["EntryIdxMode"] = nutsdb.HintKeyValAndRAMIdxMode
switch v {
case 1:
m["EntryIdxMode"] = nutsdb.HintKeyAndRAMIdxMode
}
}
if v := m["SyncEnable"]; v != nil {
m["SyncEnable"] = true
if b, ok := v.(bool); ok {
m["SyncEnable"] = b
} else if s, ok := v.(string); ok {
m["SyncEnable"], _ = strconv.ParseBool(s)
}
}
return m
} */

// NutsConnectionFactory function create new NutsMemcached instance
func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.Storer, error) {
dc := c.GetDefaultCache()
nutsConfiguration := dc.GetNutsMemcached()
Expand All @@ -88,6 +102,10 @@ func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.S
// Use: github.com/nutsdb/nutsdb v0.14.0
//nutsOptions.EntryIdxMode = nutsdb.HintBPTSparseIdxMode

// EntryIdxMode will affect the size of the key index in memory.
// → since this storage backend has no limit on memory usage, it has to be chosen depending on
// the max number of cache keys that will be kept in flight.

if nutsConfiguration.Configuration != nil {
var parsedNuts nutsdb.Options
nutsConfiguration.Configuration = sanitizeProperties(nutsConfiguration.Configuration.(map[string]interface{}))
Expand Down Expand Up @@ -122,6 +140,7 @@ func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.S
return nil, e
}

// Ristretto config
var numCounters int64 = 1e7 // number of keys to track frequency of (10M).
var maxCost int64 = 1 << 30 // maximum cost of cache (1GB).
if nutsConfiguration.Configuration != nil {
Expand All @@ -142,7 +161,8 @@ func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.S
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
c.GetLogger().Sugar().Error("Impossible to make new Ristretto cache.", err)
return nil, e
}

instance := &NutsMemcached{
Expand All @@ -167,10 +187,13 @@ func (provider *NutsMemcached) ListKeys() []string {
keys := []string{}

e := provider.DB.View(func(tx *nutsdb.Tx) error {
e, _ := tx.GetAll(bucket)
e, _ := tx.PrefixScan(bucket, []byte(MappingKeyPrefix), 0, 100)
for _, k := range e {
if !strings.Contains(string(k.Key), surrogatePrefix) {
keys = append(keys, string(k.Key))
mapping, err := decodeMapping(k.Value)
if err == nil {
for _, v := range mapping.Mapping {
keys = append(keys, v.RealKey)
}
}
}
return nil
Expand Down Expand Up @@ -217,8 +240,8 @@ func (provider *NutsMemcached) Get(key string) (item []byte) {
}

// Prefix method returns the populated response if exists, empty response then
func (provider *NutsMemcached) Prefix(key string, req *http.Request, validator *rfc.Revalidator) *http.Response {
var result *http.Response
func (provider *NutsMemcached) Prefix(key string) []string {
result := []string{}

_ = provider.DB.View(func(tx *nutsdb.Tx) error {
prefix := []byte(key)
Expand All @@ -227,32 +250,7 @@ func (provider *NutsMemcached) Prefix(key string, req *http.Request, validator *
return err
} else {
for _, entry := range entries {
if varyVoter(key, req, string(entry.Key)) {
// TODO: improve this
// Store only response header in nuts and avoid query to memcached on each vary
// E.g, rfc.ValidateETag on NutsDB header value, retrieve response body later from memcached.

// Reminder: the key must be at most 250 bytes in length
//fmt.Println("memcached PREFIX", key, "GET", string(entry.Key))
i, e := provider.getFromMemcached(string(entry.Value))
if e == nil {
res, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(i)), req)
if err == nil {
rfc.ValidateETag(res, validator)
if validator.Matched {
provider.logger.Sugar().Debugf("The stored key %s matched the current iteration key ETag %+v", string(entry.Key), validator)
result = res
return nil
}

provider.logger.Sugar().Debugf("The stored key %s didn't match the current iteration key ETag %+v", string(entry.Key), validator)
} else {
provider.logger.Sugar().Errorf("An error occured while reading response for the key %s: %v", string(entry.Key), err)
}
} else {
provider.logger.Sugar().Errorf("An error occured while reading memcached for the key %s: %v", string(entry.Key), err)
}
}
result = append(result, string(entry.Key))
}
}
return nil
Expand All @@ -261,45 +259,102 @@ func (provider *NutsMemcached) Prefix(key string, req *http.Request, validator *
return result
}

// Set method will store the response in Nuts provider
func (provider *NutsMemcached) Set(key string, value []byte, url t.URL, ttl time.Duration) error {
if ttl == 0 {
ttl = url.TTL.Duration
}
// Only for memcached (to overcome 250 bytes key limit)
//memcachedKey := uuid.New().String()
memcachedKey := key
// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *NutsMemcached) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
_ = provider.DB.View(func(tx *nutsdb.Tx) error {
i, e := tx.Get(bucket, []byte(MappingKeyPrefix+key))
if e != nil && !errors.Is(e, nutsdb.ErrKeyNotFound) {
return e
}

// set to nuts (normal TTL)
{
err := provider.DB.Update(func(tx *nutsdb.Tx) error {
var val []byte
if i != nil {
val = i.Value
}
fresh, stale, e = mappingElection(provider, val, req, validator, provider.logger)

// key: cache-key, value: memcached-key
return tx.Put(bucket, []byte(key), []byte(memcachedKey), uint32(ttl.Seconds()))
})
return e
})

return
}

// SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata.
func (provider *NutsMemcached) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration, realKey string) error {
now := time.Now()

compressed := new(bytes.Buffer)
if _, err := lz4.NewWriter(compressed).ReadFrom(bytes.NewReader(value)); err != nil {
provider.logger.Sugar().Errorf("Impossible to compress the key %s into Nuts, %v", variedKey, err)
return err
}
{
// matchedURL is only use when ttl == 0
ttl := duration + provider.stale
url := t.URL{
TTL: configurationtypes.Duration{Duration: ttl},
}
err := provider.Set(variedKey, compressed.Bytes(), url, ttl)
if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Nuts, %v", err)
return err
}
}

// set to nuts (stale TTL)
staleTtl := int32((provider.stale + ttl).Seconds())
err := provider.DB.Update(func(tx *nutsdb.Tx) error {
mappingKey := MappingKeyPrefix + baseKey
item, e := tx.Get(bucket, []byte(mappingKey))
if e != nil && !errors.Is(e, nutsdb.ErrKeyNotFound) {
provider.logger.Sugar().Errorf("Impossible to get the base key %s in Nuts, %v", baseKey, e)
return e
}

var val []byte
if item != nil {
val = item.Value
}

val, e = mappingUpdater(variedKey, val, provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag, realKey)
if e != nil {
return e
}

provider.logger.Sugar().Debugf("Store the new mapping for the key %s in Nuts", variedKey)

return tx.Put(bucket, []byte(mappingKey), val, nutsdb.Persistent)
})

if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Nuts, %v", err)
}

return err
}

// Set method will store the response in Nuts provider
func (provider *NutsMemcached) Set(key string, value []byte, url t.URL, duration time.Duration) error {
if duration == 0 {
duration = url.TTL.Duration
}
// Only for memcached (to overcome 250 bytes key limit)
//memcachedKey := uuid.New().String()
// Disabled for ristretto to improve performances
memcachedKey := key

// set to nuts
{
err := provider.DB.Update(func(tx *nutsdb.Tx) error {
// key: "STALE_" + cache-key, value: memcached-key
return tx.Put(bucket, []byte(StalePrefix+key), []byte(memcachedKey), uint32(staleTtl))
// key: cache-key, value: memcached-key
return tx.Put(bucket, []byte(key), []byte(memcachedKey), uint32(duration.Seconds()))
})

if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Nuts, %v", err)
return err
}
}

// set to memcached with stale TTL
_ = provider.setToMemcached(memcachedKey, value, staleTtl)

// set to memcached
_ = provider.setToMemcached(memcachedKey, value, int32(duration.Seconds()))
return nil
}

Expand Down Expand Up @@ -373,7 +428,7 @@ func (provider *NutsMemcached) setToMemcached(memcachedKey string, value []byte,
// }
ok := provider.ristrettoCache.Set(memcachedKey, value, int64(len(value)))
if !ok {
provider.logger.Sugar().Debugf("Value not set to cache, key=%v", memcachedKey)
provider.logger.Sugar().Debugf("Value not set to ristretto cache, key=%v", memcachedKey)
}
return
}
Expand Down
Loading

0 comments on commit 92c02be

Please sign in to comment.