Skip to content

Commit

Permalink
server: fix cache dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
aszs committed Jun 27, 2023
1 parent 2f91ec0 commit ebf67f3
Showing 1 changed file with 48 additions and 51 deletions.
99 changes: 48 additions & 51 deletions unfurl/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import (
Dict,
List,
NamedTuple,
Optional,
Tuple,
Any,
Expand Down Expand Up @@ -285,7 +286,15 @@ def _tag_changed(self) -> bool:

CacheItemDependencies = Dict[str, CacheItemDependency]
# cache value, last_commit (on the file_path), latest_commit (seen in branch), map of deps this value depends on
CacheValueType = Tuple[Any, str, str, CacheItemDependencies]


class CacheValue(NamedTuple):
value: Any
last_commit: str
latest_commit: str
deps: CacheItemDependencies


CacheWorkCallable = Callable[
["CacheEntry", Optional[str]], Tuple[Optional[Any], Any, bool]
]
Expand Down Expand Up @@ -322,7 +331,6 @@ class CacheEntry:
stale_pull_age: int = 0
do_clone: bool = True
_deps: CacheItemDependencies = field(default_factory=dict)
_previous_deps: Optional[CacheItemDependencies] = None
root_entry: Optional["CacheEntry"] = None
# following are set by get_cache() or set_cache():
commitinfo: Union[bool, Optional["Commit"]] = None
Expand Down Expand Up @@ -438,10 +446,7 @@ def set_cache(self, cache: Cache, directives: CacheDirective, value: Any) -> str
value = "not_stored" # XXX
cache.set(
full_key,
cast(
CacheValueType,
(value, last_commit, latest_commit or last_commit, self._deps),
),
CacheValue(value, last_commit, latest_commit or last_commit, self._deps),
timeout=directives.timeout,
)
return last_commit
Expand Down Expand Up @@ -474,24 +479,24 @@ def at_latest(self, older: str, newer: Optional[str]) -> bool:

def get_cache(
self, cache: Cache, latest_commit: Optional[str]
) -> Tuple[Any, Union[bool, Optional["Commit"]]]:
) -> Tuple[Optional[CacheValue], Union[bool, Optional["Commit"]]]:
"""Look up a cached value and then check if it out of date by checking if the file path in the key was modified after the given commit
(also store the last_commit so we don't have to do that check everytime)
we assume latest_commit is the last commit the client has seen but it might be older than the local copy
"""
full_key = self.cache_key()
value = cast(CacheValueType, cache.get(full_key))
value = cast(CacheValue, cache.get(full_key))
if value is None:
logger.info("cache miss for %s", full_key)
self.hit = False
return None, False # cache miss
return None, None # cache miss

response, last_commit, cached_latest_commit, self._deps = value
if latest_commit == cached_latest_commit:
# this is the latest
logger.info("cache hit for %s with %s", full_key, latest_commit)
self.hit = True
return response, True
return value, None
else:
# cache might be out of date, let's check by getting the commit info for the file path
try:
Expand All @@ -505,7 +510,7 @@ def get_cache(
)
# delete the local repository
_clear_project(self.project_id)
return response, False # treat as cache miss
return None, None # treat as cache miss
else:
logger.info(
"cache hit for %s, but error with client's commit %s",
Expand All @@ -515,36 +520,34 @@ def get_cache(
)
# got an error resolving latest_commit, just return the cached value
self.hit = True
return response, True
return value, None
if at_latest:
# repo was up-to-date, so treat as a cache hit
logger.info("cache hit for %s with %s", full_key, latest_commit)
self.hit = True
return response, True
return value, None

# the latest_commit is newer than the cached_latest_commit, check if the file has changed
new_commit = self._set_commit_info()
if new_commit == last_commit:
# the file hasn't changed, let's update the cache with latest_commit so we don't have to do this check again
value = CacheValue(
response,
last_commit,
latest_commit or cached_latest_commit,
self._deps,
)
cache.set(
full_key,
cast(
CacheValueType,
(
response,
last_commit,
latest_commit or cached_latest_commit,
self._deps,
),
),
value,
)
logger.info("cache hit for %s, updated %s", full_key, latest_commit)
self.hit = True
return response, True
return value, None
else:
# stale -- up to the caller to do something about it, e.g. update or delete the key
logger.info("stale cache hit for %s with %s", full_key, latest_commit)
return response, self.commitinfo # type: ignore
return value, self.commitinfo

def _set_inflight(
self, cache: Cache, latest_commit: Optional[str]
Expand All @@ -562,9 +565,11 @@ def _set_inflight(
time.sleep(_cache_inflight_sleep_duration)
if not cache.get(self._inflight_key()):
# no longer in flight
value, commitinfo = self.get_cache(cache, inflight_commit)
if commitinfo: # hit, use this instead of doing our work
return value, commitinfo
cache_value, stale = self.get_cache(
cache, inflight_commit
)
if cache_value: # hit, use this instead of doing our work
return cache_value.value, True
break # missing, so inflight work must have failed, continue with our work

cache.set(self._inflight_key(), (latest_commit, time.time()))
Expand All @@ -580,7 +585,6 @@ def _do_work(
cache_dependency: Optional[CacheItemDependency] = None,
) -> Tuple[Optional[Any], Any, CacheDirective]:
try:
self._previous_deps = self._deps.copy()
self._deps = {}
# NB: work shouldn't modify the working directory
err, value, cacheable = work(self, latest_commit)
Expand Down Expand Up @@ -634,17 +638,26 @@ def get_or_set(
# don't use the cache
return self._do_work(work, latest_commit)[0:2]

value, commitinfo = self.get_cache(cache, latest_commit)
if commitinfo:
if commitinfo is True:
if self._validate(value, cache, latest_commit, validate):
cache_value, stale = self.get_cache(cache, latest_commit)
if cache_value: # cache hit
if not stale:
if self._validate(
cache_value.value,
cache,
cache_value.latest_commit,
validate,
):
if self.root_entry and cache_dependency:
self.root_entry.restore_cache_dep(self)
return None, value
self.root_entry.add_cache_dep(
cache_dependency,
cache_value.latest_commit,
cache_value.last_commit,
)
return None, cache_value.value
logger.debug(f"validation failed for {self.cache_key()}")
# otherwise in cache but stale or invalid, fall thru to redo work
# XXX? check date to see if its recent enough to serve anyway
# if commitinfo.committed_date - time.time() < stale_ok_age:
# if stale.committed_date - time.time() < stale_ok_age:
# return value
self.hit = False
else: # cache miss
Expand Down Expand Up @@ -681,20 +694,6 @@ def add_cache_dep(
self._deps[dep.cache_key()] = dep
logger.debug("added dep %s on %s", self._deps, self.cache_key())

def restore_cache_dep(self, cache_entry: "CacheEntry") -> bool:
key = cache_entry.cache_key()
if self._previous_deps and key in self._previous_deps:
self._deps[key] = self._previous_deps[key]
return True
else:
logger.warning(
"previous cache dependency for %s missing on %s: %s",
key,
self.cache_key(),
self._previous_deps,
)
return False

def make_cache_dep(
self: "CacheEntry", stale_pull_age: int, package: Optional[Package]
) -> CacheItemDependency:
Expand Down Expand Up @@ -1800,8 +1799,6 @@ def _work(
if self.use_local_cache:
doc = self.get_cache(cache_entry.cache_key()) # check local cache
if doc is not None:
if self.root_cache_request and is_cache_dep:
self.root_cache_request.restore_cache_dep(cache_entry)
return doc, True

latest_commit = (
Expand Down

0 comments on commit ebf67f3

Please sign in to comment.