Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mypy: fix helper functions #1224

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,24 @@ exclude_also = [
# also don't complain about abstract methods (as they're also not run)
"@(abc\\.)?abstractmethod",
]

[tool.mypy]
check_untyped_defs = true
enable_error_code = "ignore-without-code"
exclude = [
"bin/",
"docker/",
"install/venv",
]
follow_imports = "silent"
ignore_missing_imports = true
install_types = true
no_error_summary = true
non_interactive = true
pretty = true
show_error_codes = true
show_error_context = true
strict_equality = true
warn_redundant_casts = true
warn_unreachable = true
warn_unused_ignores = true
15 changes: 7 additions & 8 deletions src/helperFunctions/data_conversion.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any, AnyStr, TypeVar
from typing import TYPE_CHECKING, Any, AnyStr

if TYPE_CHECKING:
from collections.abc import Iterable

_KT = TypeVar('_KT') # Key type
_VT = TypeVar('_VT') # Value type
from helperFunctions.types import KT, UID, VT, CompId


def make_bytes(data: AnyStr | list[int]) -> bytes:
Expand Down Expand Up @@ -35,9 +34,9 @@ def make_unicode_string(code: Any) -> str:
return code.__str__()


def convert_uid_list_to_compare_id(uid_list: Iterable[str]) -> str:
def convert_uid_list_to_compare_id(uid_list: Iterable[UID]) -> CompId:
"""
Convert a list of UIDs to a compare ID (which is a unique string consisting of UIDs separated by semi-colons, used
Convert a list of UIDs to a compare ID (which is a unique string consisting of UIDs separated by semicolons, used
to identify a FACT `Firmware` or `FileObject` comparison).

:param uid_list: A list of `FileObject` or `Firmware` UIDs.
Expand All @@ -46,7 +45,7 @@ def convert_uid_list_to_compare_id(uid_list: Iterable[str]) -> str:
return ';'.join(sorted(uid_list))


def convert_compare_id_to_list(compare_id: str) -> list[str]:
def convert_compare_id_to_list(compare_id: CompId) -> list[UID]:
"""
Convert a compare ID back to a list of UIDs.

Expand All @@ -56,7 +55,7 @@ def convert_compare_id_to_list(compare_id: str) -> list[str]:
return compare_id.split(';')


def normalize_compare_id(compare_id: str) -> str:
def normalize_compare_id(compare_id: CompId) -> CompId:
"""
Sort the UIDs in a compare ID (so that it is unique) and return it.

Expand All @@ -67,7 +66,7 @@ def normalize_compare_id(compare_id: str) -> str:
return convert_uid_list_to_compare_id(uids)


def get_value_of_first_key(input_dict: dict[_KT, _VT]) -> _VT | None:
def get_value_of_first_key(input_dict: dict[KT, VT]) -> VT | None:
"""
Get the value of the first key in a dictionary. If the dict is empty, return `None`.

Expand Down
8 changes: 5 additions & 3 deletions src/helperFunctions/database.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

from contextlib import contextmanager
from typing import ContextManager, TypeVar
from typing import Iterator, TypeVar

DatabaseInterface = TypeVar('DatabaseInterface')
from storage.db_interface_base import ReadOnlyDbInterface

DatabaseInterface = TypeVar('DatabaseInterface', bound=ReadOnlyDbInterface)


