Skip to content

Commit

Permalink
server: fix preserving dependencies after cache hit and fixes for concurrent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
aszs committed Jun 26, 2023
1 parent 6de21b9 commit 6b2f87e
Showing 1 changed file with 84 additions and 44 deletions.
128 changes: 84 additions & 44 deletions unfurl/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,16 @@ def _tag_changed(self) -> bool:
assert package
package.set_version_from_repo(ServerCacheResolver.get_remote_tags)
if self.branch != package.revision_tag:
logger.debug(f"newer tag {package.revision_tag} found for {self.cache_key()} (was {self.branch})")
logger.debug(
f"newer tag {package.revision_tag} found for {self.cache_key()} (was {self.branch})"
)
return True
return False

# cache value, last_commit (on the file_path), latest_commit (seen in branch), list of deps this value depends on
CacheValueType = Tuple[Any, str, str, List[CacheItemDependency]]

CacheItemDependencies = Dict[str, CacheItemDependency]
# cache value, last_commit (on the file_path), latest_commit (seen in branch), map of deps this value depends on
CacheValueType = Tuple[Any, str, str, CacheItemDependencies]
CacheWorkCallable = Callable[
["CacheEntry", Optional[str]], Tuple[Optional[Any], Any, bool]
]
Expand All @@ -292,7 +296,11 @@ def pull(repo: GitRepo, branch: str) -> str:
firstCommit = next(repo.repo.iter_commits("HEAD", max_parents=0))
try:
# use shallow_since so we don't remove commits we already fetched
repo.pull(revision=branch, with_exceptions=True, shallow_since=str(firstCommit.committed_date))
repo.pull(
revision=branch,
with_exceptions=True,
shallow_since=str(firstCommit.committed_date),
)
except git.exc.GitCommandError as e: # type: ignore
if "You are not currently on a branch." in e.stderr:
# we cloned a tag, not a branch, set action so we remember this
Expand All @@ -312,8 +320,10 @@ class CacheEntry:
strict: bool = False
args: Optional[dict] = None
stale_pull_age: int = 0
do_clone: bool = False
_deps: List[CacheItemDependency] = field(default_factory=list)
do_clone: bool = True
_deps: CacheItemDependencies = field(default_factory=dict)
_previous_deps: Optional[CacheItemDependencies] = None
root_entry: Optional["CacheEntry"] = None
# following are set by get_cache() or set_cache():
commitinfo: Union[bool, Optional["Commit"]] = None
hit: Optional[bool] = None
Expand Down Expand Up @@ -349,6 +359,7 @@ def pull(self, cache: Cache, stale_ok_age: int = 0) -> GitRepo:
branch = self.branch or DEFAULT_BRANCH
repo_key = "pull:" + _get_project_repo_dir(self.project_id, branch, self.args)
# treat repo_key as a mutex to serialize write operations on the repo
time.sleep(0.100) # hack for redis in_flight handling
val = cache.get(repo_key)
if val:
last_check, action = cast(Tuple[float, str], val)
Expand Down Expand Up @@ -563,24 +574,36 @@ def _cancel_inflight(self, cache: Cache):
return cache.delete(self._inflight_key())

def _do_work(
self, work: CacheWorkCallable, latest_commit: Optional[str]
self,
work: CacheWorkCallable,
latest_commit: Optional[str],
cache_dependency: Optional[CacheItemDependency] = None,
) -> Tuple[Optional[Any], Any, CacheDirective]:
try:
self._deps = []
self._previous_deps = self._deps.copy()
self._deps = {}
# NB: work shouldn't modify the working directory
err, value, cacheable = work(self, latest_commit)
except Exception as exc:
logger.error("unexpected error doing work for cache", exc_info=True)
self.directives = CacheDirective(latest_commit=latest_commit, store=False)
return exc, None, self.directives
if not self.repo or self.strict:
# self.strict might reclone the repo
# self.strict might re-clone the repo
self._set_project_repo()
assert self.repo
latest = self.repo.revision
if latest: # if revision is valid
latest_commit = latest
self.directives = CacheDirective(store=cacheable, latest_commit=latest_commit)
if not err and self.root_entry and cache_dependency:
if self.commitinfo is None:
last_commit = self._set_commit_info()
else:
last_commit = self.commitinfo.hexsha if self.commitinfo else "" # type: ignore
self.root_entry.add_cache_dep(
cache_dependency, latest_commit or "", last_commit
)
return err, value, self.directives

