Skip to content

Commit

Permalink
Merge pull request #19 from opeco17/upgrade-safety-to-v3
Browse files Browse the repository at this point in the history
Upgrade Safety version to v3
  • Loading branch information
opeco17 authored Mar 4, 2024
2 parents 2532782 + e7de400 commit 66fd80d
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 783 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Scanning 19 packages...

## Installation

The easiest way to install the `export` plugin is via the `plugin add` command of Poetry.
The easiest way to install the `audit` plugin is via the `plugin add` command of Poetry.

```bash
poetry plugin add poetry-audit-plugin
Expand All @@ -38,20 +38,36 @@ pip install poetry-audit-plugin
* `--json`: Export the result in JSON format.

* `--ignore-code`: Ignore some vulnerabilities IDs. Receive a list of IDs. For example:

```bash
poetry audit --ignore-code=CVE-2022-42969,CVE-2020-10684
```

* `--ignore-package`: Ignore some packages. Receive a list of packages. For example:

```bash
poetry audit --json --ignore-package=py,ansible-tower-cli
```

* `--proxy-protocol`, `--proxy-host`, `--proxy-port`: Proxy to access Safety DB. For example:

```bash
poetry audit --proxy-protocol=http --proxy-host=localhost --proxy-port=3128
```

* `--cache-sec`: How long Safety DB can be cached locally. For example:

```bash
poetry audit --cache-sec=60
```

## Exit codes

`poetry audit` will exit with a code indicating its status.

* `0`: Vulnerabilities were not found.
* `1`: One or more vulnerabilities were found.
* Others: Something wrong happened.

## Develop poetry-audit-plugin

Expand All @@ -69,6 +85,7 @@ Once you've done it, you can start developing poetry-audit-plugin. You can use t

```sh
cd tests/assets/no_vulnerabilities
poetry shell
poetry audit
```

Expand Down
164 changes: 145 additions & 19 deletions poetry_audit_plugin/command.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import copy
import json
import sys
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple

from cleo.helpers import option
from poetry.console.commands.command import Command

from poetry_audit_plugin import __version__
from poetry_audit_plugin.constants import (
EXIT_CODE_OK,
EXIT_CODE_OPTION_INVALID,
EXIT_CODE_VULNERABILITY_FOUND,
)
from poetry_audit_plugin.errors import SafetyDBAccessError, SafetyDBSessionBuildError
from poetry_audit_plugin.safety import (
Package,
Vulnerability,
VulnerablePackage,
build_safety_db_session,
check_vulnerable_packages,
suppress_vulnerable_packages,
)


Expand All @@ -19,17 +27,59 @@ class AuditCommand(Command):
description = "Check vulnerabilities in dependencies"

options = [
option("json", None, "Generate a JSON payload with the information of vulnerable packages.", flag=True),
option("ignore-code", None, "Ignore specified vulnerability codes", flag=False),
option("ignore-package", None, "Ignore specified packages", flag=False),
option(
long_name="json",
description="Generate a JSON payload with the information of vulnerable packages.",
flag=True,
),
option(
long_name="ignore-code",
description="Ignore specified vulnerability codes.",
flag=False,
),
option(
long_name="ignore-package",
description="Ignore specified packages.",
flag=False,
),
option(
long_name="proxy-protocol",
description="Protocol of proxy to access Safety DB.",
flag=False,
value_required=False,
default="http",
),
option(
long_name="proxy-host",
description="Host of proxy to access Safety DB.",
flag=False,
value_required=False,
),
option(
long_name="proxy-port",
description="Port of proxy to access Safety DB.",
flag=False,
value_required=False,
default="80",
),
option(
long_name="cache-sec",
description="How long Safety DB can be cached locally.",
flag=False,
value_required=False,
default="0",
),
]

def handle(self) -> None:
self.is_quiet = self.option("json")

self.line("<b># poetry audit report</b>")
self.line("")

self.validate_options()
self.validate_lock_file()

self.line("<b># poetry audit report</b>")
self.line("<info>Loading...</info>")

locked_repo = self.poetry.locker.locked_repository()
Expand All @@ -40,22 +90,45 @@ def handle(self) -> None:
self.line(f"<info>Scanning {len(packages)} packages...</info>")
self.line("")

all_vulnerable_packages = check_vulnerable_packages(packages)

ignored_packages: List[str] = self.option("ignore-package").split(",") if self.option("ignore-package") else []
ignored_codes: List[str] = self.option("ignore-code").split(",") if self.option("ignore-code") else []
is_ignore = bool(len(ignored_packages) or len(ignored_codes))
vulnerable_packages, amount_of_ignored_vulnerabilities = suppress_vulnerable_packages(
all_vulnerable_packages, ignored_packages, ignored_codes
)
try:
# TODO: Pass auth key to build_client_session function for advanced safety usage.
session = build_safety_db_session(
proxy_protocol=self.option("proxy-protocol"),
proxy_host=self.option("proxy-host"),
proxy_port=int(self.option("proxy-port")) if self.option("proxy-port") else None,
)
except SafetyDBSessionBuildError as e:
self.chatty_line_error(f"<error>Error occured while building Safety DB session.</error>")
self.chatty_line_error("")
self.chatty_line_error(str(e))
sys.exit(e.get_exit_code())
try:
all_vulnerable_packages = check_vulnerable_packages(
session, packages, int(self.option("cache-sec")) if self.option("cache-sec") else 0
)
except SafetyDBAccessError as e:
self.chatty_line_error(f"<error>Error occured while accessing Safety DB.</error>")
self.chatty_line_error("")
self.chatty_line_error(str(e))
sys.exit(e.get_exit_code())

vulnerable_packages, amount_of_ignored_vulnerabilities = self.filter_vulnerable_packages(
all_vulnerable_packages,
ignored_packages,
ignored_codes,
)
max_line_lengths = self.calculate_line_length(vulnerable_packages)
amount_of_vulnerable_packages = len(vulnerable_packages)
if self.option("json"):
json_report = self.get_json_report(vulnerable_packages)
self.chatty_line(json_report)
if amount_of_vulnerable_packages > 0:
sys.exit(1)
sys.exit(EXIT_CODE_VULNERABILITY_FOUND)
else:
sys.exit(EXIT_CODE_OK)
else:
amount_of_vulnerabilities = 0
for vulnerable_package in vulnerable_packages:
Expand All @@ -82,10 +155,10 @@ def handle(self) -> None:
self.line(
f"<error>{amount_of_vulnerabilities}</error> <b>vulnerabilities found in {amount_of_vulnerable_packages} packages</b>"
)
sys.exit(1)
sys.exit(EXIT_CODE_VULNERABILITY_FOUND)
else:
self.line("<b>Vulnerabilities not found</b> ✨✨")
sys.exit(0)
self.line("<b>No vulnerabilities found</b> ✨✨")
sys.exit(EXIT_CODE_OK)

def line(self, *args: Any, **kwargs: Any) -> None:
if not self.is_quiet:
Expand All @@ -99,13 +172,34 @@ def chatty_line(self, *args: Any, **kwargs: Any) -> None:
super().line(*args, **kwargs)

def chatty_line_error(self, *args: Any, **kwargs: Any) -> None:
super().line(*args, **kwargs)
super().line_error(*args, **kwargs)

def validate_options(self) -> None:
errors: List[str] = []
if self.option("proxy-host") and (not self.option("proxy-protocol") or not self.option("proxy-port")):
errors.append("proxy-protocol and proxy-port should not be empty when proxy-host is specified.")

if self.option("proxy-protocol") and (self.option("proxy-protocol") not in ["http", "https"]):
errors.append("proxy-protocol should be http or https.")

if self.option("proxy-port") and not self.option("proxy-port").isnumeric():
errors.append("proxy-port should be number.")

if self.option("cache-sec") and not self.option("cache-sec").isnumeric():
errors.append("cache-sec be number")

if errors:
self.chatty_line_error("<error>Command line option(s) are invalid</error>")
for error in errors:
self.chatty_line_error(error)
sys.exit(EXIT_CODE_OPTION_INVALID)

def validate_lock_file(self) -> None:
# Ref: https://github.com/python-poetry/poetry/blob/1.2.0b1/src/poetry/console/commands/export.py#L40
locker = self.poetry.locker
if not locker.is_locked():
self.line_error("<comment>The lock file does not exist. Locking.</comment>")
option = "quiet" if self.is_quiet() else None
option = "quiet" if self.is_quiet else None
self.call("lock", option)
self.line("")

Expand All @@ -131,8 +225,7 @@ def calculate_line_length(self, vulnerable_packages: List[VulnerablePackage]) ->
else:
line_length = len(getattr(vulnerability, key))

max_line_length = max_line_lengths[key]
if line_length > max_line_length:
if line_length > max_line_lengths[key]:
max_line_lengths[key] = line_length

return max_line_lengths
Expand All @@ -152,6 +245,39 @@ def get_json_report(self, vulnerable_packages: List[VulnerablePackage]) -> str:
}
return json.dumps(json_report_dict, indent=2)

def filter_vulnerable_packages(
self, vulnerable_packages: List[VulnerablePackage], ignored_packages: List[str], ignored_codes: List[str]
) -> Tuple[List[VulnerablePackage], int]:
filtered_vulnerable_packages: List[VulnerablePackage] = []
amount_of_ignored_vulnerabilities = 0

is_ignore_packages = len(ignored_packages) > 0
is_ignore_codes = len(ignored_codes) > 0

for vulnerable_package in vulnerable_packages:
filtered_vulnerable_package = copy.copy(vulnerable_package)
if is_ignore_packages:
if vulnerable_package.name in ignored_packages:
amount_of_ignored_vulnerabilities += len(vulnerable_package.vulnerabilities)
continue

if is_ignore_codes:
filtered_vulnerabilities: List[Vulnerability] = []
for vulnerability in vulnerable_package.vulnerabilities:
if vulnerability.cve not in ignored_codes:
filtered_vulnerabilities.append(vulnerability)
else:
amount_of_ignored_vulnerabilities += 1

if len(filtered_vulnerabilities):
filtered_vulnerable_package.vulnerabilities = filtered_vulnerabilities
else:
continue

filtered_vulnerable_packages.append(filtered_vulnerable_package)

return filtered_vulnerable_packages, amount_of_ignored_vulnerabilities


def factory():
return AuditCommand()
5 changes: 5 additions & 0 deletions poetry_audit_plugin/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
EXIT_CODE_OK = 0
EXIT_CODE_VULNERABILITY_FOUND = 1
EXIT_CODE_OPTION_INVALID = 64
EXIT_CODE_SAFETY_DB_SESSION_BUILD_ERROR = 65
EXIT_CODE_SAFETY_DB_ACCESS_ERROR = 66
22 changes: 22 additions & 0 deletions poetry_audit_plugin/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from poetry_audit_plugin.constants import (
EXIT_CODE_SAFETY_DB_ACCESS_ERROR,
EXIT_CODE_SAFETY_DB_SESSION_BUILD_ERROR,
)


class SafetyDBSessionBuildError(Exception):
def __init__(self, message) -> None:
self.message = message
super().__init__(self.message)

def get_exit_code(self) -> int:
return EXIT_CODE_SAFETY_DB_SESSION_BUILD_ERROR


class SafetyDBAccessError(Exception):
def __init__(self, message) -> None:
self.message = message
super().__init__(self.message)

def get_exit_code(self) -> int:
return EXIT_CODE_SAFETY_DB_ACCESS_ERROR
Loading

0 comments on commit 66fd80d

Please sign in to comment.