Skip to content

Commit

Permalink
Implement the ringlist as a separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
mgnsk committed Feb 12, 2022
1 parent b423d0e commit a528e23
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 140 deletions.
74 changes: 62 additions & 12 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/mgnsk/evcache/v2/ringlist"
)

// SyncInterval is the interval for background loop
Expand Down Expand Up @@ -51,7 +53,7 @@ type Cache struct {
onceLoop sync.Once
wg sync.WaitGroup
lfuEnabled bool
list *ringList
list *ringlist.List
afterEvict EvictionCallback
mode EvictionMode
stopLoop chan struct{}
Expand Down Expand Up @@ -99,10 +101,10 @@ func (build Builder) WithEvictionMode(mode EvictionMode) Builder {
//
// If cache exceeds the limit, the eldest record is evicted or
// if LFU is enabled, the least frequently used record is evicted.
func (build Builder) WithCapacity(capacity uint32) Builder {
func (build Builder) WithCapacity(cap uint32) Builder {
return func(c *Cache) {
build(c)
c.list = newRingList(capacity)
c.list = ringlist.New(int(cap))
}
}

Expand All @@ -126,7 +128,7 @@ func (build Builder) Build() *Cache {
}
build(c)
if c.list == nil {
c.list = newRingList(0)
c.list = ringlist.New(0)
}
if c.lfuEnabled {
c.runLoopOnce()
Expand Down Expand Up @@ -176,6 +178,9 @@ func (c *Cache) Range(f func(key, value interface{}) bool) {

// Len returns the number of keys in the cache.
func (c *Cache) Len() int {
c.mu.Lock()
defer c.mu.Unlock()

return c.list.Len()
}

Expand All @@ -189,6 +194,7 @@ func (c *Cache) Len() int {
func (c *Cache) Do(f func(key, value interface{}) bool) {
c.mu.Lock()
defer c.mu.Unlock()

c.list.Do(func(key interface{}) bool {
r, ok := c.records.Load(key)
if !ok {
Expand All @@ -204,12 +210,14 @@ func (c *Cache) Do(f func(key, value interface{}) bool) {
func (c *Cache) Pop() (key, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
if key := c.list.Pop(); key != nil {

if front := c.list.Front(); front != nil {
key := c.list.Remove(front)
if r, ok := c.evictLocked(key, nil); ok {
return key, r.value
}
panic("evcache: invalid cache state")
}

return nil, nil
}

Expand All @@ -218,8 +226,9 @@ func (c *Cache) Pop() (key, value interface{}) {
func (c *Cache) Flush() {
c.mu.Lock()
defer c.mu.Unlock()

c.records.Range(func(key, _ interface{}) bool {
if r, ok := c.evictLocked(key, nil); ok {
if r, ok := c.evictLocked(key, nil); ok && r.ring.Value != nil {
c.list.Remove(r.ring)
}
return true
Expand All @@ -231,10 +240,15 @@ func (c *Cache) Flush() {
func (c *Cache) Evict(key interface{}) (value interface{}, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

if r, ok := c.evictLocked(key, nil); ok {
c.list.Remove(r.ring)
// Ring value can be nil on an overflowed list element.
if r.ring.Value != nil {
c.list.Remove(r.ring)
}
return r.value, true
}

return nil, false
}

Expand All @@ -247,10 +261,14 @@ func (c *Cache) Evict(key interface{}) (value interface{}, ok bool) {
func (c *Cache) CompareAndEvict(key, value interface{}) bool {
c.mu.Lock()
defer c.mu.Unlock()

if r, ok := c.evictLocked(key, &value); ok {
c.list.Remove(r.ring)
if r.ring.Value != nil {
c.list.Remove(r.ring)
}
return true
}

return false
}

Expand All @@ -269,6 +287,7 @@ func (c *Cache) Set(key, value interface{}, ttl time.Duration) {
c.Evict(front)
}
}()

doEvict := func(r *record) {
switch r.State() {
case active:
Expand All @@ -284,23 +303,31 @@ func (c *Cache) Set(key, value interface{}, ttl time.Duration) {
defer r.mu.Unlock()
}
}

new := c.pool.Get().(*record)

new.mu.Lock()
defer new.mu.Unlock()

for {
old, loaded := c.records.LoadOrStore(key, new)
if !loaded {
c.mu.Lock()
defer c.mu.Unlock()

new.ring.Value = key
front = c.list.PushBack(new.ring)

new.init(value, ttl)
new.setState(active)

if ttl > 0 {
c.runLoopOnce()
}

return
}

doEvict(old.(*record))
}
}
Expand All @@ -325,6 +352,7 @@ func (c *Cache) Fetch(key interface{}, ttl time.Duration, f FetchCallback) (valu
didLoad bool
front interface{}
)

new := c.pool.Get().(*record)
defer func() {
if didLoad {
Expand All @@ -333,29 +361,38 @@ func (c *Cache) Fetch(key interface{}, ttl time.Duration, f FetchCallback) (valu
c.Evict(front)
}
}()

new.mu.Lock()
defer new.mu.Unlock()

loadOrStore := func() (old *record, loaded bool) {
if old, loaded := c.records.LoadOrStore(key, new); loaded {
return old.(*record), true
}

value, err = f()
if err != nil {
c.records.Delete(key)
return nil, false
}

c.mu.Lock()
defer c.mu.Unlock()

new.ring.Value = key
front = c.list.PushBack(new.ring)

new.readerWg.Add(1)
new.init(value, ttl)
new.setState(active)

if ttl > 0 {
c.runLoopOnce()
}

return nil, false
}

for {
r, loaded := loadOrStore()
if err != nil {
Expand All @@ -382,9 +419,10 @@ func (c *Cache) Fetch(key interface{}, ttl time.Duration, f FetchCallback) (valu
func (c *Cache) Close() error {
c.runLoopOnce()
c.stopLoop <- struct{}{}
close(c.stopLoop)

c.Flush()
c.wg.Wait()

return nil
}

Expand All @@ -393,13 +431,16 @@ func (c *Cache) evictLocked(key interface{}, target *interface{}) (r *record, ok
if !ok {
return nil, false
}

r = rec.(*record)
if r.State() != active || target != nil && !reflect.DeepEqual(r.value, *target) {
return nil, false
}

// Safe to lock r.mu on an active record while holding c.mu.
r.mu.Lock()
defer r.mu.Unlock()

switch c.mode {
case ModeNonBlocking:
// In non-blocking mode, new writers see an empty map immediately.
Expand All @@ -409,7 +450,9 @@ func (c *Cache) evictLocked(key interface{}, target *interface{}) (r *record, ok
// Add before setState to allow waiters to use an unlocked record.
r.evictionWg.Add(1)
}

r.setState(evicting)

if c.mode == ModeBlocking || c.afterEvict != nil {
c.wg.Add(1)
go func() {
Expand All @@ -425,6 +468,7 @@ func (c *Cache) evictLocked(key interface{}, target *interface{}) (r *record, ok
}
}()
}

return r, true
}

Expand All @@ -448,21 +492,27 @@ func (c *Cache) runLoopOnce() {
func (c *Cache) processRecords(now int64) {
c.mu.Lock()
defer c.mu.Unlock()

c.records.Range(func(key, value interface{}) bool {
r := value.(*record)
if r.State() != active {
return true
}

if deadline := r.Deadline(); deadline > 0 && deadline < now {
c.list.Remove(r.ring)
if r.ring.Value != nil {
c.list.Remove(r.ring)
}
c.evictLocked(key, nil)
return true
}

if c.lfuEnabled {
if hits := atomic.SwapUint32(&r.hits, 0); hits > 0 {
c.list.MoveForward(r.ring, hits)
c.list.Move(r.ring, int(hits))
}
}

return true
})
}
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
Package evcache provides an in-memory ordered cache with optional eventually consistent LFU ordering.
Package evcache provides a sync.Map wrapper with specific concurrency guarantees.
*/
package evcache
Loading

0 comments on commit a528e23

Please sign in to comment.