Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove version from implementation #165

Merged
merged 11 commits into from
Jan 18, 2024
219 changes: 30 additions & 189 deletions audbackend/core/artifactory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import re
import typing

import artifactory
Expand Down Expand Up @@ -154,12 +153,6 @@ def __init__(
)
self._repo = path.find_repository_local(self.repository)

# to support legacy file structure
# see _use_legacy_file_structure()
self._legacy_extensions = []
self._legacy_file_structure = False
self._legacy_file_structure_regex = False

def _access(
self,
):
Expand All @@ -170,10 +163,9 @@ def _access(
def _checksum(
self,
path: str,
version: str,
) -> str:
r"""MD5 checksum of file on backend."""
path = self._path(path, version)
path = self._path(path)
checksum = artifactory.ArtifactoryPath.stat(path).md5
return checksum

Expand Down Expand Up @@ -214,10 +206,9 @@ def _create(
def _date(
self,
path: str,
version: str,
) -> str:
r"""Get last modification date of file on backend."""
path = self._path(path, version)
path = self._path(path)
date = path.stat().mtime
date = utils.date_format(date)
return date
Expand All @@ -231,10 +222,14 @@ def _delete(
def _exists(
self,
path: str,
version: str,
) -> bool:
r"""Check if file exists on backend."""
path = self._path(path, version)
path = self._expand(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
return path.exists()

def _expand(
Expand All @@ -258,154 +253,52 @@ def _get_file(
self,
src_path: str,
dst_path: str,
version: str,
verbose: bool,
):
r"""Get file from backend."""
src_path = self._path(src_path, version)
src_path = self._path(src_path)
_download(src_path, dst_path, verbose=verbose)

def _legacy_split_ext(
self,
name: str,
) -> typing.Tuple[str, str]:
r"""Split name into basename and extension."""
ext = None
for custom_ext in self._legacy_extensions:
# check for custom extension
# ensure basename is not empty
if self._legacy_file_structure_regex:
pattern = rf'\.({custom_ext})$'
match = re.search(pattern, name[1:])
if match:
ext = match.group(1)
elif name[1:].endswith(f'.{custom_ext}'):
ext = custom_ext
if ext is None:
# if no custom extension is found
# use last string after dot
ext = audeer.file_extension(name)

base = audeer.replace_file_extension(name, '', ext=ext)

if ext:
ext = f'.{ext}'

return base, ext

def _ls(
self,
path: str,
) -> typing.List[typing.Tuple[str, str]]:
r"""List all files under (sub-)path."""
if path.endswith('/'): # find files under sub-path

path = self._expand(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
if not path.exists():
utils.raise_file_not_found_error(str(path))

paths = [str(x) for x in path.glob("**/*") if x.is_file()]

else: # find versions of path

root, name = self.split(path)

if self._legacy_file_structure:
base, _ = self._legacy_split_ext(name)
root = f'{self._expand(root)}{base}'
else:
root = self._expand(root)

root = _artifactory_path(
root,
self._username,
self._api_key,
)
vs = [os.path.basename(str(f)) for f in root if f.is_dir]

# filter out other files with same root and version
paths = [str(self._path(path, v))
for v in vs if self._exists(path, v)]

if not paths:
utils.raise_file_not_found_error(path)

# <host>/<repository>/<root>/<name>
# ->
# (/<root>/<name>, <version>)
#
# or legacy:
#
# <host>/<repository>/<root>/<base>/<version>/<base>-<version>.<ext>
# ->
# (/<root>/<base>.<ext>, <version>)

result = []
for p in paths:

p = self._collapse(p) # remove host and repo
tokens = p.split('/')

name = tokens[-1]
version = tokens[-2]

if self._legacy_file_structure:
base = tokens[-3]
ext = name[len(base) + len(version) + 1:]
name = f'{base}{ext}'
path = self.sep.join(tokens[:-3])
else:
path = self.sep.join(tokens[:-2])

path = self.sep + path
path = self.join(path, name)

result.append((path, version))

return result
) -> typing.List[str]:
r"""List all files under sub-path."""
path = self._path(path)
path = _artifactory_path(
path,
self._username,
self._api_key,
)
if not path.exists():
utils.raise_file_not_found_error(str(path))

paths = [str(x) for x in path.glob("**/*") if x.is_file()]
paths = [self._collapse(path) for path in paths]

return paths

def _owner(
self,
path: str,
version: str,
) -> str:
r"""Get owner of file on backend."""
path = self._path(path, version)
path = self._path(path)
owner = path.stat().modified_by
return owner

def _path(
self,
path: str,
version: str,
) -> artifactory.ArtifactoryPath:
r"""Convert to backend path.

<root>/<name>
->
<host>/<repository>/<root>/<version>/<name>

or legacy:

<root>/<base>.<ext>
<path>
->
<host>/<repository>/<root>/<base>/<version>/<name>-<version>.<ext>
<host>/<repository>/<path>

"""
root, name = self.split(path)
root = self._expand(root)

if self._legacy_file_structure:
base, ext = self._legacy_split_ext(name)
path = f'{root}{base}/{version}/{base}-{version}{ext}'
else:
path = f'{root}{version}/{name}'

path = self._expand(path)
path = _artifactory_path(
path,
self._username,
Expand All @@ -417,69 +310,17 @@ def _put_file(
self,
src_path: str,
dst_path: str,
version: str,
checksum: str,
verbose: bool,
):
r"""Put file to backend."""
dst_path = self._path(dst_path, version)
dst_path = self._path(dst_path)
_deploy(src_path, dst_path, checksum, verbose=verbose)

def _remove_file(
self,
path: str,
version: str,
):
r"""Remove file from backend."""
path = self._path(path, version)
path = self._path(path)
path.unlink()

def _use_legacy_file_structure(
self,
*,
extensions: typing.List[str] = None,
regex: bool = False,
):
r"""Use legacy file structure.

Stores files under
``'.../<name-wo-ext>/<version>/<name-wo-ext>-<version>.<ext>'``
instead of
``'.../<version>/<name>'``.
By default,
the extension
``<ext>``
is set to the string after the last dot.
I.e.,
the backend path
``'.../file.tar.gz'``
will translate into
``'.../file.tar/1.0.0/file.tar-1.0.0.gz'``.
However,
by passing a list with custom extensions
it is possible to overwrite
the default behavior
for certain extensions.
E.g.,
with
``backend._use_legacy_file_structure(extensions=['tar.gz'])``
it is ensured that
``'tar.gz'``
will be recognized as an extension
and the backend path
``'.../file.tar.gz'``
will then translate into
``'.../file/1.0.0/file-1.0.0.tar.gz'``.
E.g.
with
``backend._use_legacy_file_structure(extensions=['\d+.tar.gz'],
regex=True)``
the backend path
``'.../file.99.tar.gz'``
will translate into
``'.../file/1.0.0/file-1.0.0.99.tar.gz'``.

"""
self._legacy_file_structure = True
self._legacy_extensions = extensions or []
self._legacy_file_structure_regex = regex
Loading