@contextmanager
def get_shared_session(database: DatabaseInterface) -> ContextManager[DatabaseInterface]:
def get_shared_session(database: DatabaseInterface) -> Iterator[DatabaseInterface]:
with database.get_read_only_session():
yield database
4 changes: 3 additions & 1 deletion src/helperFunctions/docker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
from contextlib import suppress
from subprocess import CompletedProcess
Expand Down Expand Up @@ -65,7 +67,7 @@ def run_docker_container(
# We do not know the docker entrypoint so we just insert a generic "entrypoint"
command = kwargs.get('command', None)
if isinstance(command, str):
args = 'entrypoint' + command
args: list[str] | str = 'entrypoint' + command
elif isinstance(command, list):
args = ['entrypoint', *command]
else:
Expand Down
1 change: 1 addition & 0 deletions src/helperFunctions/fileSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def file_is_empty(file_path: Path) -> bool:
return False
except Exception as exception:
logging.error(f'Unexpected Exception: {type(exception)} {exception!s}')
return False


def get_config_dir():
Expand Down
21 changes: 16 additions & 5 deletions src/helperFunctions/hash.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
from __future__ import annotations

from hashlib import new
from typing import TYPE_CHECKING

import tlsh

from helperFunctions.data_conversion import make_bytes

if TYPE_CHECKING:
from typing import Any, AnyStr

def get_hash(hash_function: str, binary: bytes | str) -> str:
ELF_MIME_TYPES = [
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
]


def get_hash(hash_function: str, binary: AnyStr) -> str:
"""
Hashes binary with hash_function.

Expand All @@ -20,19 +31,19 @@ def get_hash(hash_function: str, binary: bytes | str) -> str:
return raw_hash.hexdigest()


def get_sha256(code: bytes | str) -> str:
def get_sha256(code: AnyStr) -> str:
return get_hash('sha256', code)


def get_md5(code: bytes | str) -> str:
def get_md5(code: AnyStr) -> str:
return get_hash('md5', code)


def get_tlsh_comparison(first, second):
def get_tlsh_comparison(first: str, second: str) -> int:
return tlsh.diff(first, second)


def normalize_lief_items(functions):
def normalize_lief_items(functions: list[Any]) -> list[str]:
"""
Shorthand to convert a list of objects to a list of strings
"""
Expand Down
9 changes: 5 additions & 4 deletions src/helperFunctions/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ class OperateInDirectory:
"""

def __init__(self, target_directory: str | Path, remove: bool = False):
self._current_working_dir = None
self._current_working_dir: str | None = None
self._target_directory = str(target_directory)
self._remove = remove

def __enter__(self):
self._current_working_dir = os.getcwd() # noqa: PTH109
self._current_working_dir = str(Path.cwd())
os.chdir(self._target_directory)

def __exit__(self, *args):
os.chdir(self._current_working_dir)
if self._current_working_dir:
os.chdir(self._current_working_dir)
if self._remove:
remove_folder(self._target_directory)

Expand All @@ -58,7 +59,7 @@ def remove_folder(folder_name: str):
raise InstallationError(exception) from None


def log_current_packages(packages: tuple[str], install: bool = True):
def log_current_packages(packages: tuple[str, ...], install: bool = True):
"""
Log which packages are installed or removed.

Expand Down
4 changes: 2 additions & 2 deletions src/helperFunctions/magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
_internal_symlink_magic = f'{get_src_dir()}/bin/internal_symlink_magic.mgc'
_magic_file = f'{_internal_symlink_magic}:{_fact_magic}:{_default_magic}'

_instances = {}
_instances: dict[int, pymagic.Magic] = {}


def _get_magic_instance(**kwargs):
"""Returns an instance of pymagic.Magic"""
# Dicts are not hashable but sorting and creating a tuple is a valid hash
# Dicts are not hashable, but sorting and creating a tuple is a valid hash
key = hash(tuple(sorted(kwargs.items())))
instance = _instances.get(key)
if instance is None:
Expand Down
72 changes: 38 additions & 34 deletions src/helperFunctions/object_conversion.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,63 @@
from objects.file import FileObject
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from objects.firmware import Firmware

if TYPE_CHECKING:
from objects.file import FileObject


def _add_firmware_only_fields(fo, meta):
def _get_firmware_only_fields(fo: Firmware) -> dict[str, Any]:
"""
Adds fields relevant for :class:`objects.firmware.Firmware` objects from
`fo` to `meta`
Get fields relevant for :class:`objects.firmware.Firmware` objects from `fo`

:param meta: The dictionary to add the fields to
:param fo: A :class:`objects.firmware.Firmware`
"""
if isinstance(fo, Firmware):
fo.root_uid = fo.uid
meta['device_name'] = fo.device_name
meta['device_class'] = fo.device_class
meta['device_part'] = fo.part
meta['vendor'] = fo.vendor
meta['version'] = fo.version
meta['release_date'] = fo.release_date
return {
'device_name': fo.device_name,
'device_class': fo.device_class,
'device_part': fo.part,
'vendor': fo.vendor,
'version': fo.version,
'release_date': fo.release_date,
}


def _add_file_object_only_fields(fo, meta):
def _get_file_object_only_fields(fo: FileObject) -> dict[str, Any]:
"""
Adds fields relevant for only :class:`objects.file.FileObject` but not
Firmware objects from `fo` to `meta`
Get fields relevant for only :class:`objects.file.FileObject` but not Firmware objects from `fo`

:param meta: The dictionary to add the fields to
:param fo: A :class:`objects.firmware.FileObject`
"""
if not isinstance(fo, Firmware):
meta['firmwares_including_this_file'] = list(fo.parent_firmware_uids)
meta['virtual_file_path'] = fo.virtual_file_path
return {
'firmwares_including_this_file': list(fo.parent_firmware_uids),
'virtual_file_path': fo.virtual_file_path,
}


def _add_general_information(fo, meta):
def _get_general_information(fo: FileObject) -> dict[str, Any]:
"""
Adds fields relevant for :class:`objects.file.FileObjects` and
:class:`objects.firmware.Firmware` from `fo` to `meta`
Get fields relevant for :class:`objects.file.FileObjects` and :class:`objects.firmware.Firmware` from `fo`

:param meta: The dictionary to add the fields to
:param fo: A :class:`objects.file.FileObject`
"""
meta['hid'] = fo.get_hid()
meta['size'] = fo.size
meta['number_of_included_files'] = len(fo.files_included) if fo.files_included else 0
meta['included_files'] = list(fo.files_included) if fo.files_included else []
meta['total_files_in_firmware'] = len(fo.list_of_all_included_files) if fo.list_of_all_included_files else 'unknown'
return {
'hid': fo.get_hid(),
'size': fo.size,
'number_of_included_files': len(fo.files_included) if fo.files_included else 0,
'included_files': list(fo.files_included) if fo.files_included else [],
'total_files_in_firmware': len(fo.list_of_all_included_files) if fo.list_of_all_included_files else 'unknown',
}


def create_meta_dict(fo: FileObject):
def create_meta_dict(fo: FileObject) -> dict[str, Any]:
"""
Creates a dictionary with the meta information contained in :class:`objects.file.FileObject` `fo`
"""
meta = {}
_add_firmware_only_fields(fo, meta)
_add_file_object_only_fields(fo, meta)
_add_general_information(fo, meta)
meta: dict[str, Any] = _get_general_information(fo)
if not isinstance(fo, Firmware):
meta.update(_get_file_object_only_fields(fo))
else:
meta.update(_get_firmware_only_fields(fo))
return meta
16 changes: 11 additions & 5 deletions src/helperFunctions/pdf.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from __future__ import annotations

import json
import logging
from pathlib import Path
from subprocess import CalledProcessError
from typing import TYPE_CHECKING

from common_helper_encoder import ReportEncoder
from docker.errors import DockerException
from docker.types import Mount

from helperFunctions.docker import run_docker_container
from helperFunctions.object_conversion import create_meta_dict
from objects.firmware import Firmware

if TYPE_CHECKING:
from pathlib import Path

from objects.firmware import Firmware


def build_pdf_report(firmware: Firmware, folder: Path) -> Path:
def build_pdf_report(firmware: Firmware, folder: Path) -> Path | None:
"""
Creates a pdf report for the given firmware by calling the fact_pdf_report docker container.

Expand Down Expand Up @@ -59,10 +65,10 @@ def _initialize_subfolder(folder: Path, firmware: Firmware) -> None:
(folder / 'data' / 'analysis.json').write_text(json.dumps(firmware.processed_analysis, cls=ReportEncoder))


def _find_pdf(folder: Path) -> Path:
def _find_pdf(folder: Path) -> Path | None:
pdf_path = None
for file_path in (folder / 'pdf').rglob('*.pdf'):
if pdf_path:
logging.warning(f'Indistinct pdf name. Found: {file_path.name}')
logging.warning(f'Indistinct pdf name. Found: {file_path.name}') # type: ignore[unreachable]
pdf_path = file_path
return pdf_path
Loading