from __future__ import annotations

import os
import warnings
from datetime import datetime
from stat import S_IFDIR
from stat import S_IFLNK
from stat import S_IFREG
from typing import Any
from typing import Iterator
from typing import Mapping
from typing import Sequence

__all__ = [
    "UPathStatResult",
]


def _convert_value_to_timestamp(value: Any) -> int | float:
    """Convert a datetime-like value to a timestamp.

    Numbers are returned unchanged, strings are parsed with
    ``datetime.fromisoformat`` and ``datetime`` instances are converted
    via ``datetime.timestamp()``.

    Raises:
        ValueError: if a string is not accepted by ``datetime.fromisoformat``.
        TypeError: for every other type. A ``RuntimeWarning`` asking for a
            bug report is emitted right before the exception is raised.
    """
    if isinstance(value, (int, float)):
        return value
    elif isinstance(value, str):
        # normalize the "Z" (UTC) suffix to an explicit offset before parsing
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value).timestamp()
    elif isinstance(value, datetime):
        return value.timestamp()
    else:
        warnings.warn(
            f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
            " Please report this at: https://github.com/fsspec/universal_path/issues",
            RuntimeWarning,
            stacklevel=2,
        )
        raise TypeError(f"Cannot convert {value!r} to a timestamp.")


def _get_stat_result_extra_fields() -> tuple[str, ...]:
    """Retrieve the names of the extra fields of the os.stat_result class."""
    # Note:
    #   Building a stat_result from ``range(n_fields)`` stores each field's
    #   own index as its value. ``__reduce__`` then hands back a dictionary
    #   with the additional named fields as keys and that index as value,
    #   so sorting the keys by value restores the internal field order.
    sr = os.stat_result(range(os.stat_result.n_fields))
    _, (_, extra) = sr.__reduce__()
    extra_fields = sorted(extra, key=extra.__getitem__)
    return tuple(extra_fields)


