Skip to content

Commit

Permalink
Misc fixes and improvements.
Browse files Browse the repository at this point in the history
1. Fix Redis latency metrics.
2. Add more error labels.
3. Fetch from DB when unmarshalling fails.
  • Loading branch information
Stumble committed Feb 10, 2023
1 parent b67fa25 commit 9f21756
Showing 1 changed file with 82 additions and 60 deletions.
142 changes: 82 additions & 60 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,26 @@ type metricSet struct {
Error *prometheus.CounterVec
}

// Dedicated string types for metric label values, so that a hit label and an
// error label cannot be passed in place of one another by accident.
type (
	// metricHitLabel is a value for the "hit" label (e.g. "mem", "redis", "db").
	metricHitLabel string
	// metricErrLabel is a value for the "when" label on the error counter.
	metricErrLabel string
)

var (
hitLabels = []string{"hit"}
hitLabelMemory = "mem"
hitLabelRedis = "redis"
hitLabelDB = "db"
hitLabels = []string{"hit"}
// metrics hit labels
hitLabelMemory metricHitLabel = "mem"
hitLabelRedis metricHitLabel = "redis"
hitLabelDB metricHitLabel = "db"
// The unit is ms.
latencyBucket = []float64{
1, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096}
// errors
errLabels = []string{"when"}
errLabelSetRedis = "set_redis"
errLabelSetMemCache = "set_mem_cache"
errLabelInvalidate = "invalidate_error"

errLabels = []string{"when"}
// metrics error labels
errLabelSetRedis metricErrLabel = "set_redis"
errLabelSetMemCache metricErrLabel = "set_mem_cache"
errLabelInvalidate metricErrLabel = "invalidate_error"
errLabelMemoryUnmarshalFailed metricErrLabel = "mem_unmarshal_failed"
errLabelRedisUnmarshalFailed metricErrLabel = "redis_unmarshal_failed"
)

func newMetricSet(appName string) *metricSet {
Expand Down Expand Up @@ -294,15 +301,22 @@ func (c *DCache) Close() {
}
}