def _validate(
Expand All @@ -592,8 +615,8 @@ def _validate(
) -> bool:
if value == "not_stored":
return False
logger.debug("checking deps %s on %s", self._deps, self.cache_key())
for dep in self._deps:
logger.debug("checking deps %s on %s", list(self._deps), self.cache_key())
for dep in self._deps.values():
if dep.out_of_date():
# need to regenerate the value
return False
Expand All @@ -605,6 +628,7 @@ def get_or_set(
work: CacheWorkCallable,
latest_commit: Optional[str],
validate: Optional[Callable] = None,
cache_dependency: Optional[CacheItemDependency] = None,
) -> Tuple[Optional[Any], Any]:
if latest_commit is None and not self.stale_pull_age:
# don't use the cache
Expand All @@ -614,6 +638,8 @@ def get_or_set(
if commitinfo:
if commitinfo is True:
if self._validate(value, cache, latest_commit, validate):
if self.root_entry and cache_dependency:
self.root_entry.restore_cache_dep(self)
return None, value
logger.debug(f"validation failed for {self.cache_key()}")
# otherwise the value is in the cache but stale or invalid, so fall through and redo the work
Expand All @@ -640,39 +666,50 @@ def get_or_set(
# work was already in flight for this key, so use its value instead
return None, value

err, value, directives = self._do_work(work, latest_commit)
err, value, directives = self._do_work(work, latest_commit, cache_dependency)
cancel_succeeded = self._cancel_inflight(cache)
# skip caching work if cancel inflight failed -- that means invalidate_cache deleted it
if cancel_succeeded and not err:
self.set_cache(cache, directives, value)
return err, value

def add_cache_dep(
self, cache_entry: "CacheEntry", stale_pull_age: int, package: Optional[Package]
) -> CacheItemDependency:
if cache_entry.commitinfo is None:
last_commit = self._set_commit_info()
elif isinstance(cache_entry.commitinfo, Commit):
last_commit = cache_entry.commitinfo.hexsha
self, dep: CacheItemDependency, latest_commit: str, last_commit: str
) -> None:
dep.latest_commit = latest_commit
dep.last_commit = last_commit
self._deps[dep.cache_key()] = dep
logger.debug("added dep %s on %s", self._deps, self.cache_key())

def restore_cache_dep(self, cache_entry: "CacheEntry") -> bool:
key = cache_entry.cache_key()
if self._previous_deps and key in self._previous_deps:
self._deps[key] = self._previous_deps[key]
return True
else:
last_commit = "" # file is missing, no commit
latest_commit = cache_entry.directives and cache_entry.directives.latest_commit
if not latest_commit and cache_entry.repo:
latest_commit = cache_entry.repo.revision
logger.warning(
"previous cache dependency for %s missing on %s: %s",
key,
self.cache_key(),
self._previous_deps,
)
return False

def make_cache_dep(
self: "CacheEntry", stale_pull_age: int, package: Optional[Package]
) -> CacheItemDependency:
dep = CacheItemDependency(
cache_entry.project_id,
cache_entry.branch,
cache_entry.file_path,
cache_entry.key,
self.project_id,
self.branch,
self.file_path,
self.key,
stale_pull_age,
cache_entry.do_clone,
latest_commit or "",
last_commit,
self.do_clone,
"",
"",
)
if package and package.discovered_revision:
dep.latest_package_url = package.url
self._deps.append(dep)
logger.debug("added dep %s on %s", self._deps, self.cache_key())
return dep


Expand Down Expand Up @@ -897,7 +934,7 @@ def populate_cache():
branch = request.args.get("branch", DEFAULT_BRANCH)
for prefix in ["refs/heads/", "refs/tags/"]:
if branch.startswith(prefix):
branch = branch[len(prefix):]
branch = branch[len(prefix) :]
break
path = request.args["path"]
latest_commit = request.args["latest_commit"]
Expand Down Expand Up @@ -1015,7 +1052,6 @@ def _cache_localenv_work(
return err, local_env, True

repo = _get_project_repo(project_id, branch, args)
# set strict if latest_commit is set
cache_entry = CacheEntry(
project_id,
branch,
Expand All @@ -1024,7 +1060,6 @@ def _cache_localenv_work(
or os.path.join(DefaultNames.EnsembleDirectory, DefaultNames.Ensemble),
"localenv",
repo,
bool(latest_commit),
args=args,
)
err, value = cache_entry.get_or_set(
Expand Down Expand Up @@ -1681,7 +1716,7 @@ def get_remote_tags(cls, url, pattern="*") -> List[str]:

@property
def use_local_cache(self) -> bool:
return flask_config["CACHE_TYPE"] != 'simple'
return flask_config["CACHE_TYPE"] != "simple"

def _really_resolve_to_local_path(
self,
Expand Down Expand Up @@ -1731,14 +1766,6 @@ def _work(
# # no revision specified -> use key for latest remote tags cache of repo
# # branch or tag that isn't a semver -> dep, save commit hash as latest_commit
assert repo_view
is_cache_dep = not repo_view.package or repo_view.package.is_mutable_ref()
if is_cache_dep and self.root_cache_request:
# this will add this cache_dep to the root_cache_request's value
self.root_cache_request.add_cache_dep(
cache_entry,
cache_entry.stale_pull_age,
repo_view.package, # type: ignore
)
# return the value and whether it is cacheable
return None, doc, cacheable and not private

Expand Down Expand Up @@ -1767,16 +1794,29 @@ def _work(
"load_yaml" + (fragment or ""),
stale_pull_age=app.config["CACHE_DEFAULT_PULL_TIMEOUT"],
do_clone=True,
root_entry=self.root_cache_request,
)
is_cache_dep = not repo_view.package or repo_view.package.is_mutable_ref()
if self.use_local_cache:
doc = self.get_cache(cache_entry.cache_key()) # check local cache
if doc is not None:
if self.root_cache_request and is_cache_dep:
self.root_cache_request.restore_cache_dep(cache_entry)
return doc, True

latest_commit = (
repo_view.package.lock_to_commit if repo_view.package else None
)
err, doc = cache_entry.get_or_set(cache, _work, latest_commit)
dep = None
if is_cache_dep:
# build the dependency record; get_or_set() below registers it on the root_cache_request's deps
dep = cache_entry.make_cache_dep(
cache_entry.stale_pull_age,
repo_view.package if repo_view.package else None,
)
err, doc = cache_entry.get_or_set(
cache, _work, latest_commit, cache_dependency=dep
)
if err:
if not _get_project_repo(project_id, branch, None):
# couldn't clone the repo
Expand Down

0 comments on commit 6b2f87e

Please sign in to comment.