class UPathStatResult:
    """A stat_result compatible class wrapping fsspec info dicts.

    **Note**: It is unlikely that you will ever have to instantiate
    this class directly. If you want to convert an info dict,
    use: `UPathStatResult.from_info(info)`

    This object may be accessed either as a tuple of
      (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.

    There's an additional method `as_info()` for accessing the info dict.
    This is useful to access additional information provided by the file system
    implementation, that's not covered by the stat_result tuple.

    """

    __slots__ = ("_seq", "_info")
    # Note:
    #   can't derive from os.stat_result at all, and can't derive from
    #   tuple and have slots. So we duck type the os.stat_result class

    # Add the fields and "extra fields" of the os.stat_result class
    _fields = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
    )
    _fields_extra = _get_stat_result_extra_fields()

    # Provide the n_ attributes of the os.stat_result class for compatibility
    n_sequence_fields = len(_fields)
    n_fields = len(_fields) + len(_fields_extra)
    n_unnamed_fields = len(set(_fields_extra).intersection(_fields))

    if (
        n_fields != os.stat_result.n_fields
        or n_sequence_fields != os.stat_result.n_sequence_fields
        or n_unnamed_fields != os.stat_result.n_unnamed_fields
    ):
        warnings.warn(
            "UPathStatResult: The assumed number of fields in the"
            " stat_result class is not correct. Got: "
            f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
            " This might cause problems? Please report this issue at:"
            " https://github.com/fsspec/universal_path/issues",
            RuntimeWarning,
            stacklevel=2,
        )

    def __init__(
        self,
        stat_result_seq: Sequence[int],
        info_dict: Mapping[str, Any] | None = None,
    ) -> None:
        """init compatible with os.stat_result

        Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.

        Args:
            stat_result_seq: fallback values for the sequence fields. Only the
                first `n_sequence_fields` items are kept.
            info_dict: the fsspec info dict; its values take precedence over
                the fallback values.

        Raises:
            TypeError: if the sequence has fewer than `n_sequence_fields` or
                more than `n_fields` items.
        """
        seq = tuple(stat_result_seq)
        n = len(seq)
        cls_name = type(self).__name__
        if n < self.n_sequence_fields:
            raise TypeError(
                f"{cls_name} takes at least {self.n_sequence_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_fields:
            raise TypeError(
                f"{cls_name} takes at most {self.n_fields}-sequence"
                f" ({n}-sequence given)"
            )
        elif n > self.n_sequence_fields:
            warnings.warn(
                "UPathStatResult: The seq provided more than"
                f" {self.n_sequence_fields} items. Ignoring the extra items...",
                UserWarning,
                stacklevel=2,
            )
        self._seq = seq[: self.n_sequence_fields]
        self._info = info_dict or {}

    def __repr__(self):
        cls_name = type(self).__name__
        seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self)))
        return f"{cls_name}({seq_attrs}, info={self._info!r})"

    # --- access to the fsspec info dict ------------------------------

    @classmethod
    def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
        """Create a UPathStatResult from a fsspec info dict."""
        # fill all the fallback default values with 0
        defaults = [0] * cls.n_sequence_fields
        return cls(defaults, info)

    def as_info(self) -> Mapping[str, Any]:
        """Return the fsspec info dict."""
        return self._info

    # --- internal lookup helpers -------------------------------------

    def _int_from_info(self, keys: Sequence[str], fallback: int) -> int:
        """Return the first info value under `keys` that converts to int."""
        for key in keys:
            try:
                return int(self._info[key])
            except (ValueError, TypeError, KeyError):
                continue
        return fallback

    def _timestamp_from_info(
        self, keys: Sequence[str], fallback: int | float
    ) -> int | float:
        """Return the first info value under `keys` that converts to a timestamp."""
        for key in keys:
            try:
                raw_value = self._info[key]
            except KeyError:
                continue
            try:
                return _convert_value_to_timestamp(raw_value)
            except (TypeError, ValueError):
                # unparsable value: keep probing the remaining keys
                continue
        return fallback

    # --- guaranteed fields -------------------------------------------

    @property
    def st_mode(self) -> int:
        """protection bits"""
        mode = self._info.get("mode")
        if isinstance(mode, int):
            return mode
        elif isinstance(mode, str):
            # string modes are interpreted as octal digits
            try:
                return int(mode, 8)
            except ValueError:
                pass

        type_ = self._info.get("type")
        if type_ == "file":
            return S_IFREG  # see: stat.S_ISREG
        elif type_ == "directory":
            return S_IFDIR  # see: stat.S_ISDIR

        if self._info.get("isLink"):
            return S_IFLNK  # see: stat.S_ISLNK

        return self._seq[0]

    @property
    def st_ino(self) -> int:
        """inode"""
        ino = self._info.get("ino")
        if isinstance(ino, int):
            return ino
        return self._seq[1]

    @property
    def st_dev(self) -> int:
        """device"""
        dev = self._info.get("dev")
        if isinstance(dev, int):
            return dev
        return self._seq[2]

    @property
    def st_nlink(self) -> int:
        """number of hard links"""
        nlink = self._info.get("nlink")
        if isinstance(nlink, int):
            return nlink
        return self._seq[3]

    @property
    def st_uid(self) -> int:
        """user ID of owner"""
        return self._int_from_info(
            ["uid", "owner", "uname", "unix.owner"], self._seq[4]
        )

    @property
    def st_gid(self) -> int:
        """group ID of owner"""
        return self._int_from_info(
            ["gid", "group", "gname", "unix.group"], self._seq[5]
        )

    @property
    def st_size(self) -> int:
        """total size, in bytes"""
        return self._int_from_info(["size"], self._seq[6])

    @property
    def st_atime(self) -> int | float:
        """time of last access"""
        return self._timestamp_from_info(
            ["atime", "time", "last_accessed", "accessTime"], self._seq[7]
        )

    @property
    def st_mtime(self) -> int | float:
        """time of last modification"""
        return self._timestamp_from_info(
            [
                "mtime",
                "LastModified",
                "last_modified",
                "timeModified",
                "modificationTime",
                "modified_at",
            ],
            self._seq[8],
        )

    @property
    def st_ctime(self) -> int | float:
        """time of last change"""
        return self._timestamp_from_info(["ctime"], self._seq[9])

    # --- extra fields ------------------------------------------------

    def __getattr__(self, item):
        # only reached when regular attribute lookup failed
        if item in self._fields_extra:
            return 0  # fallback default value
        raise AttributeError(item)

    if "st_birthtime" in _fields_extra:

        @property
        def st_birthtime(self) -> int | float:
            """time of creation"""
            return self._timestamp_from_info(
                ["created", "creation_time", "timeCreated", "created_at"], 0
            )

    # --- os.stat_result tuple interface ------------------------------

    def __len__(self) -> int:
        return len(self._fields)

    def __iter__(self) -> Iterator[int]:
        """the sequence interface iterates over the guaranteed fields.

        All values are integers.
        """
        for field in self._fields:
            yield int(getattr(self, field))

    def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
        """the sequence interface index method.

        Searches the same values that iteration yields, not the raw
        fallback sequence.
        """
        values = tuple(self)
        if stop is None:
            stop = len(values)
        return values.index(value, start, stop)

    def count(self, value: int) -> int:
        """the sequence interface count method.

        Counts within the same values that iteration yields.
        """
        return tuple(self).count(value)

    # --- compatibility with the fsspec info dict interface ------------

    def __getitem__(self, item: int | slice | str) -> Any:
        if isinstance(item, str):
            warnings.warn(
                "Access the fsspec info via `.as_info()[key]`",
                DeprecationWarning,
                stacklevel=2,
            )
            return self._info[item]
        if isinstance(item, slice):
            # os.stat_result supports slicing and returns a plain tuple
            return tuple(self)[item]
        # we need to go via the attributes and cast to int
        attr = self._fields[item]
        return int(getattr(self, attr))

    def keys(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().keys()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.keys()

    def values(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().values()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.values()

    def items(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().items()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.items()

    def get(self, key, default=None):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().get(key, default)`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.get(key, default)

    def copy(self):
        """compatibility with the fsspec info dict interface."""
        warnings.warn(
            "Access the fsspec info via `.as_info().copy()`",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._info.copy()
"directory" if info["url"].endswith("/") else "file" + return UPathStatResult.from_info(info) + def iterdir(self): it = iter(super().iterdir()) try: diff --git a/upath/tests/cases.py b/upath/tests/cases.py index bed4212..f08a52e 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -1,6 +1,9 @@ +import os import pickle import re +import stat import sys +import warnings from pathlib import Path import pytest @@ -9,6 +12,7 @@ from packaging.version import Version from upath import UPath +from upath._stat import UPathStatResult class BaseTests: @@ -26,7 +30,28 @@ def test_home(self): def test_stat(self): stat = self.path.stat() - assert stat + assert isinstance(stat, UPathStatResult) + assert len(tuple(stat)) == os.stat_result.n_sequence_fields + + with warnings.catch_warnings(): + warnings.simplefilter("error") + + for idx in range(os.stat_result.n_sequence_fields): + assert isinstance(stat[idx], int) + for attr in UPathStatResult._fields + UPathStatResult._fields_extra: + assert hasattr(stat, attr) + + def test_stat_dir_st_mode(self): + base = self.path.stat() # base folder + assert stat.S_ISDIR(base.st_mode) + + def test_stat_file_st_mode(self): + file1 = self.path.joinpath("file1.txt").stat() + assert stat.S_ISREG(file1.st_mode) + + def test_stat_st_size(self): + file1 = self.path.joinpath("file1.txt").stat() + assert file1.st_size == 11 def test_chmod(self): with pytest.raises(NotImplementedError): diff --git a/upath/tests/implementations/test_http.py b/upath/tests/implementations/test_http.py index c9b2797..7541780 100644 --- a/upath/tests/implementations/test_http.py +++ b/upath/tests/implementations/test_http.py @@ -9,6 +9,7 @@ from ..cases import BaseTests from ..utils import skip_on_windows from ..utils import xfail_if_no_ssl_connection +from ..utils import xfail_if_version try: get_filesystem_class("http") @@ -120,6 +121,10 @@ def test_rename2(self): with pytest.raises(NotImplementedError): return super().test_rename() + 
@xfail_if_version("fsspec", lt="2024.2.0", reason="requires fsspec>=2024.2.0") + def test_stat_dir_st_mode(self): + super().test_stat_dir_st_mode() + @pytest.mark.parametrize( "args,parts",