Skip to content

Commit

Permalink
Remove version from implementation (#165)
Browse files Browse the repository at this point in the history
* Backend: expand path with version

* remove version handling

* remove version handling

* TST: fix test

* TST: fix test of docstring examples

* DOC: fix SQL example

* fix linter errors

* TST: fix use backend path separator when expanding path

* Backend: check path and version in _path_with_version()

* Backend.latest_version(): remove path check

* Backend._ls(): improve docstring
  • Loading branch information
frankenjoe authored Jan 18, 2024
1 parent 21a6cce commit a64eba6
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 508 deletions.
219 changes: 30 additions & 189 deletions audbackend/core/artifactory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import re
import typing

import artifactory
Expand Down Expand Up @@ -154,12 +153,6 @@ def __init__(
)
self._repo = path.find_repository_local(self.repository)

# to support legacy file structure
# see _use_legacy_file_structure()
self._legacy_extensions = []
self._legacy_file_structure = False
self._legacy_file_structure_regex = False

def _access(
self,
):
Expand All @@ -170,10 +163,9 @@ def _access(
def _checksum(
self,
path: str,
version: str,
) -> str:
r"""MD5 checksum of file on backend."""
path = self._path(path, version)
path = self._path(path)
checksum = artifactory.ArtifactoryPath.stat(path).md5
return checksum

Expand Down Expand Up @@ -214,10 +206,9 @@ def _create(
def _date(
self,
path: str,
version: str,
) -> str:
r"""Get last modification date of file on backend."""
path = self._path(path, version)
path = self._path(path)
date = path.stat().mtime
date = utils.date_format(date)
return date
Expand All @@ -231,10 +222,14 @@ def _delete(
def _exists(
self,
path: str,
version: str,
) -> bool:
r"""Check if file exists on backend."""
path = self._path(path, version)
path = self._expand(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
return path.exists()

def _expand(
Expand All @@ -258,154 +253,52 @@ def _get_file(
self,
src_path: str,
dst_path: str,
version: str,
verbose: bool,
):
r"""Get file from backend."""
src_path = self._path(src_path, version)
src_path = self._path(src_path)
_download(src_path, dst_path, verbose=verbose)

def _legacy_split_ext(
self,
name: str,
) -> typing.Tuple[str, str]:
r"""Split name into basename and extension."""
ext = None
for custom_ext in self._legacy_extensions:
# check for custom extension
# ensure basename is not empty
if self._legacy_file_structure_regex:
pattern = rf'\.({custom_ext})$'
match = re.search(pattern, name[1:])
if match:
ext = match.group(1)
elif name[1:].endswith(f'.{custom_ext}'):
ext = custom_ext
if ext is None:
# if no custom extension is found
# use last string after dot
ext = audeer.file_extension(name)

base = audeer.replace_file_extension(name, '', ext=ext)

if ext:
ext = f'.{ext}'

return base, ext

def _ls(
self,
path: str,
) -> typing.List[typing.Tuple[str, str]]:
r"""List all files under (sub-)path."""
if path.endswith('/'): # find files under sub-path

path = self._expand(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
if not path.exists():
utils.raise_file_not_found_error(str(path))

paths = [str(x) for x in path.glob("**/*") if x.is_file()]

else: # find versions of path

root, name = self.split(path)

if self._legacy_file_structure:
base, _ = self._legacy_split_ext(name)
root = f'{self._expand(root)}{base}'
else:
root = self._expand(root)

root = _artifactory_path(
root,
self._username,
self._api_key,
)
vs = [os.path.basename(str(f)) for f in root if f.is_dir]

# filter out other files with same root and version
paths = [str(self._path(path, v))
for v in vs if self._exists(path, v)]

if not paths:
utils.raise_file_not_found_error(path)

# <host>/<repository>/<root>/<name>
# ->
# (/<root>/<name>, <version>)
#
# or legacy:
#
# <host>/<repository>/<root>/<base>/<version>/<base>-<version>.<ext>
# ->
# (/<root>/<base>.<ext>, <version>)

result = []
for p in paths:

p = self._collapse(p) # remove host and repo
tokens = p.split('/')

name = tokens[-1]
version = tokens[-2]

if self._legacy_file_structure:
base = tokens[-3]
ext = name[len(base) + len(version) + 1:]
name = f'{base}{ext}'
path = self.sep.join(tokens[:-3])
else:
path = self.sep.join(tokens[:-2])

path = self.sep + path
path = self.join(path, name)

result.append((path, version))

return result
) -> typing.List[str]:
r"""List all files under sub-path."""
path = self._path(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
if not path.exists():
utils.raise_file_not_found_error(str(path))

paths = [str(x) for x in path.glob("**/*") if x.is_file()]
paths = [self._collapse(path) for path in paths]

return paths

def _owner(
self,
path: str,
version: str,
) -> str:
r"""Get owner of file on backend."""
path = self._path(path, version)
path = self._path(path)
owner = path.stat().modified_by
return owner

def _path(
self,
path: str,
version: str,
) -> artifactory.ArtifactoryPath:
r"""Convert to backend path.
<root>/<name>
->
<host>/<repository>/<root>/<version>/<name>
or legacy:
<root>/<base>.<ext>
<path>
->
<host>/<repository>/<root>/<base>/<version>/<name>-<version>.<ext>
<host>/<repository>/<path>
"""
root, name = self.split(path)
root = self._expand(root)

if self._legacy_file_structure:
base, ext = self._legacy_split_ext(name)
path = f'{root}{base}/{version}/{base}-{version}{ext}'
else:
path = f'{root}{version}/{name}'

path = self._expand(path)
path = _artifactory_path(
path,
self._username,
Expand All @@ -417,69 +310,17 @@ def _put_file(
self,
src_path: str,
dst_path: str,
version: str,
checksum: str,
verbose: bool,
):
r"""Put file to backend."""
dst_path = self._path(dst_path, version)
dst_path = self._path(dst_path)
_deploy(src_path, dst_path, checksum, verbose=verbose)

def _remove_file(
self,
path: str,
version: str,
):
r"""Remove file from backend."""
path = self._path(path, version)
path = self._path(path)
path.unlink()

def _use_legacy_file_structure(
self,
*,
extensions: typing.List[str] = None,
regex: bool = False,
):
r"""Use legacy file structure.
Stores files under
``'.../<name-wo-ext>/<version>/<name-wo-ext>-<version>.<ext>'``
instead of
``'.../<version>/<name>'``.
By default,
the extension
``<ext>``
is set to the string after the last dot.
I.e.,
the backend path
``'.../file.tar.gz'``
will translate into
``'.../file.tar/1.0.0/file.tar-1.0.0.gz'``.
However,
by passing a list with custom extensions
it is possible to overwrite
the default behavior
for certain extensions.
E.g.,
with
``backend._use_legacy_file_structure(extensions=['tar.gz'])``
it is ensured that
``'tar.gz'``
will be recognized as an extension
and the backend path
``'.../file.tar.gz'``
will then translate into
``'.../file/1.0.0/file-1.0.0.tar.gz'``.
E.g.
with
``backend._use_legacy_file_structure(extensions=['\d+.tar.gz'],
regex=True)``
the backend path
``'.../file.99.tar.gz'``
will translate into
``'.../file/1.0.0/file-1.0.0.99.tar.gz'``.
"""
self._legacy_file_structure = True
self._legacy_extensions = extensions or []
self._legacy_file_structure_regex = regex
Loading

0 comments on commit a64eba6

Please sign in to comment.