func (c *DCache) recordLatency(label string, startedAt time.Time) func() {
func (c *DCache) makeHitRecorder(label metricHitLabel, startedAt time.Time) func() {
return func() {
if c.stats != nil {
c.stats.Latency.WithLabelValues(label).Observe(
c.stats.Hit.WithLabelValues(string(label)).Inc()
c.stats.Latency.WithLabelValues(string(label)).Observe(
float64(getNow().UnixMilli() - startedAt.UnixMilli()))
}
}
}

// recordError bumps the error counter for the given label.
// It is a no-op when metrics are disabled (c.stats is nil).
func (c *DCache) recordError(label metricErrLabel) {
	if c.stats == nil {
		return
	}
	counter := c.stats.Error.WithLabelValues(string(label))
	counter.Inc()
}

// readValue reads through using f and, if there is no error and @p noStore is false,
// caches the result under @p key. It returns the marshaled bytes if there is no error.
func (c *DCache) readValue(
Expand All @@ -319,11 +333,7 @@ func (c *DCache) readValue(
// NOTE: This is mostly useful when user call cache layer with noCache flag, because
// when cache is used, call to this function is protected by a distributed lock.
rv, err, _ := c.group.Do(key, func() (any, error) {
defer c.recordLatency(hitLabelDB, getNow())()
if c.stats != nil {
c.stats.Hit.WithLabelValues(hitLabelDB).Inc()
}
// c.stats.
defer c.makeHitRecorder(hitLabelDB, getNow())()
dbres, ttl, err := f()
return &valueTtl{
Val: dbres,
Expand All @@ -343,10 +353,8 @@ func (c *DCache) readValue(
// successfully retrieved.
err := c.setKey(ctx, key, valueBytes, valTtl.Ttl, false)
if err != nil {
log.Err(err).Msgf("Failed to set Redis cache for %s", key)
if c.stats != nil {
c.stats.Error.WithLabelValues(errLabelSetRedis).Inc()
}
log.Ctx(ctx).Err(err).Msgf("Failed to set Redis cache for %s", key)
c.recordError(errLabelSetRedis)
}
}
return valueBytes, nil
Expand All @@ -366,12 +374,24 @@ func (c *DCache) setKey(ctx context.Context, key string, valueBytes []byte, ttl
if err != nil {
return err
}
c.updateMemoryCache(key, ve, isExplicitSet)
c.updateMemoryCache(ctx, key, ve, isExplicitSet)
return nil
}

// tryReadFromRedis fetches the entry stored under storeKey(key) from Redis and
// msgpack-decodes it into a ValueBytesExpiredAt.
//
// On success it returns the decoded entry and a nil error. If either the Redis
// GET or the decode fails, it returns a nil entry together with the underlying
// error (returned as-is, not wrapped), so callers never observe a partially
// decoded value.
func (c *DCache) tryReadFromRedis(ctx context.Context, key string) (*ValueBytesExpiredAt, error) {
	veBytes, err := c.conn.Get(ctx, storeKey(key)).Bytes()
	if err != nil {
		return nil, err
	}
	ve := &ValueBytesExpiredAt{}
	if err := msgpack.Unmarshal(veBytes, ve); err != nil {
		// Do not hand back a half-populated struct alongside the error.
		return nil, err
	}
	return ve, nil
}

// isExplicitSet is true when called from Set; otherwise the value is being backfilled from Redis.
func (c *DCache) updateMemoryCache(key string, ve *ValueBytesExpiredAt, isExplicitSet bool) {
func (c *DCache) updateMemoryCache(
ctx context.Context, key string, ve *ValueBytesExpiredAt, isExplicitSet bool) {
// update memory cache.
// sub-second TTL will be ignored for memory cache.
ttl := time.UnixMilli(ve.ExpiredAt).Unix() - getNow().Unix()
Expand All @@ -394,10 +414,8 @@ func (c *DCache) updateMemoryCache(key string, ve *ValueBytesExpiredAt, isExplic
// ignore in memory cache error
err = c.inMemCache.Set([]byte(storeKey(key)), ve.ValueBytes, int(ttl))
if err != nil {
log.Err(err).Msgf("Failed to set memory cache for key %s", storeKey(key))
if c.stats != nil {
c.stats.Error.WithLabelValues(errLabelSetMemCache).Inc()
}
log.Ctx(ctx).Err(err).Msgf("Failed to set memory cache for key %s", storeKey(key))
c.recordError(errLabelSetMemCache)
}
}
}
Expand Down Expand Up @@ -478,11 +496,8 @@ func (c *DCache) listenKeyInvalidate() {
l := strings.Split(payload, delimiter)
if len(l) < 2 {
// Invalid payload
log.Warn().Msgf("Received invalidate payload %s", payload)

if c.stats != nil {
c.stats.Error.WithLabelValues(errLabelInvalidate).Inc()
}
log.Error().Msgf("Received invalidate payload %s", payload)
c.recordError(errLabelInvalidate)
return
}
if l[0] == c.id {
Expand Down Expand Up @@ -547,6 +562,7 @@ func (c *DCache) Get(ctx context.Context, key string, target any, expire time.Du
//
// @p noStore: The response value will not be saved into the cache.
func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read ReadWithTtlFunc, noCache bool, noStore bool) (err error) {
startedAt := getNow()
if c.tracer != nil {
ctx = c.tracer.TraceStart(ctx,
"GetWithTtl",
Expand All @@ -567,55 +583,57 @@ func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read Re
err = unmarshal(targetBytes, target)
return
}
// lookup in memory cache.
// lookup in memory cache, return only when unmarshal succeeded.
if c.inMemCache != nil {
var targetBytes []byte
targetBytes, err = c.inMemCache.Get([]byte(storeKey(key)))
if err == nil {
if c.stats != nil {
c.stats.Hit.WithLabelValues(hitLabelMemory).Inc()
}
if c.tracer != nil {
c.tracer.TraceHitFrom(ctx, hitMem)
}
err = unmarshal(targetBytes, target)
return
if err == nil {
c.makeHitRecorder(hitLabelMemory, startedAt)()
if c.tracer != nil {
c.tracer.TraceHitFrom(ctx, hitMem)
}
return
}
} else {
c.recordError(errLabelMemoryUnmarshalFailed)
}
}

var anyTypedBytes any
var targetHasUnmarshalled bool
anyTypedBytes, err, _ = c.group.Do(lockKey(key), func() (any, error) {
// distributed single flight to query db for value.
startedAt := getNow()
for {
ve := &ValueBytesExpiredAt{}
veBytes, e := c.conn.Get(ctx, storeKey(key)).Bytes()
if e == nil {
e = msgpack.Unmarshal(veBytes, ve)
}
ve, e := c.tryReadFromRedis(ctx, key)
if e == nil {
// Value was retrieved from Redis, backfill memory cache and return.
if c.stats != nil {
c.stats.Hit.WithLabelValues(hitLabelRedis).Inc()
}
if c.tracer != nil {
c.tracer.TraceHitFrom(ctx, hitRedis)
}
c.recordLatency(hitLabelRedis, startedAt)
if !noStore {
c.updateMemoryCache(key, ve, false)
// NOTE: must check if bytes stored in Redis can be correctly
// unmarshalled into target, because it may not when data structure changes.
// When that happens, we will still fetch from DB.
e = unmarshal(ve.ValueBytes, target)
if e == nil {
targetHasUnmarshalled = true
// Value was retrieved from Redis, backfill memory cache and return.
defer c.makeHitRecorder(hitLabelRedis, startedAt)()
if c.tracer != nil {
c.tracer.TraceHitFrom(ctx, hitRedis)
}
if !noStore {
c.updateMemoryCache(ctx, key, ve, false)
}
return ve.ValueBytes, nil
} else {
c.recordError(errLabelRedisUnmarshalFailed)
}
return ve.ValueBytes, nil
}
// If failed to retrieve value from Redis, try to get a lock and query DB.
// To avoid spamming Redis with SetNX requests, only one request should try to get
// the lock per-pod.
// If timeout or not cache-able error, another thread will obtain lock after sleep.
updated, err := c.conn.SetNX(ctx, lockKey(key), "", c.readInterval).Result()
if err != nil {
if c.stats != nil {
c.stats.Error.WithLabelValues(errLabelSetRedis).Inc()
}
c.recordError(errLabelSetRedis)
}
if updated {
return c.readValue(ctx, key, read, noStore)
Expand All @@ -627,14 +645,18 @@ func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read Re
// timeout, all of them will timeout.
return nil, ErrTimeout
case <-time.After(lockSleep):
// TODO(yumin): we can further optimize this part by
// checking the TTL of lockKey(key) and sleeping accordingly.
continue
}
}
})
if err != nil {
return
}
err = unmarshal(anyTypedBytes.([]byte), target)
if !targetHasUnmarshalled {
err = unmarshal(anyTypedBytes.([]byte), target)
}
return
}

Expand Down

0 comments on commit 9f21756

Please sign in to comment.