Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Author:    Bartosz Sokorski <b.sokorski@gmail.com>
  • Loading branch information
Secrus committed Sep 30, 2024
1 parent 22901fa commit 1867a75
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 107 deletions.
227 changes: 139 additions & 88 deletions src/poetry/inspection/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@
from typing import Mapping
from typing import Sequence

import pkginfo
#import pkginfo

from build import BuildBackendException
from poetry.core.constraints.version import Version
from poetry.core.factory import Factory
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.pyproject.toml import PyProjectTOML
from poetry.core.utils.helpers import parse_requires
#from poetry.core.utils.helpers import parse_requires
from poetry.core.utils.helpers import temporary_directory
from poetry.core.version.markers import InvalidMarker
from poetry.core.version.requirements import InvalidRequirement

from poetry.utils.helpers import extractall
from poetry.utils.isolated_build import isolated_builder

from poetry.inspection.utils import metadata_regex, pkg_info_regex, egg_regex
from packaging.metadata import Metadata

if TYPE_CHECKING:
from collections.abc import Iterator

from packaging.metadata import RawMetadata
from packaging.utils import NormalizedName
from poetry.core.packages.project_package import ProjectPackage

Expand Down Expand Up @@ -217,73 +217,95 @@ def to_package(
package.extras = package_extras

return package
#
# @classmethod
# def _requirements_from_distribution(
# cls,
# dist: pkginfo.BDist | pkginfo.SDist | pkginfo.Wheel,
# ) -> list[str] | None:
# """
# Helper method to extract package requirements from a `pkginfo.Distribution`
# instance.
#
# :param dist: The distribution instance to extract requirements from.
# """
# # If the distribution lists requirements, we use those.
# #
# # If the distribution does not list requirements, but the metadata is new enough
# # to specify that this is because there definitely are none: then we return an
# # empty list.
# #
# # If there is a requires.txt, we use that.
# if dist.requires_dist:
# return list(dist.requires_dist)
#
# if dist.metadata_version is not None:
# metadata_version = Version.parse(dist.metadata_version)
# if (
# metadata_version >= DYNAMIC_METADATA_VERSION
# and "Requires-Dist" not in dist.dynamic
# ):
# return []
#
# requires = Path(dist.filename) / "requires.txt"
# if requires.exists():
# text = requires.read_text(encoding="utf-8")
# requirements = parse_requires(text)
# return requirements
#
# return None
#
# @classmethod
# def _from_distribution(
# cls, dist: pkginfo.BDist | pkginfo.SDist | pkginfo.Wheel
# ) -> PackageInfo:
# """
# Helper method to parse package information from a `pkginfo.Distribution`
# instance.
#
# :param dist: The distribution instance to parse information from.
# """
# if dist.metadata_version not in pm._VALID_METADATA_VERSIONS:
# raise ValueError(f"Unknown metadata version: {dist.metadata_version}")
#
# requirements = cls._requirements_from_distribution(dist)
#
# info = cls(
# name=dist.name,
# version=dist.version,
# summary=dist.summary,
# requires_dist=requirements,
# requires_python=dist.requires_python,
# )
#
# info._source_type = "file"
# info._source_url = Path(dist.filename).resolve().as_posix()
#
# return info

@classmethod
def _requirements_from_distribution(
cls,
dist: pkginfo.BDist | pkginfo.SDist | pkginfo.Wheel,
) -> list[str] | None:
"""
Helper method to extract package requirements from a `pkginfo.Distribution`
instance.
:param dist: The distribution instance to extract requirements from.
"""
# If the distribution lists requirements, we use those.
#
# If the distribution does not list requirements, but the metadata is new enough
# to specify that this is because there definitely are none: then we return an
# empty list.
#
# If there is a requires.txt, we use that.
if dist.requires_dist:
return list(dist.requires_dist)

if dist.metadata_version is not None:
metadata_version = Version.parse(dist.metadata_version)
if (
metadata_version >= DYNAMIC_METADATA_VERSION
and "Requires-Dist" not in dist.dynamic
):
return []

requires = Path(dist.filename) / "requires.txt"
if requires.exists():
text = requires.read_text(encoding="utf-8")
requirements = parse_requires(text)
return requirements

return None

@classmethod
def _from_distribution(
cls, dist: pkginfo.BDist | pkginfo.SDist | pkginfo.Wheel
) -> PackageInfo:
"""
Helper method to parse package information from a `pkginfo.Distribution`
instance.
:param dist: The distribution instance to parse information from.
"""
if dist.metadata_version not in pkginfo.distribution.HEADER_ATTRS:
# This check can be replaced once upstream implements strict parsing
# https://bugs.launchpad.net/pkginfo/+bug/2058697
raise ValueError(f"Unknown metadata version: {dist.metadata_version}")

requirements = cls._requirements_from_distribution(dist)

info = cls(
name=dist.name,
version=dist.version,
summary=dist.summary,
requires_dist=requirements,
requires_python=dist.requires_python,
)

info._source_type = "file"
info._source_url = Path(dist.filename).resolve().as_posix()
@staticmethod
def _read_metadata_from_sdist(path: Path) -> bytes:
import zipfile
import tarfile

if zipfile.is_zipfile(path):
with zipfile.ZipFile(path) as archive:
for file in archive.infolist():
if pkg_info_regex.search(file.filename):
return archive.read(file.filename)
elif tarfile.is_tarfile(path):
with tarfile.TarFile.open(path) as archive:
for name in archive.getnames():
if pkg_info_regex.search(name):
return archive.extractfile(name).read()
else:
raise ValueError(f"Not a known archive format: {path}")

return info
@staticmethod
def _read_metadata_from_unpacked_sdist(path: Path) -> bytes:
if path.is_file():
path = path.parent
return (path / "PKG-INFO").read_bytes()

@classmethod
def _from_sdist_file(cls, path: Path) -> PackageInfo:
Expand All @@ -297,8 +319,9 @@ def _from_sdist_file(cls, path: Path) -> PackageInfo:
info = None

with contextlib.suppress(ValueError):
sdist = pkginfo.SDist(str(path))
info = cls._from_distribution(sdist)
sdist = cls._read_metadata_from_sdist(path)
metadata = Metadata.from_email(sdist)
info = cls.from_metadata(metadata)

if info is not None and info.requires_dist is not None:
# we successfully retrieved dependencies from sdist metadata
Expand Down Expand Up @@ -357,18 +380,21 @@ def _find_dist_info(path: Path) -> Iterator[Path]:
yield Path(d)

@classmethod
def from_metadata(cls, metadata: RawMetadata) -> PackageInfo:
def from_metadata(cls, metadata: Metadata) -> PackageInfo:
"""
Create package information from core metadata.
:param metadata: raw metadata
"""
requirements = []
if metadata.requires_dist:
requirements = [str(req) for req in metadata.requires_dist]
return cls(
name=metadata.get("name"),
version=metadata.get("version"),
summary=metadata.get("summary"),
requires_dist=metadata.get("requires_dist"),
requires_python=metadata.get("requires_python"),
name=metadata.name,
version= str(metadata.version),
summary=metadata.summary,
requires_dist=requirements,
requires_python=metadata.requires_python,
)

@classmethod
Expand All @@ -383,13 +409,13 @@ def from_metadata_directory(cls, path: Path) -> PackageInfo | None:
else:
directories = list(cls._find_dist_info(path=path))

dist: pkginfo.BDist | pkginfo.SDist | pkginfo.Wheel
metadata: bytes
for directory in directories:
try:
if directory.suffix == ".egg-info":
dist = pkginfo.UnpackedSDist(directory.as_posix())
metadata = cls._read_metadata_from_unpacked_sdist(directory)
elif directory.suffix == ".dist-info":
dist = pkginfo.Wheel(directory.as_posix())
metadata = cls._read_metadata_from_wheel(directory)
else:
continue
break
Expand All @@ -398,11 +424,12 @@ def from_metadata_directory(cls, path: Path) -> PackageInfo | None:
else:
try:
# handle PKG-INFO in unpacked sdist root
dist = pkginfo.UnpackedSDist(path.as_posix())
metadata = cls._read_metadata_from_unpacked_sdist(path)
except ValueError:
return None

return cls._from_distribution(dist=dist)
parsed_metadata = Metadata.from_email(metadata)
return cls.from_metadata(parsed_metadata)

@classmethod
def from_package(cls, package: Package) -> PackageInfo:
Expand Down Expand Up @@ -446,9 +473,8 @@ def from_directory(cls, path: Path) -> PackageInfo:
:param path: Path to generate package information from.
"""
project_package = cls._get_poetry_package(path)
info: PackageInfo | None
if project_package:
if project_package := cls._get_poetry_package(path):
info = cls.from_package(project_package)
else:
info = cls.from_metadata_directory(path)
Expand Down Expand Up @@ -481,6 +507,24 @@ def from_sdist(cls, path: Path) -> PackageInfo:
# so, we assume this is an directory
return cls.from_directory(path=path)

@staticmethod
def _read_metadata_from_wheel(path: Path) -> bytes:
if path.suffix == ".whl":
import zipfile

with zipfile.ZipFile(path) as wheel_file:
for file in wheel_file.infolist():
if metadata_regex.search(file.filename):
return wheel_file.read(file.filename)

elif path.suffix == ".dist-info":
return (path / "METADATA").read_bytes()

else:
raise ValueError(
f"Not a known wheel archive format or installed .dist-info: {path}"
)

@classmethod
def from_wheel(cls, path: Path) -> PackageInfo:
"""
Expand All @@ -489,8 +533,9 @@ def from_wheel(cls, path: Path) -> PackageInfo:
:param path: Path to wheel.
"""
try:
wheel = pkginfo.Wheel(str(path))
return cls._from_distribution(wheel)
wheel_metadata = cls._read_metadata_from_wheel(path)
metadata = Metadata.from_email(wheel_metadata)
return cls.from_metadata(metadata)
except ValueError as e:
raise PackageInfoError(path, e)

Expand All @@ -505,8 +550,14 @@ def from_bdist(cls, path: Path) -> PackageInfo:
return cls.from_wheel(path=path)

try:
bdist = pkginfo.BDist(str(path))
return cls._from_distribution(bdist)
import zipfile

with zipfile.ZipFile(path) as wheel_file:
for file in wheel_file.infolist():
if egg_regex.search(file.filename):
metadata = wheel_file.read(file.filename)
parsed_metadata = Metadata.from_email(metadata)
return cls.from_metadata(parsed_metadata)
except ValueError as e:
raise PackageInfoError(path, e)

Expand Down
7 changes: 3 additions & 4 deletions src/poetry/inspection/lazy_wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from requests.models import HTTPError
from requests.models import Response
from requests.status_codes import codes
from poetry.inspection.utils import metadata_regex


if TYPE_CHECKING:
Expand Down Expand Up @@ -490,8 +491,6 @@ class LazyWheelOverHTTP(LazyFileOverHTTP):
# multiple times in the same invocation against an index without this support.
_domains_without_negative_range: ClassVar[set[str]] = set()

_metadata_regex = re.compile(r"^[^/]*\.dist-info/METADATA$")

def read_metadata(self, name: str) -> bytes:
"""Download and read the METADATA file from the remote wheel."""
with ZipFile(self) as zf:
Expand Down Expand Up @@ -707,14 +706,14 @@ def _prefetch_metadata(self, name: str) -> str:
filename = ""
for info in zf.infolist():
if start is None:
if self._metadata_regex.search(info.filename):
if metadata_regex.search(info.filename):
filename = info.filename
start = info.header_offset
continue
else:
# The last .dist-info/ entry may be before the end of the file if the
# wheel's entries are sorted lexicographically (which is unusual).
if not self._metadata_regex.search(info.filename):
if not metadata_regex.search(info.filename):
end = info.header_offset
break
if start is None:
Expand Down
5 changes: 5 additions & 0 deletions src/poetry/inspection/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import re

metadata_regex = re.compile(r"^[^/]*\.dist-info/METADATA$")
pkg_info_regex = re.compile(r"^[a-zA-Z0-9.-]*/PKG-INFO$")
egg_regex = re.compile(r"^[a-zA-Z0-9.-]*(/EGG-INFO)?/PKG-INFO$")
Loading

0 comments on commit 1867a75

Please sign in to comment.