Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check PyPI registry when deps.dev fails to find a source repository #982

Open
wants to merge 9 commits into
base: staging
Choose a base branch
from
4 changes: 2 additions & 2 deletions src/macaron/json_tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module provides utility functions for JSON data."""
Expand Down Expand Up @@ -53,5 +53,5 @@ def json_extract(entry: dict | list, keys: Sequence[str | int], type_: type[T])
if isinstance(entry, type_):
return entry

logger.debug("Found value of incorrect type: %s instead of %s.", type(entry), type(type_))
logger.debug("Found value of incorrect type: %s instead of %s.", type(entry), type_)
return None
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Analyzer checks whether the maintainers' join date closer to latest package's release date."""
Expand Down Expand Up @@ -95,7 +95,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
The result and related information collected during the analysis.
"""
maintainers_join_date: list[datetime] | None = self._get_maintainers_join_date(
pypi_package_json.pypi_registry, pypi_package_json.component.name
pypi_package_json.pypi_registry, pypi_package_json.component_name
)
latest_release_date: datetime | None = self._get_latest_release_date(pypi_package_json)
detail_info: dict[str, JsonType] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

version = pypi_package_json.component.version
version = pypi_package_json.component_version
if version is None: # check latest release version
version = pypi_package_json.get_latest_version()

Expand Down
47 changes: 45 additions & 2 deletions src/macaron/repo_finder/repo_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from macaron.config.defaults import defaults
from macaron.config.global_config import global_config
from macaron.errors import CloneError, RepoCheckOutError
from macaron.repo_finder import to_domain_from_known_purl_types
from macaron.repo_finder import repo_finder_pypi, to_domain_from_known_purl_types
from macaron.repo_finder.commit_finder import find_commit, match_tags
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
Expand All @@ -66,11 +66,14 @@
list_remote_references,
resolve_local_path,
)
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo

logger: logging.Logger = logging.getLogger(__name__)


def find_repo(purl: PackageURL, check_latest_version: bool = True) -> tuple[str, RepoFinderInfo]:
def find_repo(
purl: PackageURL, check_latest_version: bool = True, all_package_registries: list[PackageRegistryInfo] | None = None
) -> tuple[str, RepoFinderInfo]:
"""Retrieve the repository URL that matches the given PURL.

