diff --git a/pyproject.toml b/pyproject.toml index ba9422df212..6feee4d3fda 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,6 +119,7 @@ extend-select = [ "TCH", # flake8-type-checking "TID", # flake8-tidy-imports "UP", # pyupgrade + "PTH" ] ignore = [ "B904", # use 'raise ... from err' diff --git a/src/poetry/installation/uninstaller.py b/src/poetry/installation/uninstaller.py index d8588db7d56..5af23d41a24 100644 --- a/src/poetry/installation/uninstaller.py +++ b/src/poetry/installation/uninstaller.py @@ -98,8 +98,8 @@ class Distribution: def __init__( self, dist: metadata.Distribution, - info_location: BasePath | None, - installed_location: BasePath | None, + info_location: Path | None, + installed_location: Path | None, ) -> None: self._dist = dist self._info_location = info_location @@ -121,7 +121,7 @@ def info_location(self) -> str | None: def installed_location(self) -> str | None: if self._installed_location is None: return None - return normalize_path(str(self._installed_location)) + return str(normalize_path(self._installed_location)) def _get_dist_name_from_location(self) -> str | None: """Try to get the name from the metadata directory name. @@ -130,10 +130,9 @@ def _get_dist_name_from_location(self) -> str | None: """ if self._info_location is None: return None - stem, suffix = os.path.splitext(self._info_location.name) - if suffix not in (".dist-info", ".egg-info"): + if self._info_location.suffix != ".dist-info": return None - return stem.split("-", 1)[0] + return self._info_location.stem.split("-", 1)[0] @property def canonical_name(self) -> NormalizedName: @@ -212,26 +211,28 @@ def iter_declared_entries(self) -> Iterator[str] | None: return self._iter_declared_entries_from_record() -def normalize_path(path: str, resolve_symlinks: bool = True) -> str: +def normalize_path(path: Path, resolve_symlinks: bool = True) -> Path: """ Convert a path to its canonical, case-normalized, absolute version. """ - path = os.path.expanduser(path) - path = os.path.realpath(path) if resolve_symlinks else os.path.abspath(path) - return os.path.normcase(path) + path = path.expanduser() + path = os.path.realpath(path) if resolve_symlinks else path.resolve() + return Path(os.path.normcase(path)) -def renames(old: str, new: str) -> None: +def renames(old: Path, new: Path) -> None: """Like os.renames(), but handles renaming across devices.""" # Implementation borrowed from os.renames(). - head, tail = os.path.split(new) - if head and tail and not os.path.exists(head): - os.makedirs(head) + head = new.parent + tail = f"{new.stem}{new.suffix}" + if head and tail and not head.exists(): + head.mkdir(parents=True) shutil.move(old, new) - head, tail = os.path.split(old) + head = old.parent + tail = f"{old.stem}{old.suffix}" if head and tail: with contextlib.suppress(OSError): os.removedirs(head) @@ -259,12 +260,12 @@ def get_indentation() -> int: return getattr(_log_state, "indentation", 0) -def _script_names(bin_dir: str, script_name: str, is_gui: bool) -> Iterator[str]: +def _script_names(bin_dir: Path, script_name: str, is_gui: bool) -> Iterator[str]: """Create the fully qualified name of the files created by {console,gui}_scripts for the given ``dist``. Returns the list of file names """ - exe_name = os.path.join(bin_dir, script_name) + exe_name = bin_dir / script_name yield exe_name if not WINDOWS: return @@ -291,7 +292,7 @@ def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: @_unique -def uninstallation_paths(dist: Distribution) -> Iterator[str]: +def uninstallation_paths(dist: Distribution) -> Iterator[Path]: """ Yield all the uninstallation paths for dist based on RECORD-without-.py[co] @@ -305,7 +306,7 @@ def uninstallation_paths(dist: Distribution) -> Iterator[str]: https://packaging.python.org/specifications/recording-installed-packages/ """ - location = dist.location + location = Path(dist.location) assert location is not None, "not installed" entries = dist.iter_declared_entries() @@ -314,29 +315,31 @@ def uninstallation_paths(dist: Distribution) -> Iterator[str]: raise UninstallationError(msg) for entry in entries: - path = os.path.join(location, entry) + path = location / entry yield path - if path.endswith(".py"): + if path.name.endswith(".py"): dn, fn = os.path.split(path) + dn = Path(dn) base = fn[:-3] - path = os.path.join(dn, base + ".pyc") + path = dn / (base + ".pyc") yield path - path = os.path.join(dn, base + ".pyo") + path = dn / (base + ".pyo") yield path -def compact(paths: Iterable[str]) -> set[str]: +def compact(paths: Iterable[Path]) -> set[Path]: """Compact a path set to contain the minimal number of paths necessary to contain all paths in the set. If /a/path/ and /a/path/to/a/file.txt are both in the set, leave only the shorter path.""" sep = os.path.sep - short_paths: set[str] = set() - for path in sorted(paths, key=len): + short_paths: set[Path] = set() + for path in sorted(paths, key=lambda x: len(x.name)): + # TODO: rewrite should_skip = any( - path.startswith(shortpath.rstrip("*")) - and path[len(shortpath.rstrip("*").rstrip(sep))] == sep + path.name.startswith(shortpath.name.rstrip("*")) + and path[len(shortpath.name.rstrip("*").rstrip(sep))] == sep for shortpath in short_paths ) if not should_skip: @@ -356,7 +359,7 @@ def compress_for_rename(paths: Iterable[str]) -> set[str]: wildcards: set[str] = set() def norm_join(*a: str) -> str: - return os.path.normcase(os.path.join(*a)) + return os.path.normcase(os.path.join(*a)) # noqa: PTH118 for root in unchecked: if any(os.path.normcase(root).startswith(w) for w in wildcards): @@ -378,7 +381,7 @@ def norm_join(*a: str) -> str: return set(map(case_map.__getitem__, remaining)) | wildcards -def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str]]: +def compress_for_output_listing(paths: Iterable[Path]) -> tuple[set[Path], set[Path]]: """Returns a tuple of 2 sets of which paths to display to user The first set contains paths that would be deleted. Files of a package @@ -388,7 +391,7 @@ def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str The second set contains files that would have been skipped in the above folders. """ - + # TODO: consider refactoring will_remove = set(paths) will_skip = set() @@ -396,10 +399,10 @@ def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str folders = set() files = set() for path in will_remove: - if path.endswith(".pyc"): + if path.name.endswith(".pyc"): continue - if path.endswith("__init__.py") or ".dist-info" in path: - folders.add(os.path.dirname(path)) + if path.name.endswith("__init__.py") or ".dist-info" in path.name: + folders.add(path.parent) files.add(path) _normcased_files = set(map(os.path.normcase, files)) @@ -409,20 +412,17 @@ def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str # This walks the tree using os.walk to not miss extra folders # that might get added. for folder in folders: - for dirpath, _, dirfiles in os.walk(folder): + for dirpath, _, dirfiles in folder.walk(): for fname in dirfiles: if fname.endswith(".pyc"): continue - file_ = os.path.join(dirpath, fname) - if ( - os.path.isfile(file_) - and os.path.normcase(file_) not in _normcased_files - ): + file_ = dirpath / fname + if file_.is_file() and os.path.normcase(file_) not in _normcased_files: # We are skipping this file. Add it to the set. will_skip.add(file_) - will_remove = files | {os.path.join(folder, "*") for folder in folders} + will_remove = files | {str(Path(folder).joinpath("*")) for folder in folders} return will_remove, will_skip @@ -437,9 +437,9 @@ def __init__(self) -> None: self._save_dirs: dict[str, str] = {} # (old path, new path) tuples for each move that may need # to be undone. - self._moves: list[tuple[str, str]] = [] + self._moves: list[tuple[Path, Path]] = [] - def _get_directory_stash(self, path: str) -> str: + def _get_directory_stash(self, path: Path) -> Path: """Stashes a directory. Directories are stashed adjacent to their original location if @@ -455,51 +455,50 @@ def _get_directory_stash(self, path: str) -> str: save_dir = tempfile.mkdtemp(prefix="poetry-uninstall") self._save_dirs[os.path.normcase(path)] = save_dir - return save_dir + return Path(save_dir) - def _get_file_stash(self, path: str) -> str: + def _get_file_stash(self, path: Path) -> str: """Stashes a file. If no root has been provided, one will be created for the directory in the user's temp directory.""" - path = os.path.normcase(path) - head, old_head = os.path.dirname(path), None + path = Path(os.path.normcase(path)) + head, old_head = path.parent, None save_dir = None while head != old_head: with contextlib.suppress(KeyError): save_dir = self._save_dirs[head] break - head, old_head = os.path.dirname(head), head + head, old_head = head.parent, head else: # Did not find any suitable root - head = os.path.dirname(path) + head = path.parent save_dir = tempfile.mkdtemp(prefix="poetry-uninstall") - self._save_dirs[head] = save_dir + self._save_dirs[head.name] = save_dir relpath = os.path.relpath(path, head) if relpath and relpath != os.path.curdir: - return os.path.join(save_dir, relpath) + return save_dir / relpath return save_dir - def stash(self, path: str) -> str: + def stash(self, path: Path) -> Path: """Stashes the directory or file and returns its new location. Handle symlinks as files to avoid modifying the symlink targets. """ - path_is_dir = os.path.isdir(path) and not os.path.islink(path) - if path_is_dir: + if path_is_dir := path.is_dir() and not path.is_symlink(): new_path = self._get_directory_stash(path) else: new_path = self._get_file_stash(path) self._moves.append((path, new_path)) - if path_is_dir and os.path.isdir(new_path): + if path_is_dir and new_path.is_dir(): # If we're moving a directory, we need to # remove the destination first or else it will be # moved to inside the existing directory. # We just created new_path ourselves, so it will # be removable. - os.rmdir(new_path) + new_path.rmdir() renames(path, new_path) return new_path @@ -511,16 +510,16 @@ def commit(self) -> None: self._save_dirs = {} def rollback(self) -> None: - """Undoes the uninstall by moving stashed files back.""" + """Undoes the uninstallation by moving stashed files back.""" for p in self._moves: logger.info("Moving to %s\n from %s", *p) for new_path, path in self._moves: try: logger.debug("Replacing %s from %s", new_path, path) - if os.path.isfile(new_path) or os.path.islink(new_path): - os.unlink(new_path) - elif os.path.isdir(new_path): + if new_path.is_file() or new_path.is_symlink(): + new_path.unlink() + elif new_path.is_dir(): remove_directory(Path(new_path)) renames(path, new_path) except OSError as ex: @@ -539,8 +538,8 @@ class UninstallPathSet: requirement.""" def __init__(self, dist: Distribution) -> None: - self._paths: set[str] = set() - self._refuse: set[str] = set() + self._paths: set[Path] = set() + self._refuse: set[Path] = set() self._pth: dict[str, UninstallPthEntries] = {} self._dist = dist self._moved_paths = StashedUninstallPathSet() @@ -549,7 +548,7 @@ def __init__(self, dist: Distribution) -> None: # the same args, which hurts performance. self._normalize_path_cached = functools.lru_cache(normalize_path) - def _permitted(self, path: str) -> bool: + def _permitted(self, path: Path) -> bool: """ Return True if the given path is one we are permitted to remove/modify, False otherwise. @@ -561,16 +560,17 @@ def _permitted(self, path: str) -> bool: if not running_under_virtualenv(): return True """ - return path.startswith(self._normalize_path_cached(sys.prefix)) + # TODO: relative_to? + return path.name.startswith(self._normalize_path_cached(sys.prefix)) - def add(self, path: str) -> None: + def add(self, path: Path) -> None: head, tail = os.path.split(path) # we normalize the head to resolve parent directory symlinks, but not # the tail, since we only want to uninstall symlinks, not their targets - path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) + path = self._normalize_path_cached(head) / os.path.normcase(tail) - if not os.path.exists(path): + if not path.exists(): return if self._permitted(path): self._paths.add(path) @@ -579,15 +579,15 @@ def add(self, path: str) -> None: # __pycache__ files can show up after 'installed-files.txt' is created, # due to imports - if os.path.splitext(path)[1] == ".py": - self.add(cache_from_source(path)) + if path.suffix == ".py": + self.add(Path(cache_from_source(path.name))) - def add_pth(self, pth_file: str, entry: str) -> None: + def add_pth(self, pth_file: Path, entry: str) -> None: pth_file = self._normalize_path_cached(pth_file) if self._permitted(pth_file): if pth_file not in self._pth: - self._pth[pth_file] = UninstallPthEntries(pth_file) - self._pth[pth_file].add(entry) + self._pth[pth_file.name] = UninstallPthEntries(pth_file) + self._pth[pth_file.name].add(entry) else: self._refuse.add(pth_file) @@ -671,21 +671,21 @@ def from_dist(cls, dist: Distribution, env: Env) -> UninstallPathSet: dist_location, ) - bin_dir = env.paths.get("scripts") + bin_dir = Path(env.paths.get("scripts")) # find distutils scripts= scripts try: for script in dist.iter_distutils_script_names(): - paths_to_remove.add(os.path.join(bin_dir, script)) + paths_to_remove.add(bin_dir / script) if WINDOWS: - paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) + paths_to_remove.add(bin_dir / f"{script}.bat") except (FileNotFoundError, NotADirectoryError): pass # find console_scripts and gui_scripts def iter_scripts_to_remove( dist: Distribution, - bin_dir: str, + bin_dir: Path, ) -> Iterator[str]: for entry_point in dist.iter_entry_points(): if entry_point.group == "console_scripts": @@ -700,7 +700,7 @@ def iter_scripts_to_remove( class UninstallPthEntries: - def __init__(self, pth_file: str) -> None: + def __init__(self, pth_file: Path) -> None: self.file = pth_file self.entries: set[str] = set() self._saved_lines: list[bytes] | None = None @@ -709,7 +709,7 @@ def add(self, entry: str) -> None: entry = os.path.normcase(entry) # On Windows, os.path.normcase converts the entry to use # backslashes. This is correct for entries that describe absolute - # paths outside of site-packages, but all the others use forward + # paths outside the site-packages, but all the others use forward # slashes. # os.path.splitdrive is used instead of os.path.isabs because isabs # treats non-absolute paths with drive letter markings like c:foo\bar @@ -724,10 +724,10 @@ def remove(self) -> None: logger.debug("Removing pth entries from %s:", self.file) # If the file doesn't exist, log a warning and return - if not os.path.isfile(self.file): + if not self.file.is_file(): logger.warning("Cannot remove entries from nonexistent file %s", self.file) return - with open(self.file, "rb") as fh: + with self.file.open("rb") as fh: # windows uses '\r\n' with py3k, but uses '\n' with py2.x lines = fh.readlines() self._saved_lines = lines @@ -737,12 +737,11 @@ def remove(self) -> None: if lines and not lines[-1].endswith(endline.encode("utf-8")): lines[-1] = lines[-1] + endline.encode("utf-8") for entry in self.entries: - try: + with contextlib.suppress(ValueError): logger.debug("Removing entry: %s", entry) lines.remove((entry + endline).encode("utf-8")) - except ValueError: - pass - with open(self.file, "wb") as fh: + + with self.file.open("wb") as fh: fh.writelines(lines) def rollback(self) -> bool: @@ -750,6 +749,6 @@ def rollback(self) -> bool: logger.error("Cannot roll back changes to %s, none were made", self.file) return False logger.debug("Rolling %s back to previous state", self.file) - with open(self.file, "wb") as fh: + with self.file.open("wb") as fh: fh.writelines(self._saved_lines) return True