diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py index a07270d46..9e1b123f3 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py @@ -5,13 +5,12 @@ import logging -import requests - from macaron.errors import HeuristicAnalyzerValueError from macaron.json_tools import JsonType, json_extract from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset +from macaron.util import send_head_http_raw logger: logging.Logger = logging.getLogger(__name__) @@ -87,7 +86,8 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes inspector_prefix = f"{self.INSPECTOR_PREFIX}{name.lower()}/{version}/" inspector_link = release_metadata["url"].replace(self.PYPI_PREFIX, inspector_prefix) - if not self._valid_url(inspector_link, pypi_package_json.pypi_registry.request_timeout): + # use a head request because we don't care about the response contents + if send_head_http_raw(inspector_link) is None: inspector_link = "" release_files.append(release_metadata["url"]) @@ -102,10 +102,3 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes return HeuristicResult.PASS, {version: release_files} return HeuristicResult.FAIL, {version: release_files} - - def _valid_url(self, url: str, timeout: int) -> bool: - try: - response = requests.head(url, allow_redirects=True, timeout=timeout) - return response.status_code == 200 - except requests.exceptions.RequestException: - return False diff --git a/src/macaron/util.py b/src/macaron/util.py index 8fdc41f3e..047d14125 100644 --- a/src/macaron/util.py +++ b/src/macaron/util.py @@ -59,6 
def send_head_http_raw(
    url: str, headers: dict | None = None, timeout: int | None = None, allow_redirects: bool = True
) -> Response | None:
    """Send a HEAD HTTP request to the given url and return the raw response.

    Error status codes returned by the server are logged at debug level. A 403 response is handed
    to ``check_rate_limit`` and the request is then retried, up to the ``error_retries`` value read
    from the ``requests`` section of the defaults configuration (fallback: 5). Any other error
    status code fails immediately.

    Parameters
    ----------
    url : str
        The url of the request.
    headers : dict | None
        The dict that describes the headers of the request.
    timeout: int | None
        The request timeout (optional). A falsy value (``None`` or ``0``) selects the ``timeout``
        value from the ``requests`` section of the defaults configuration (fallback: 10).
    allow_redirects: bool
        Whether to allow redirects. Default: True.

    Returns
    -------
    Response | None
        If a Response object is returned and ``allow_redirects`` is ``True`` (the default) it will have a status code of
        200 (OK). If ``allow_redirects`` is ``False`` the response can instead have a status code of 302. Otherwise, the
        request has failed and ``None`` will be returned.
    """
    logger.debug("HEAD - %s", url)
    if not timeout:
        timeout = defaults.getint("requests", "timeout", fallback=10)
    error_retries = defaults.getint("requests", "error_retries", fallback=5)
    retry_counter = error_retries

    while True:
        # Every attempt, including retries, goes through the same exception handling so that a
        # network failure on a retry yields None instead of propagating to the caller.
        try:
            response = requests.head(
                url=url,
                headers=headers,
                timeout=timeout,
                allow_redirects=allow_redirects,
            )
        except requests.exceptions.RequestException as error:
            logger.debug(error)
            return None

        if response.status_code == 200:
            return response
        if not allow_redirects and response.status_code == 302:
            # Found, most likely because a redirect is about to happen.
            return response

        logger.debug(
            "Receiving error code %s from server.",
            response.status_code,
        )
        if retry_counter <= 0:
            logger.debug("Maximum retries reached: %s", error_retries)
            return None
        if response.status_code != 403:
            # Only a 403 is retried; every other error status is treated as a final failure.
            return None
        check_rate_limit(response)
        retry_counter -= 1
if it is imported like this: import os; os.listdir() then you patch os.listdir +# if it is imported like this: from os import listdir; listdir() then you patch .listdir +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when only .tar.gz is present, so failed""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" @@ -66,11 +69,7 @@ def test_analyze_tar_present(mock_head: MagicMock, pypi_package_json: MagicMock) pypi_package_json.get_latest_version.return_value = version pypi_package_json.component.version = None pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} - pypi_package_json.pypi_registry.request_timeout = 100 - - inspector_link_mock = MagicMock() - inspector_link_mock.status_code = 200 - mock_head.return_value = inspector_link_mock + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.FAIL, {version: [url, inspector_link_expected]}) @@ -79,8 +78,8 @@ def test_analyze_tar_present(mock_head: MagicMock, pypi_package_json: MagicMock) assert actual_result == expected_result -@patch("requests.head") -def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock) -> None: +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when only .whl is present, so pass""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" @@ -123,11 +122,7 @@ def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock) pypi_package_json.get_releases.return_value = release pypi_package_json.component.version = version pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} - 
pypi_package_json.pypi_registry.request_timeout = 100 - - inspector_link_mock = MagicMock() - inspector_link_mock.status_code = 200 - mock_head.return_value = inspector_link_mock + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes expected_result: tuple[HeuristicResult, dict] = (HeuristicResult.PASS, {version: [url, inspector_link_expected]}) @@ -136,8 +131,8 @@ def test_analyze_whl_present(mock_head: MagicMock, pypi_package_json: MagicMock) assert actual_result == expected_result -@patch("requests.head") -def test_analyze_both_present(mock_head: MagicMock, pypi_package_json: MagicMock) -> None: +@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw") +def test_analyze_both_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None: """Test for when both .tar.gz and .whl are present, so passed""" analyzer = WheelAbsenceAnalyzer() version = "0.1.0" @@ -209,11 +204,7 @@ def test_analyze_both_present(mock_head: MagicMock, pypi_package_json: MagicMock pypi_package_json.get_releases.return_value = release pypi_package_json.component.version = version pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}} - pypi_package_json.pypi_registry.request_timeout = 100 - - inspector_link_mock = MagicMock() - inspector_link_mock.status_code = 200 - mock_head.return_value = inspector_link_mock + mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes expected_result: tuple[HeuristicResult, dict] = ( HeuristicResult.PASS,