Parameters
Expand All @@ -79,6 +82,8 @@ def find_repo(purl: PackageURL, check_latest_version: bool = True) -> tuple[str,
The parsed PURL to convert to the repository path.
check_latest_version: bool
A flag that determines whether the latest version of the PURL is also checked.
all_package_registries: list[PackageRegistryInfo] | None
The list of package registries, if any.

Returns
-------
Expand All @@ -103,6 +108,9 @@ def find_repo(purl: PackageURL, check_latest_version: bool = True) -> tuple[str,
logger.debug("Analyzing %s with Repo Finder: %s", purl, type(repo_finder))
found_repo, outcome = repo_finder.find_repo(purl)

if not found_repo:
found_repo, outcome = find_repo_alternative(purl, outcome, all_package_registries)

if found_repo or not check_latest_version:
return found_repo, outcome

Expand All @@ -114,13 +122,48 @@ def find_repo(purl: PackageURL, check_latest_version: bool = True) -> tuple[str,
return "", RepoFinderInfo.NO_NEWER_VERSION

found_repo, outcome = DepsDevRepoFinder().find_repo(latest_version_purl)
if found_repo:
return found_repo, outcome

if not found_repo:
found_repo, outcome = find_repo_alternative(latest_version_purl, outcome)

if not found_repo:
logger.debug("Could not find repo from latest version of PURL: %s", latest_version_purl)
return "", RepoFinderInfo.LATEST_VERSION_INVALID

return found_repo, outcome


def find_repo_alternative(
    purl: PackageURL, outcome: RepoFinderInfo, all_package_registries: list[PackageRegistryInfo] | None = None
) -> tuple[str, RepoFinderInfo]:
    """Fall back to PURL-type-specific lookups once the standard repo finders have failed.

    Only PURLs of type ``pypi`` currently have a type-specific lookup; for every other
    type nothing is attempted and the passed-in outcome is handed back unchanged.

    Parameters
    ----------
    purl : PackageURL
        The parsed PURL whose repository is being searched for.
    outcome: RepoFinderInfo
        The outcome of the previous attempt, reported as-is when no type-specific lookup runs.
    all_package_registries: list[PackageRegistryInfo] | None
        The list of package registries, if any. Forwarded to the type-specific finder.

    Returns
    -------
    tuple[str, RepoFinderInfo] :
        The repository URL for the passed package, if found, and the outcome to report.
    """
    if purl.type == "pypi":
        repo_url, result = repo_finder_pypi.find_repo(purl, all_package_registries)
    else:
        # No type-specific method exists for this PURL type: keep the caller's outcome.
        repo_url, result = "", outcome

    if repo_url:
        return repo_url, result

    logger.debug("Could not find repository using type specific (%s) methods for PURL: %s", purl.type, purl)
    return repo_url, result


def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None:
"""Return the repository path from the PURL string.

Expand Down
17 changes: 16 additions & 1 deletion src/macaron/repo_finder/repo_finder_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ class RepoFinderInfo(Enum):
#: Reported if deps.dev returns data that does not contain the desired SCM URL. E.g. The repository URL.
DDEV_NO_URLS = "deps.dev no URLs"

#: Reported if there was an error with the request sent to the PyPI registry.
PYPI_HTTP_ERROR = "PyPI HTTP error"

#: Reported if there was an error parsing the JSON returned by the PyPI registry.
PYPI_JSON_ERROR = "PyPI JSON error"

#: Reported if there were no matching URLs in the JSON returned by the PyPI registry.
PYPI_NO_URLS = "PyPI no matching URLs"

#: Reported if the PyPI registry is disabled or not present in the list of package registries.
PYPI_NO_REGISTRY = "PyPI registry disabled or absent"

#: Reported if the provided PURL did not produce a result, but a more recent version could not be found.
NO_NEWER_VERSION = "No newer version than provided which failed"

Expand All @@ -70,7 +82,10 @@ class RepoFinderInfo(Enum):
FOUND_FROM_PARENT = "Found from parent"

#: Reported when a repository is found from a more recent version than was provided by the user.
FOUND_FROM_LATEST = "Found form latest"
FOUND_FROM_LATEST = "Found from latest"

#: Reported when a repository could only be found by checking the PyPI registry JSON.
FOUND_FROM_PYPI = "Found from PyPI"

#: Default value. Reported if the Repo Finder was not called. E.g. Because the repository URL was already present.
NOT_USED = "Not used"
Expand Down
85 changes: 85 additions & 0 deletions src/macaron/repo_finder/repo_finder_pypi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the logic for finding repositories of PyPI projects."""
import logging
import urllib.parse

from packageurl import PackageURL

from macaron.repo_finder.repo_finder_enums import RepoFinderInfo
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo

logger: logging.Logger = logging.getLogger(__name__)


def _to_github_repo_url(url: str) -> str | None:
    """Reduce a URL that points somewhere inside a GitHub repository to the repository's base URL.

    Parameters
    ----------
    url : str
        A candidate URL, e.g. ``https://github.com/owner/repo/issues?q=bug#top``.

    Returns
    -------
    str | None
        ``{scheme}://{netloc}/{owner}/{repo}`` without params, query or fragment, or None if the
        URL cannot be parsed, is not hosted on github.com, or lacks an owner and repository segment.
    """
    try:
        parsed_url = urllib.parse.urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. unbalanced IPv6 brackets).
        return None

    # ParseResult.hostname is already normalised to lower case by urllib.
    if parsed_url.hostname != "github.com":
        return None

    # Ignore empty segments so that "/owner/" or "//owner/repo" are not mistaken for valid repository paths.
    segments = [segment for segment in parsed_url.path.split("/") if segment]
    if len(segments) < 2:
        return None

    owner, repo = segments[0], segments[1]
    # Params, query and fragment belong to the specific page that was linked, not to the repository itself.
    return urllib.parse.urlunparse((parsed_url.scheme, parsed_url.netloc, f"/{owner}/{repo}", "", "", ""))


def find_repo(
    purl: PackageURL, all_package_registries: list[PackageRegistryInfo] | None = None
) -> tuple[str, RepoFinderInfo]:
    """Retrieve the repository URL that matches the given PyPI PURL.

    The project links of the PyPI package JSON are scanned in order, and the first link hosted on
    github.com that contains both an owner and a repository name is returned as a base repository URL.

    Parameters
    ----------
    purl : PackageURL
        The parsed PURL to convert to the repository path.
    all_package_registries: list[PackageRegistryInfo] | None
        The list of package registries, if any. When it contains an entry for the PyPI registry with
        the ``pypi`` build tool, the downloaded package JSON asset is appended to that entry's metadata.

    Returns
    -------
    tuple[str, RepoFinderInfo] :
        The repository URL for the passed package, if found, and the outcome to report.
    """
    pypi_registry = next((registry for registry in PACKAGE_REGISTRIES if isinstance(registry, PyPIRegistry)), None)
    if not pypi_registry:
        return "", RepoFinderInfo.PYPI_NO_REGISTRY

    pypi_registry.load_defaults()
    pypi_asset = PyPIPackageJsonAsset(purl.name, purl.version, pypi_registry, {})
    if not pypi_asset.download(dest=""):
        return "", RepoFinderInfo.PYPI_HTTP_ERROR

    if all_package_registries:
        # Find the package registry info object that contains the PyPI registry and has the pypi build tool.
        registry_info = next(
            (
                info
                for info in all_package_registries
                if info.package_registry == pypi_registry and info.build_tool_name == "pypi"
            ),
            None,
        )
        if registry_info:
            # Save the asset for later use.
            registry_info.metadata.append(pypi_asset)

    url_dict = pypi_asset.get_project_links()
    if not url_dict:
        return "", RepoFinderInfo.PYPI_JSON_ERROR

    for url in url_dict.values():
        fixed_url = _to_github_repo_url(url)
        if not fixed_url:
            continue
        logger.debug("Found repository URL from PyPI: %s", fixed_url)
        return fixed_url, RepoFinderInfo.FOUND_FROM_PYPI

    return "", RepoFinderInfo.PYPI_NO_URLS
58 changes: 45 additions & 13 deletions src/macaron/slsa_analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ def run_single(
status=SCMStatus.ANALYSIS_FAILED,
)

# Pre-populate all package registries so assets can be stored for later.
all_package_registries = self._populate_package_registry_info()

provenance_is_verified = False
if not provenance_payload and parsed_purl:
# Try to find the provenance file for the parsed PURL.
Expand Down Expand Up @@ -367,7 +370,12 @@ def run_single(
available_domains = [git_service.hostname for git_service in GIT_SERVICES if git_service.hostname]
try:
analysis_target = Analyzer.to_analysis_target(
config, available_domains, parsed_purl, provenance_repo_url, provenance_commit_digest
config,
available_domains,
parsed_purl,
provenance_repo_url,
provenance_commit_digest,
all_package_registries,
)
except InvalidAnalysisTargetError as error:
return Record(
Expand Down Expand Up @@ -459,7 +467,7 @@ def run_single(
self._determine_build_tools(analyze_ctx, git_service)
if parsed_purl is not None:
self._verify_repository_link(parsed_purl, analyze_ctx)
self._determine_package_registries(analyze_ctx)
self._determine_package_registries(analyze_ctx, all_package_registries)

if not provenance_payload:
# Look for provenance using the CI.
Expand Down Expand Up @@ -767,6 +775,7 @@ def to_analysis_target(
parsed_purl: PackageURL | None,
provenance_repo_url: str | None = None,
provenance_commit_digest: str | None = None,
all_package_registries: list[PackageRegistryInfo] | None = None,
) -> AnalysisTarget:
"""Resolve the details of a software component from user input.

Expand All @@ -783,6 +792,8 @@ def to_analysis_target(
The repository URL extracted from provenance, or None if not found or no provenance.
provenance_commit_digest: str | None
The commit extracted from provenance, or None if not found or no provenance.
all_package_registries: list[PackageRegistryInfo] | None
The list of all package registries.

Returns
-------
Expand Down Expand Up @@ -825,7 +836,9 @@ def to_analysis_target(
converted_repo_path = repo_finder.to_repo_path(parsed_purl, available_domains)
if converted_repo_path is None:
# Try to find repo from PURL
repo, repo_finder_outcome = repo_finder.find_repo(parsed_purl)
repo, repo_finder_outcome = repo_finder.find_repo(
parsed_purl, all_package_registries=all_package_registries
)

return Analyzer.AnalysisTarget(
parsed_purl=parsed_purl,
Expand Down Expand Up @@ -979,20 +992,39 @@ def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseG
)
)

def _determine_package_registries(self, analyze_ctx: AnalyzeContext) -> None:
def _populate_package_registry_info(self) -> list[PackageRegistryInfo]:
    """Build a registry info entry for every matching package registry and build tool pair.

    Returns
    -------
    list[PackageRegistryInfo]
        One entry per build tool in ``BUILD_TOOLS`` whose name appears in the ``build_tool_names``
        of a registry in ``PACKAGE_REGISTRIES``, grouped by registry in iteration order.
    """
    return [
        PackageRegistryInfo(
            build_tool_name=tool.name,
            build_tool_purl_type=tool.purl_type,
            package_registry=registry,
        )
        for registry in PACKAGE_REGISTRIES
        for tool in BUILD_TOOLS
        if tool.name in registry.build_tool_names
    ]

def _determine_package_registries(
self, analyze_ctx: AnalyzeContext, all_package_registries: list[PackageRegistryInfo]
) -> None:
"""Determine the package registries used by the software component based on its build tools."""
build_tools = (
analyze_ctx.dynamic_data["build_spec"]["tools"] or analyze_ctx.dynamic_data["build_spec"]["purl_tools"]
)
for package_registry in PACKAGE_REGISTRIES:
for build_tool in build_tools:
if package_registry.is_detected(build_tool):
analyze_ctx.dynamic_data["package_registries"].append(
PackageRegistryInfo(
build_tool=build_tool,
package_registry=package_registry,
)
)
build_tool_names = {build_tool.name for build_tool in build_tools}
relevant_package_registries = []
for package_registry in all_package_registries:
if package_registry.build_tool_name not in build_tool_names:
continue
relevant_package_registries.append(package_registry)

# Assign the updated list of registries.
analyze_ctx.dynamic_data["package_registries"] = relevant_package_registries

def _verify_repository_link(self, parsed_purl: PackageURL, analyze_ctx: AnalyzeContext) -> None:
"""Verify whether the claimed repository links back to the artifact."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
from macaron.malware_analyzer.pypi_heuristics.pypi_sourcecode_analyzer import PyPISourcecodeAnalyzer
from macaron.malware_analyzer.pypi_heuristics.sourcecode.suspicious_setup import SuspiciousSetupAnalyzer
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.build_tool.pip import Pip
from macaron.slsa_analyzer.build_tool.poetry import Poetry
from macaron.slsa_analyzer.checks.base_check import BaseCheck
from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType
from macaron.slsa_analyzer.package_registry.deps_dev import APIAccessError, DepsDevService
Expand Down Expand Up @@ -400,14 +398,24 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
match package_registry_info_entry:
# Currently, only PyPI packages are supported.
case PackageRegistryInfo(
build_tool=Pip() | Poetry(),
build_tool_name="pip" | "poetry",
build_tool_purl_type="pypi",
package_registry=PyPIRegistry() as pypi_registry,
) as pypi_registry_info:

# Create an AssetLocator object for the PyPI package JSON object.
pypi_package_json = PyPIPackageJsonAsset(
component=ctx.component, pypi_registry=pypi_registry, package_json={}
# Retrieve the pre-existing AssetLocator object for the PyPI package JSON object, if it exists.
pypi_package_json = next(
(asset for asset in pypi_registry_info.metadata if isinstance(asset, PyPIPackageJsonAsset)),
None,
)
if not pypi_package_json:
# Create an AssetLocator object for the PyPI package JSON object.
pypi_package_json = PyPIPackageJsonAsset(
component_name=ctx.component.name,
component_version=ctx.component.version,
pypi_registry=pypi_registry,
package_json={},
)

pypi_registry_info.metadata.append(pypi_package_json)

Expand Down
Loading
Loading