Skip to content

Commit

Permalink
chore: use Git object to get commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Selwyn-Smith <benselwynsmith@googlemail.com>
  • Loading branch information
benmss committed Feb 3, 2025
1 parent 1c562c8 commit 86ae015
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
39 changes: 21 additions & 18 deletions src/macaron/repo_finder/repo_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def find_repo(purl: PackageURL, check_latest_version: bool = True) -> tuple[str,
return found_repo, outcome

# Try to find the latest version repo.
logger.error("Could not find repo for PURL: %s", purl)
logger.debug("Could not find repo for PURL: %s", purl)
latest_version_purl = get_latest_purl_if_different(purl)
if not latest_version_purl:
logger.debug("Could not find newer PURL than provided: %s", purl)
Expand Down Expand Up @@ -217,13 +217,17 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac
# Disable other loggers for cleaner output.
logging.getLogger("macaron.slsa_analyzer.analyzer").disabled = True

digest = ""
if defaults.getboolean("repofinder", "find_source_should_clone"):
# Clone the repo to retrieve the tags.
logger.debug("Preparing repo: %s", found_repo)
repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR)
logging.getLogger("macaron.slsa_analyzer.git_url").disabled = True
# The prepare_repo function will also check the latest version of the artifact if required.
_, _, digest = prepare_repo(repo_dir, found_repo, purl=purl, latest_version_fallback=not checked_latest_purl)
git_obj, _ = prepare_repo(repo_dir, found_repo, purl=purl, latest_version_fallback=not checked_latest_purl)

if git_obj:
digest = git_obj.get_head().hash

if not digest:
return False
Expand Down Expand Up @@ -380,7 +384,7 @@ def prepare_repo(
digest: str = "",
purl: PackageURL | None = None,
latest_version_fallback: bool = True,
) -> tuple[Git | None, CommitFinderInfo, str]:
) -> tuple[Git | None, CommitFinderInfo]:
"""Prepare the target repository for analysis.
If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``.
Expand Down Expand Up @@ -408,9 +412,8 @@ def prepare_repo(
Returns
-------
tuple[Git | None, CommitFinderInfo, str]
The pydriller.Git object of the repository or None if error; the outcome of the Commit Finder; and the final
digest.
tuple[Git | None, CommitFinderInfo]
The pydriller.Git object of the repository or None if error, and the outcome of the Commit Finder.
"""
# TODO: separate the logic for handling remote and local repos instead of putting them into this method.
logger.info(
Expand All @@ -428,7 +431,7 @@ def prepare_repo(
resolved_remote_path = get_remote_vcs_url(repo_path)
if not resolved_remote_path:
logger.error("The provided path to repo %s is not a valid remote path.", repo_path)
return None, commit_finder_outcome, digest
return None, commit_finder_outcome

git_service = get_git_service(resolved_remote_path)
repo_unique_path = get_repo_dir_name(resolved_remote_path)
Expand All @@ -438,7 +441,7 @@ def prepare_repo(
git_service.clone_repo(resolved_local_path, resolved_remote_path)
except CloneError as error:
logger.error("Cannot clone %s: %s", resolved_remote_path, str(error))
return None, commit_finder_outcome, digest
return None, commit_finder_outcome
else:
logger.info("Checking if the path to repo %s is a local path.", repo_path)
resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path)
Expand All @@ -448,29 +451,29 @@ def prepare_repo(
git_obj = Git(resolved_local_path)
except InvalidGitRepositoryError:
logger.error("No git repo exists at %s.", resolved_local_path)
return None, commit_finder_outcome, digest
return None, commit_finder_outcome
else:
logger.error("Error happened while preparing the repo.")
return None, commit_finder_outcome, digest
return None, commit_finder_outcome

if is_empty_repo(git_obj):
logger.error("The target repository does not have any commit.")
return None, commit_finder_outcome, digest
return None, commit_finder_outcome

# Find the digest if a version has been specified.
if not digest and purl and purl.version:
found_digest, commit_finder_outcome = find_commit(git_obj, purl)
if not found_digest:
logger.error("Could not map the input purl string to a specific commit in the corresponding repository.")
if not latest_version_fallback:
return None, commit_finder_outcome, digest
return None, commit_finder_outcome
# If the commit could not be found, check if the latest version of the artifact has a different repository.
latest_purl = get_latest_purl_if_different(purl)
if not latest_purl:
return None, commit_finder_outcome, digest
return None, commit_finder_outcome
latest_repo = get_latest_repo_if_different(latest_purl, repo_path)
if not latest_repo:
return None, commit_finder_outcome, digest
return None, commit_finder_outcome
return prepare_repo(latest_repo, latest_repo, target_dir, latest_version_fallback=False)

digest = found_digest
Expand All @@ -492,15 +495,15 @@ def prepare_repo(
# ``git_url.check_out_repo_target``.
if not check_out_repo_target(git_obj, branch_name, digest, not is_remote):
logger.error("Cannot checkout the specific branch or commit of the target repo.")
return None, commit_finder_outcome, digest
return None, commit_finder_outcome

return git_obj, commit_finder_outcome, digest
return git_obj, commit_finder_outcome

try:
git_service.check_out_repo(git_obj, branch_name, digest, not is_remote)
except RepoCheckOutError as error:
logger.error("Failed to check out repository at %s", resolved_local_path)
logger.error(error)
return None, commit_finder_outcome, digest
return None, commit_finder_outcome

return git_obj, commit_finder_outcome, digest
return git_obj, commit_finder_outcome
4 changes: 3 additions & 1 deletion src/macaron/slsa_analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,15 @@ def run_single(
commit_finder_outcome = CommitFinderInfo.NOT_USED
final_digest = analysis_target.digest
if analysis_target.repo_path:
git_obj, commit_finder_outcome, final_digest = prepare_repo(
git_obj, commit_finder_outcome = prepare_repo(
os.path.join(self.output_path, GIT_REPOS_DIR),
analysis_target.repo_path,
analysis_target.branch,
analysis_target.digest,
analysis_target.parsed_purl,
)
if git_obj:
final_digest = git_obj.get_head().hash

repo_finder_metadata = RepoFinderMetadata(
repo_finder_outcome=analysis_target.repo_finder_outcome,
Expand Down

0 comments on commit 86ae015

Please sign in to comment.