From c45308292b164cf69beeb625b3717ab337e1d5d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 24 Jan 2025 16:55:11 +0100 Subject: [PATCH] feat: converted qemu_exec plugin to v1 --- .../analysis/qemu_exec/code/qemu_exec.py | 396 ++++++++++-------- .../analysis/qemu_exec/routes/ajax_view.html | 107 +++-- .../analysis/qemu_exec/routes/routes.py | 13 +- .../qemu_exec/test/test_plugin_qemu_exec.py | 315 ++++++-------- .../analysis/qemu_exec/test/test_routes.py | 50 ++- .../analysis/qemu_exec/view/qemu_exec.html | 98 +++-- 6 files changed, 504 insertions(+), 475 deletions(-) diff --git a/src/plugins/analysis/qemu_exec/code/qemu_exec.py b/src/plugins/analysis/qemu_exec/code/qemu_exec.py index 9dff239b3..33db8998b 100644 --- a/src/plugins/analysis/qemu_exec/code/qemu_exec.py +++ b/src/plugins/analysis/qemu_exec/code/qemu_exec.py @@ -7,29 +7,32 @@ from base64 import b64decode, b64encode from collections import OrderedDict from concurrent.futures import Future, ThreadPoolExecutor +from contextlib import contextmanager from json import JSONDecodeError, loads from multiprocessing import Manager from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Optional from common_helper_files import get_binary_from_file, safe_rglob from docker.errors import DockerException from docker.types import Mount +from pydantic import BaseModel, Field from requests.exceptions import ReadTimeout +from semver import Version import config -from analysis.PluginBase import AnalysisBasePlugin +from analysis.plugin import AnalysisPluginV0, Tag from helperFunctions import magic from helperFunctions.docker import run_docker_container from helperFunctions.tag import TagColor from helperFunctions.uid import create_uid -from storage.fsorganizer import FSOrganizer from unpacker.unpack_base import UnpackBase if TYPE_CHECKING: - from objects.file import FileObject + from io import FileIO +PLUGIN_NAME = 'qemu_exec' TIMEOUT_IN_SECONDS = 15 EXECUTABLE = 'executable' EMPTY = '(no parameter)' @@ -37,181 +40,242 @@ QEMU_ERRORS = ['Unsupported syscall', 'Invalid ELF', 'uncaught target signal'] CONTAINER_TARGET_PATH = '/opt/firmware_root' +EXECUTABLE_TYPES = {'application/x-executable', 'application/x-pie-executable', 'application/x-sharedlib'} +FACT_EXTRACTION_FOLDER_NAME = 'fact_extracted' +ARCH_TO_BIN_DICT = OrderedDict( + [ + ('aarch64', ['aarch64']), + ('ARM', ['aarch64', 'arm', 'armeb']), + ('MIPS32', ['mipsel', 'mips', 'mipsn32', 'mipsn32el']), + ('MIPS64', ['mips64', 'mips64el']), + ('MIPS', ['mipsel', 'mips', 'mips64', 'mips64el', 'mipsn32', 'mipsn32el']), + ('80386', ['i386']), + ('80486', ['x86_64', 'i386']), + ('x86', ['x86_64', 'i386']), + ('PowerPC', ['ppc', 'ppc64', 'ppc64le']), + ('PPC', ['ppc', 'ppc64', 'ppc64le']), + ('Renesas SH', ['sh4', 'sh4eb']), + ] +) -class Unpacker(UnpackBase): - def __init__(self): - self.fs_organizer = FSOrganizer() - def unpack_fo(self, file_object: FileObject) -> TemporaryDirectory | None: - file_path = file_object.file_path if file_object.file_path else self._get_path_from_fo(file_object) +class Unpacker(UnpackBase): + @contextmanager + def unpack_file(self, file_path: str): if not file_path or not Path(file_path).is_file(): - logging.error(f'could not unpack {file_object.uid}: file path not found') - return None - - extraction_dir = TemporaryDirectory(prefix='FACT_plugin_qemu_exec', dir=config.backend.docker_mount_base_dir) - self.extract_files_from_file(file_path, extraction_dir.name) - return extraction_dir - - def _get_path_from_fo(self, file_object: FileObject) -> str: - return self.fs_organizer.generate_path(file_object) - - -class AnalysisPlugin(AnalysisBasePlugin): - NAME = 'qemu_exec' - DESCRIPTION = 'test binaries for executability in QEMU and display help if available' - VERSION = '0.5.2' - DEPENDENCIES = ['file_type'] # noqa: RUF012 - FILE = __file__ - - FILE_TYPES = ['application/x-executable', 'application/x-pie-executable', 'application/x-sharedlib'] # noqa: RUF012 - FACT_EXTRACTION_FOLDER_NAME = 'fact_extracted' - - arch_to_bin_dict = OrderedDict( # noqa: RUF012 - [ - ('aarch64', ['aarch64']), - ('ARM', ['aarch64', 'arm', 'armeb']), - ('MIPS32', ['mipsel', 'mips', 'mipsn32', 'mipsn32el']), - ('MIPS64', ['mips64', 'mips64el']), - ('MIPS', ['mipsel', 'mips', 'mips64', 'mips64el', 'mipsn32', 'mipsn32el']), - ('80386', ['i386']), - ('80486', ['x86_64', 'i386']), - ('x86', ['x86_64', 'i386']), - ('PowerPC', ['ppc', 'ppc64', 'ppc64le']), - ('PPC', ['ppc', 'ppc64', 'ppc64le']), - ('Renesas SH', ['sh4', 'sh4eb']), - ] + logging.error(f'could not unpack {file_path}: file not found') + yield None + return + base_dir = config.backend.docker_mount_base_dir + with TemporaryDirectory(prefix='FACT_plugin_qemu_exec', dir=base_dir) as extraction_dir: + self.extract_files_from_file(file_path, extraction_dir) + yield extraction_dir + + +class ParameterResult(BaseModel): + parameters: str = Field( + description=( + 'A CLI parameter or a comma-separated list of parameters (if multiple parameters produced the same ' + 'output).' + ) ) + return_code: int + stdout: str = Field(description='The STDOUT output of executing the file.') + stderr: str = Field(description='The STDERR output of executing the file.') + + @classmethod + def from_result(cls, parameter: str, result: dict): + return cls( + parameters=parameter, + return_code=result['return_code'], + stdout=result['stdout'], + stderr=result['stderr'], + ) - root_path = None - def __init__(self, *args, unpacker=None, **kwargs): - self.unpacker = Unpacker() if unpacker is None else unpacker - super().__init__(*args, **kwargs) - - def process_object(self, file_object: FileObject) -> FileObject: - if self.NAME not in file_object.processed_analysis: - file_object.processed_analysis[self.NAME] = {} - file_object.processed_analysis[self.NAME]['summary'] = [] - - if file_object.processed_analysis['file_type']['result']['mime'] in self.FILE_TYPES: - return self._process_included_binary(file_object) - return self._process_container(file_object) - - def _process_included_binary(self, file_object: FileObject) -> FileObject: - # File will get analyzed in the parent container - file_object.processed_analysis[self.NAME]['parent_flag'] = True - return file_object - - def _process_container(self, file_object: FileObject) -> FileObject: - if not file_object.files_included: - return file_object - - tmp_dir = self.unpacker.unpack_fo(file_object) - extracted_files_dir = self.unpacker.get_extracted_files_dir(tmp_dir.name) - - if extracted_files_dir.is_dir(): - try: - self.root_path = self._find_root_path(extracted_files_dir) - file_list = self._find_relevant_files(extracted_files_dir) - if file_list: - file_object.processed_analysis[self.NAME]['files'] = {} - self._process_included_files(file_list, file_object) - finally: - tmp_dir.cleanup() - - return file_object - - def _find_relevant_files(self, extracted_files_dir: Path): - result = [] - for path in safe_rglob(extracted_files_dir): - if path.is_file() and not path.is_symlink(): - file_type = { - 'full': magic.from_file(path.absolute(), mime=False), - 'mime': magic.from_file(path.absolute(), mime=True), - } - if self._has_relevant_type(file_type): - result.append((f'/{path.relative_to(Path(self.root_path))}', file_type['full'])) - return result - - def _find_root_path(self, extracted_files_dir: Path) -> Path: - root_path = extracted_files_dir - if (root_path / self.FACT_EXTRACTION_FOLDER_NAME).is_dir(): - # if there is a 'fact_extracted' folder in the tmp dir: reset root path to that folder - root_path /= self.FACT_EXTRACTION_FOLDER_NAME - return root_path - - def _has_relevant_type(self, file_type: dict): - if file_type is not None and file_type['mime'] in self.FILE_TYPES: - return True - return False +class ArchResult(BaseModel): + architecture: str = Field(description='QEMU system ISA that was used for trying to run the executable.') + parameter_results: List[ParameterResult] = Field( + description=( + 'The file is called with a list of different CLI parameters (no parameter, --help, etc.) and these ' + 'are the individual results for each parameter (or combination of parameters if multiple different' + 'parameters produced the same output).' + ) + ) + strace: Optional[str] = Field( + None, + description='A system call trace of executing the file (zlib compressed and base64 encoded to reduce size).', + ) + error: Optional[str] = None + + @classmethod + def from_arch_result(cls, arch: str, result_dict: dict) -> ArchResult: + return cls( + architecture=arch, + strace=result_dict.get('strace'), + error=result_dict.get('error'), + parameter_results=[ + ParameterResult.from_result(parameter, parameter_results) + for parameter, parameter_results in result_dict.items() + if parameter not in {'strace', 'error'} + ], + ) - def _process_included_files(self, file_list, file_object): - manager = Manager() - executor = ThreadPoolExecutor(max_workers=8) - results_dict = manager.dict() - - jobs = self._run_analysis_jobs(executor, file_list, file_object, results_dict) - for future in jobs: # wait for jobs to finish - future.result() - executor.shutdown(wait=False) - self._enter_results(dict(results_dict), file_object) - self._add_tag(file_object) - manager.shutdown() - - def _run_analysis_jobs( - self, - executor: ThreadPoolExecutor, - file_list: list[tuple[str, str]], - file_object: FileObject, - results_dict: dict, - ) -> list[Future]: - jobs = [] - for file_path, full_type in file_list: - uid = self._get_uid(file_path, self.root_path) - if self._analysis_not_already_completed(file_object, uid): - for arch_suffix in self._find_arch_suffixes(full_type): - jobs.append( - executor.submit(process_qemu_job, file_path, arch_suffix, self.root_path, results_dict, uid) - ) - return jobs - - def _analysis_not_already_completed(self, file_object, uid): - # file could be contained in the fo multiple times (but should be tested only once) - return uid not in file_object.processed_analysis[self.NAME]['files'] - @staticmethod - def _get_uid(file_path, root_path: Path): - return create_uid(get_binary_from_file(str(root_path / file_path[1:]))) +class FileResult(BaseModel): + is_executable: bool + path: str = Field(description='File path of the included file in this file (obtained through unpacking).') + uid: str + extended_results: List[ArchResult] = Field(description='Individual results for all tested architectures') + + @classmethod + def from_file_dict(cls, uid: str, file_result_dict: dict) -> FileResult: + return cls( + is_executable=file_result_dict[EXECUTABLE], + path=file_result_dict['path'], + uid=uid, + extended_results=[ + ArchResult.from_arch_result(arch, arch_result_dict) + for arch, arch_result_dict in file_result_dict['results'].items() + ], + ) - def _find_arch_suffixes(self, full_type): - for arch_string in self.arch_to_bin_dict: - if arch_string in full_type: - return self.arch_to_bin_dict[arch_string] - return [] - def _enter_results(self, results, file_object): - tmp = file_object.processed_analysis[self.NAME]['files'] = results - for uid in tmp: - tmp[uid][EXECUTABLE] = _valid_execution_in_results(tmp[uid]['results']) - file_object.processed_analysis['qemu_exec']['summary'] = self._get_summary(tmp) - - def _add_tag(self, file_object: FileObject): - result = file_object.processed_analysis[self.NAME]['files'] - if any(result[uid][EXECUTABLE] for uid in result): - self.add_analysis_tag( - file_object=file_object, - tag_name=self.NAME, - value='QEMU executable', - color=TagColor.BLUE, - propagate=True, +class AnalysisPlugin(AnalysisPluginV0): + class Schema(BaseModel): + parent_flag: bool = Field( + description=( + 'Flag that is true if the parent file of this file should contain results for this file (since results ' + 'are generated only for included files).' ) + ) + included_file_results: List[FileResult] = Field( + description='Results for individual included files (unpacked from this file).' + ) - @staticmethod - def _get_summary(results: dict): - if any(results[uid][EXECUTABLE] for uid in results): - return [EXECUTABLE] + def __init__(self, unpacker=None): + super().__init__( + metadata=self.MetaData( + name=PLUGIN_NAME, + description='test if included binaries can be executed with QEMU system and collect the output', + dependencies=['file_type'], + version=Version(1, 0, 0), + Schema=self.Schema, + ), + ) + self.unpacker = Unpacker() if unpacker is None else unpacker + + def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema: + del virtual_file_path + if analyses['file_type'].mime in EXECUTABLE_TYPES: + return self._process_included_binary() + return self._process_container(file_handle.name) + + def _process_included_binary(self) -> Schema: + # File should get analyzed when the parent file (container/file system/etc.) gets passed to this plugin + # for this file we set only a flag, so that the data is dynamically loaded in the template + return self.Schema( + parent_flag=True, + included_file_results=[], + ) + + def _process_container(self, file_path: str) -> Schema: + with self.unpacker.unpack_file(file_path) as extraction_dir: + return self.Schema( + parent_flag=False, + included_file_results=self._get_included_file_results(extraction_dir), + ) + + def _get_included_file_results(self, extraction_dir: str) -> list[FileResult]: + extracted_files_dir = self.unpacker.get_extracted_files_dir(extraction_dir) + if not extracted_files_dir.is_dir(): + return [] + root_path = _find_root_path(extracted_files_dir) + file_list = _find_relevant_files(extracted_files_dir, root_path) + if not file_list: + return [] + result_dict = _process_included_files(file_list, root_path) + return [FileResult.from_file_dict(uid, file_results) for uid, file_results in result_dict.items()] + + def summarize(self, result: Schema) -> list[str]: + return [EXECUTABLE] if self._results_contain_executable_file(result) else [] + + def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]: + del summary + if self._results_contain_executable_file(result): + return [ + Tag( + name=self.metadata.name, + value='QEMU executable', + propagate=True, + color=TagColor.BLUE, + ) + ] return [] + @staticmethod + def _results_contain_executable_file(results: Schema) -> bool: + return any(file.is_executable for file in results.included_file_results) + + +def _find_relevant_files(extracted_files_dir: Path, root_path: Path) -> list[tuple[str, str]]: + result = [] + for path in safe_rglob(extracted_files_dir): + if path.is_file() and not path.is_symlink(): + mime = magic.from_file(path.absolute(), mime=True) + if mime in EXECUTABLE_TYPES: + file_type = magic.from_file(path.absolute(), mime=False) + result.append((f'/{path.relative_to(root_path)}', file_type)) + return result + + +def _find_root_path(extracted_files_dir: Path) -> Path: + root_path = extracted_files_dir + if (root_path / FACT_EXTRACTION_FOLDER_NAME).is_dir(): + # if there is a 'fact_extracted' folder in the tmp dir: reset root path to that folder + root_path /= FACT_EXTRACTION_FOLDER_NAME + return root_path + + +def _process_included_files(file_list: list[tuple[str, str]], root_path: Path): + with Manager() as manager: + with ThreadPoolExecutor(max_workers=8) as executor: + shared_dict = manager.dict() + jobs = _run_analysis_jobs(executor, file_list, root_path, shared_dict) + for future in jobs: # wait for jobs to finish + future.result() + result_dict = shared_dict.copy() # convert to a regular dict so we can use it after shutting down the manager + for uid in shared_dict: + result_dict[uid].update({EXECUTABLE: _valid_execution_in_results(result_dict[uid]['results'])}) + return result_dict + + +def _run_analysis_jobs( + executor: ThreadPoolExecutor, + file_list: list[tuple[str, str]], + root_path: Path, + results_dict: dict, +) -> list[Future]: + jobs = [] + for file_path, full_type in file_list: + uid = _get_uid(file_path, root_path) + if uid not in results_dict: + # file could be contained in the fo multiple times (but should be tested only once) + for arch_suffix in _find_arch_suffixes(full_type): + jobs.append(executor.submit(process_qemu_job, file_path, arch_suffix, root_path, results_dict, uid)) + return jobs + + +def _get_uid(file_path: str, root_path: Path) -> str: + return create_uid(get_binary_from_file(str(root_path / file_path[1:]))) + + +def _find_arch_suffixes(full_type): + for arch_string in ARCH_TO_BIN_DICT: + if arch_string in full_type: + return ARCH_TO_BIN_DICT[arch_string] + return [] + def process_qemu_job(file_path: str, arch_suffix: str, root_path: Path, results_dict: dict, uid: str): result = check_qemu_executability(file_path, arch_suffix, root_path) @@ -315,7 +379,7 @@ def process_strace_output(docker_output: dict): # b64 + zip is still smaller than raw on average b64encode(zlib.compress(docker_output['strace']['stdout'].encode())).decode() if _strace_output_exists(docker_output) - else {} + else None ) diff --git a/src/plugins/analysis/qemu_exec/routes/ajax_view.html b/src/plugins/analysis/qemu_exec/routes/ajax_view.html index e39b451a2..27518d1fb 100644 --- a/src/plugins/analysis/qemu_exec/routes/ajax_view.html +++ b/src/plugins/analysis/qemu_exec/routes/ajax_view.html @@ -7,12 +7,12 @@ - {% for parent_uid, result in results.items() %} + {% for parent_uid, file_result in results.items() %} - {{ parent_uid | replace_uid_with_hid | safe }} -- {{ result['path'] }} - {% if result['executable'] %} + {{ parent_uid | replace_uid_with_hid | safe }} -- {{ file_result.path }} + {% if file_result.is_executable %} {% else %} @@ -23,77 +23,72 @@
- - - + + - {% if result['results'] %} - - - + {% endif %}
Executable in QEMU:{{ result['executable'] }}Executable in QEMU:{{ file_result.is_executable }}
Individual Results: - - {% if 'error' in result['results'] %} - - - {% else %} - {% for arch in result['results'] %} + {% if file_result.extended_results %} + + + - +
Error: {{ result['results']['error'] }}
Individual Results: + + {% for arch_result in file_result.extended_results %} - - {% if 'error' in result['results'][arch] %} + {% if arch_result.error %} - + {% else %} - {% for option in result['results'][arch] %} - {% if option != 'strace' %} - {% set option_result = result['results'][arch][option] %} - - - - - - - - - - - {% endif %} + {% for option_result in arch_result.parameter_results %} + + + + + + + + + + {% endfor %} {% endif %} - {% if 'strace' in result['results'][arch] %} - - + + - {% else %} - - {% endif %} {% endfor %} - {% endif %} -
- {{ arch }} + {% set row_count = (arch_result.parameter_results | length) * 3 %} + + {{ arch_result.architecture }} Error: {{ result['results'][arch]['error'] }}Error: {{ arch_result.error }}{{ option }}stdout - {% if option_result['stdout'] %} -
{{ '$ .' + result['path'] + ' ' + option + '\n' + option_result['stdout'] }}
- {% else %}Empty{% endif %} -
stderr - {% if option_result['stderr'] %}{{ option_result['stderr'] }}{% else %}Empty{% endif %} -
return code{{ option_result['return_code'] }}
{{ option_result.parameters }}stdout + {% if option_result.stdout %} +
{{ '$ .' + file_result.path + ' ' + option_result.parameters + '\n' + option_result.stdout }}
+ {% else %} + Empty + {% endif %} +
stderr + {% if option_result.stderr %} + {{ option_result.stderr }} + {% else %} + Empty + {% endif %} +
return code{{ option_result.return_code }}
strace - {% if result['results'][arch]['strace'] %} -
{{ result['results'][arch]['strace'] | decompress }}
- {% else %}Empty{% endif %} -
strace + {% if arch_result.strace %} +
{{ arch_result.strace | decompress }}
+ {% else %} + Empty + {% endif %} +
-
+
-
{% endfor %} + {% endif %} diff --git a/src/plugins/analysis/qemu_exec/routes/routes.py b/src/plugins/analysis/qemu_exec/routes/routes.py index e5249dc12..5f42f8d49 100644 --- a/src/plugins/analysis/qemu_exec/routes/routes.py +++ b/src/plugins/analysis/qemu_exec/routes/routes.py @@ -11,7 +11,7 @@ from web_interface.security.decorator import roles_accepted from web_interface.security.privileges import PRIVILEGES -from ..code.qemu_exec import AnalysisPlugin +from ..code.qemu_exec import PLUGIN_NAME VIEW_PATH = Path(__file__).absolute().parent / 'ajax_view.html' @@ -22,7 +22,7 @@ def get_analysis_results_for_included_uid(uid: str, db_interface: FrontEndDbInte this_fo = db.get_object(uid) if this_fo is not None: for parent_uid in this_fo.parents: - parent_results = _get_results_from_parent_fo(db.get_analysis(parent_uid, AnalysisPlugin.NAME), uid) + parent_results = _get_results_from_parent_fo(db.get_analysis(parent_uid, PLUGIN_NAME), uid) if parent_results: results[parent_uid] = parent_results return results @@ -32,10 +32,11 @@ def _get_results_from_parent_fo(analysis_entry: dict, uid: str): if ( analysis_entry is not None and analysis_entry['result'] is not None - and 'files' in analysis_entry['result'] - and uid in analysis_entry['result']['files'] + and 'included_file_results' in analysis_entry['result'] ): - return analysis_entry['result']['files'][uid] + for file_result in analysis_entry['result']['included_file_results']: + if file_result['uid'] == uid: + return file_result return None @@ -64,4 +65,4 @@ def get(self, uid): endpoint = self.ENDPOINTS[0][0] if not results: error_message(f'no results found for uid {uid}', endpoint, request_data={'uid': uid}) - return success_message({AnalysisPlugin.NAME: results}, endpoint, request_data={'uid': uid}) + return success_message({PLUGIN_NAME: results}, endpoint, request_data={'uid': uid}) diff --git a/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py b/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py index 2f3db1dfe..4efba2dd7 100644 --- a/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py +++ b/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py @@ -1,15 +1,18 @@ -import os +from __future__ import annotations + from base64 import b64decode, b64encode +from contextlib import contextmanager +from dataclasses import dataclass from pathlib import Path from subprocess import CompletedProcess -from unittest import TestCase +from tempfile import TemporaryDirectory import pytest from common_helper_files import get_dir_of_file from requests.exceptions import ConnectionError as RequestConnectionError from requests.exceptions import ReadTimeout -from test.common_helper import TEST_FW, create_test_firmware, get_test_data_dir +from test.common_helper import get_test_data_dir from test.mock import mock_patch from ..code import qemu_exec @@ -22,8 +25,8 @@ class MockTmpDir: - def __init__(self, name): - self.name = name + def __init__(self, name: Path | str): + self.name = str(name) def cleanup(self): pass @@ -32,8 +35,9 @@ def cleanup(self): class MockUnpacker: tmp_dir = None - def unpack_fo(self, _): - return self.tmp_dir + @contextmanager + def unpack_file(self, _): + yield self.tmp_dir.name def set_tmp_dir(self, tmp_dir): self.tmp_dir = tmp_dir @@ -82,149 +86,153 @@ def execute_docker_error(monkeypatch): # noqa: PT004 monkeypatch.setattr('docker.client.from_env', DockerClientMock) -@pytest.mark.AnalysisPluginTestConfig( - plugin_class=AnalysisPlugin, - init_kwargs={'unpacker': MockUnpacker()}, -) -class TestPluginQemuExec: - def test_has_relevant_type(self, analysis_plugin): - assert analysis_plugin._has_relevant_type(None) is False - assert analysis_plugin._has_relevant_type({'mime': 'foo'}) is False - assert analysis_plugin._has_relevant_type({'mime': 'application/x-executable'}) is True +@pytest.fixture +def _mock_unpacker(monkeypatch): + monkeypatch.setattr('plugins.analysis.qemu_exec.code.qemu_exec.Unpacker', MockUnpacker) - def test_find_relevant_files(self, analysis_plugin): - tmp_dir = MockTmpDir(str(TEST_DATA_DIR)) - analysis_plugin.root_path = tmp_dir.name - analysis_plugin.unpacker.set_tmp_dir(tmp_dir) - result = sorted(analysis_plugin._find_relevant_files(Path(tmp_dir.name))) - assert len(result) == 4 +def test_find_relevant_files(): + tmp_dir = MockTmpDir(str(TEST_DATA_DIR)) + result = sorted(qemu_exec._find_relevant_files(Path(tmp_dir.name), root_path=Path(tmp_dir.name))) + assert len(result) == 4 - path_list, mime_types = list(zip(*result)) - for path in ['/lib/ld.so.1', '/lib/libc.so.6', '/test_mips_static', '/usr/bin/test_mips']: - assert path in path_list - assert all('MIPS' in mime for mime in mime_types) + path_list, mime_types = list(zip(*result)) + for path in ['/lib/ld.so.1', '/lib/libc.so.6', '/test_mips_static', '/usr/bin/test_mips']: + assert path in path_list + assert all('MIPS' in mime for mime in mime_types) - def test_check_qemu_executability(self, analysis_plugin): - analysis_plugin.OPTIONS = ['-h'] - result = qemu_exec.check_qemu_executability('/test_mips_static', 'mips', TEST_DATA_DIR) - assert any('--help' in option for option in result) - option = [option for option in result if '--help' in option][0] - assert result[option]['stdout'] == 'Hello World\n' - assert result[option]['stderr'] == '' - assert result[option]['return_code'] == '0' - - result = qemu_exec.check_qemu_executability('/test_mips_static', 'i386', TEST_DATA_DIR) - assert result == {} - - def test_find_arch_suffixes(self, analysis_plugin): - mime_str = 'ELF 32-bit MSB executable, MIPS, MIPS32 rel2 version 1 (SYSV), statically linked' - result = analysis_plugin._find_arch_suffixes(mime_str) - assert result != [] - # the more specific architecture variants should be checked first - assert result == analysis_plugin.arch_to_bin_dict['MIPS32'] - assert result != analysis_plugin.arch_to_bin_dict['MIPS'] - - def test_find_arch_suffixes__unknown_arch(self, analysis_plugin): - mime_str = 'foo' - result = analysis_plugin._find_arch_suffixes(mime_str) - assert result == [] +def test_check_qemu_executability(): + result = qemu_exec.check_qemu_executability('/test_mips_static', 'mips', TEST_DATA_DIR) + assert any('--help' in option for option in result) + option = [option for option in result if '--help' in option][0] + assert result[option]['stdout'] == 'Hello World\n' + assert result[option]['stderr'] == '' + assert result[option]['return_code'] == '0' - @pytest.mark.timeout(10) - def test_process_included_files(self, analysis_plugin): - analysis_plugin.OPTIONS = ['-h'] - test_fw = create_test_firmware() - test_uid = '6b4142fa7e0a35ff6d10e18654be8ac5b778c3b5e2d3d345d1a01c2bcbd51d33_676340' - test_fw.processed_analysis[analysis_plugin.NAME] = result = {'files': {}} - file_list = [('/test_mips_static', '-MIPS32-')] - - analysis_plugin.root_path = Path(TEST_DATA_DIR) - analysis_plugin._process_included_files(file_list, test_fw) - assert result is not None - assert 'files' in result - assert test_uid in result['files'] - assert result['files'][test_uid]['executable'] is True + result = qemu_exec.check_qemu_executability('/test_mips_static', 'i386', TEST_DATA_DIR) + assert result == {} + + +def test_find_arch_suffixes(): + mime_str = 'ELF 32-bit MSB executable, MIPS, MIPS32 rel2 version 1 (SYSV), statically linked' + result = qemu_exec._find_arch_suffixes(mime_str) + assert result != [] + # the more specific architecture variants should be checked first + assert result == qemu_exec.ARCH_TO_BIN_DICT['MIPS32'] + assert result != qemu_exec.ARCH_TO_BIN_DICT['MIPS'] + + +def test_find_arch_suffixes__unknown_arch(): + mime_str = 'foo' + result = qemu_exec._find_arch_suffixes(mime_str) + assert result == [] + + +@pytest.mark.timeout(10) +def test_process_included_files(): + test_uid = '6b4142fa7e0a35ff6d10e18654be8ac5b778c3b5e2d3d345d1a01c2bcbd51d33_676340' + file_list = [('/test_mips_static', '-MIPS32-')] + result = qemu_exec._process_included_files(file_list, root_path=Path(TEST_DATA_DIR)) + assert test_uid in result + assert result[test_uid]['executable'] is True + + +@dataclass +class MockFileTypeResult: + mime: str + full: str + + +@dataclass +class MockFile: + name: str + + +MOCK_ANALYSES = {'file_type': MockFileTypeResult(mime='test_type', full='Not a PE file')} +MOCK_ANALYSES_EXECUTABLE = { + 'file_type': MockFileTypeResult(mime='application/x-executable', full='ELF 64-bit executable') +} + + +@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) +class TestPluginQemuExec: + @pytest.mark.usefixtures('_mock_unpacker') @pytest.mark.timeout(15) - def test_process_object(self, analysis_plugin): + def test_process_object(self, analysis_plugin: AnalysisPlugin): analysis_plugin.OPTIONS = ['-h'] - test_fw = self._set_up_fw_for_process_object(analysis_plugin) + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(TEST_DATA_DIR)) - analysis_plugin.process_object(test_fw) - result = test_fw.processed_analysis[analysis_plugin.NAME] - assert 'files' in result - assert len(result['files']) == 4 - assert any(result['files'][uid]['executable'] for uid in result['files']) + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES) + assert len(result.included_file_results) == 4 + assert any(file.is_executable for file in result.included_file_results) + paths = sorted(file.path for file in result.included_file_results) + assert paths == ['/lib/ld.so.1', '/lib/libc.so.6', '/test_mips_static', '/usr/bin/test_mips'] + summary = analysis_plugin.summarize(result) + assert summary == [EXECUTABLE] + + @pytest.mark.usefixtures('_mock_unpacker') @pytest.mark.timeout(15) - def test_process_object__with_extracted_folder(self, analysis_plugin): + def test_process_object__with_extracted_folder(self, analysis_plugin: AnalysisPlugin): analysis_plugin.OPTIONS = ['-h'] - test_fw = self._set_up_fw_for_process_object(analysis_plugin, path=TEST_DATA_DIR_2) + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(TEST_DATA_DIR_2)) test_file_uid = '68bbef24a7083ca2f5dc93f1738e62bae73ccbd184ea3e33d5a936de1b23e24c_8020' - analysis_plugin.process_object(test_fw) - result = test_fw.processed_analysis[analysis_plugin.NAME] - assert 'files' in result - assert len(result['files']) == 3 - assert result['files'][test_file_uid]['executable'] is True + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES) + assert len(result.included_file_results) == 3 + file_result_by_uid = {file.uid: file for file in result.included_file_results} + assert file_result_by_uid[test_file_uid].is_executable is True + @pytest.mark.usefixtures('_mock_unpacker') @pytest.mark.timeout(10) def test_process_object__error(self, analysis_plugin): - test_fw = self._set_up_fw_for_process_object(analysis_plugin, path=TEST_DATA_DIR / 'usr') - - analysis_plugin.process_object(test_fw) - result = test_fw.processed_analysis[analysis_plugin.NAME] - - assert 'files' in result - assert any(result['files'][uid]['executable'] for uid in result['files']) is False + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(TEST_DATA_DIR / 'usr')) + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES) + summary = analysis_plugin.summarize(result) + + assert len(result.included_file_results) == 1 + file_result = result.included_file_results[0] + assert file_result.is_executable is False + assert len(file_result.extended_results) == 1 + arch_result = file_result.extended_results[0] + assert arch_result.architecture == 'mips' assert all( - "/lib/ld.so.1': No such file or directory" in result['files'][uid]['results']['mips'][option]['stderr'] - for uid in result['files'] - for option in result['files'][uid]['results']['mips'] - if option != 'strace' + "/lib/ld.so.1': No such file or directory" in parameter_result.stderr + for parameter_result in arch_result.parameter_results ) + assert summary == [] @pytest.mark.timeout(10) - @pytest.mark.usefixtures('execute_docker_error') + @pytest.mark.usefixtures('_mock_unpacker', 'execute_docker_error') def test_process_object__timeout(self, analysis_plugin): - test_fw = self._set_up_fw_for_process_object(analysis_plugin) + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(TEST_DATA_DIR / 'usr')) + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES) - analysis_plugin.process_object(test_fw) - result = test_fw.processed_analysis[analysis_plugin.NAME] - - assert 'files' in result - assert all( - arch_results['error'] == 'timeout' - for uid in result['files'] - for arch_results in result['files'][uid]['results'].values() - ) - assert all(result['files'][uid]['executable'] is False for uid in result['files']) + assert len(result.included_file_results) == 1 + file_result = result.included_file_results[0] + assert file_result.is_executable is False + assert all(arch_results.error == 'timeout' for arch_results in file_result.extended_results) + @pytest.mark.usefixtures('_mock_unpacker') @pytest.mark.timeout(10) def test_process_object__no_files(self, analysis_plugin): - test_fw = create_test_firmware() - test_fw.files_included = [] + with TemporaryDirectory() as tmp_dir: + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(tmp_dir)) + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES) + summary = analysis_plugin.summarize(result) - analysis_plugin.process_object(test_fw) - assert analysis_plugin.NAME in test_fw.processed_analysis - assert test_fw.processed_analysis[analysis_plugin.NAME] == {'summary': []} + assert len(result.included_file_results) == 0 + assert summary == [] + @pytest.mark.usefixtures('_mock_unpacker') @pytest.mark.timeout(10) def test_process_object__included_binary(self, analysis_plugin): - test_fw = create_test_firmware() - test_fw.processed_analysis['file_type']['result']['mime'] = analysis_plugin.FILE_TYPES[0] - - analysis_plugin.process_object(test_fw) - assert analysis_plugin.NAME in test_fw.processed_analysis - assert 'parent_flag' in test_fw.processed_analysis[analysis_plugin.NAME] - assert test_fw.processed_analysis[analysis_plugin.NAME]['parent_flag'] is True - - def _set_up_fw_for_process_object(self, analysis_plugin, path: Path = TEST_DATA_DIR): - test_fw = create_test_firmware() - test_fw.files_included = ['foo', 'bar'] - analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(str(path))) - return test_fw + analysis_plugin.unpacker.set_tmp_dir(MockTmpDir(TEST_DATA_DIR)) + result = analysis_plugin.analyze(MockFile(name=''), {}, MOCK_ANALYSES_EXECUTABLE) + assert result.parent_flag is True + assert len(result.included_file_results) == 0 def test_get_docker_output__static(): @@ -293,19 +301,6 @@ def test_process_qemu_job(): } -@pytest.mark.parametrize( - ('input_data', 'expected_output'), - [ - ({}, []), - ({'foo': {EXECUTABLE: False}}, []), - ({'foo': {EXECUTABLE: False}, 'bar': {EXECUTABLE: True}}, [EXECUTABLE]), - ], -) -def test_get_summary(input_data, expected_output): - result = qemu_exec.AnalysisPlugin._get_summary(input_data) - assert result == expected_output - - @pytest.mark.parametrize( ('input_data', 'expected_output'), [ @@ -410,7 +405,7 @@ def test_decode_output_values(input_data, expected_output): ) def test_process_strace_output__no_strace(input_data): qemu_exec.process_strace_output(input_data) - assert input_data['strace'] == {} + assert input_data['strace'] is None def test_process_strace_output(): @@ -421,58 +416,18 @@ def test_process_strace_output(): assert b64decode(result)[:2].hex() == '789c' # magic string for zlib compressed data -class TestQemuExecUnpacker(TestCase): - def setUp(self): +class TestQemuExecUnpacker: + def setup_method(self): self.name_prefix = 'FACT_plugin_qemu' self.unpacker = qemu_exec.Unpacker() - qemu_exec.FSOrganizer = MockFSOrganizer def test_unpack_fo(self): - test_fw = create_test_firmware() - tmp_dir = self.unpacker.unpack_fo(test_fw) - - try: - assert self.name_prefix in tmp_dir.name - content = os.listdir(str(Path(tmp_dir.name, 'files'))) - assert content != [] - assert 'get_files_test' in content - finally: - tmp_dir.cleanup() - - def test_unpack_fo__no_file_path(self): - test_fw = create_test_firmware() - test_fw.file_path = None - - with mock_patch(self.unpacker.fs_organizer, 'generate_path', lambda _: TEST_FW.file_path): - tmp_dir = self.unpacker.unpack_fo(test_fw) - - try: - assert self.name_prefix in tmp_dir.name - content = os.listdir(str(Path(tmp_dir.name, 'files'))) + with self.unpacker.unpack_file(get_test_data_dir() / 'container/test.zip') as tmp_dir: + assert self.name_prefix in tmp_dir + content = [p.name for p in Path(tmp_dir, 'files').iterdir()] assert content != [] assert 'get_files_test' in content - finally: - tmp_dir.cleanup() def test_unpack_fo__path_not_found(self): - test_fw = create_test_firmware() - test_fw.file_path = 'foo/bar' - tmp_dir = self.unpacker.unpack_fo(test_fw) - - assert tmp_dir is None - - def test_unpack_fo__binary_not_found(self): - test_fw = create_test_firmware() - test_fw.uid = 'foo' - test_fw.file_path = None - tmp_dir = self.unpacker.unpack_fo(test_fw) - - assert tmp_dir is None - - -class MockFSOrganizer: - @staticmethod - def generate_path(fo): - if fo.uid != 'foo': - return str(get_test_data_dir() / 'container/test.zip') - return None + with self.unpacker.unpack_file('foo/bar') as tmp_dir: + assert tmp_dir is None diff --git a/src/plugins/analysis/qemu_exec/test/test_routes.py b/src/plugins/analysis/qemu_exec/test/test_routes.py index d54da2fe6..3b67d45b0 100644 --- a/src/plugins/analysis/qemu_exec/test/test_routes.py +++ b/src/plugins/analysis/qemu_exec/test/test_routes.py @@ -4,7 +4,7 @@ from test.common_helper import create_test_file_object, create_test_firmware -from ..code.qemu_exec import AnalysisPlugin +from ..code.qemu_exec import PLUGIN_NAME from ..routes import routes @@ -17,26 +17,33 @@ class DbInterfaceMock: def __init__(self): self.fw = create_test_firmware() self.fw.uid = 'parent_uid' - self.fw.processed_analysis[AnalysisPlugin.NAME] = { + self.fw.processed_analysis[PLUGIN_NAME] = { 'result': { - 'files': { - 'foo': {'executable': False}, - 'bar': { + 'included_file_results': [ + {'uid': 'foo', 'executable': False}, + { + 'uid': 'bar', 'executable': True, 'path': '/some/path', - 'results': { + 'extended_results': { 'some-arch': { '-h': {'stdout': 'stdout result', 'stderr': 'stderr result', 'return_code': '1337'} } }, }, - 'error-outside': {'executable': False, 'path': '/some/path', 'results': {'error': 'some error'}}, - 'error-inside': { + { + 'uid': 'error-outside', 'executable': False, 'path': '/some/path', - 'results': {'some-arch': {'error': 'some error'}}, + 'extended_results': {'error': 'some error'}, }, - } + { + 'uid': 'error-inside', + 'executable': False, + 'path': '/some/path', + 'extended_results': {'some-arch': {'error': 'some error'}}, + }, + ], } } @@ -55,7 +62,7 @@ def get_analysis(self, uid, plugin): if uid == self.fo.uid: return self.fo.processed_analysis.get(plugin) if uid == self.fw.uid: - return self.fw.processed_analysis[AnalysisPlugin.NAME] + return self.fw.processed_analysis[PLUGIN_NAME] return None def shutdown(self): @@ -72,15 +79,16 @@ def test_get_results_for_included(self): assert result is not None assert result != {} assert 'parent_uid' in result - assert result['parent_uid'] == {'executable': False} + assert result['parent_uid']['executable'] is False def test_get_results_from_parent_fo(self): - analysis_result = {'executable': False} - result = routes._get_results_from_parent_fo({'result': {'files': {'foo': analysis_result}}}, 'foo') - assert result == analysis_result + expected_result = {'uid': 'foo', 'executable': False} + analysis = {'result': {'included_file_results': [expected_result]}} + result = routes._get_results_from_parent_fo(analysis, 'foo') + assert result == expected_result def test_no_results_from_parent(self): - result = routes._get_results_from_parent_fo({'result': {}}, 'foo') + result = routes._get_results_from_parent_fo({'result': {'included_file_results': []}}, 'foo') assert result is None @@ -138,11 +146,11 @@ def setup_method(self): def test_get_rest(self): result = self.test_client.get('/plugins/qemu_exec/rest/foo').json - assert AnalysisPlugin.NAME in result - assert 'parent_uid' in result[AnalysisPlugin.NAME] - assert result[AnalysisPlugin.NAME]['parent_uid'] == {'executable': False} + assert PLUGIN_NAME in result + assert 'parent_uid' in result[PLUGIN_NAME] + assert result[PLUGIN_NAME]['parent_uid'] == {'executable': False} def test_get_rest_no_result(self): result = self.test_client.get('/plugins/qemu_exec/rest/not_found').json - assert AnalysisPlugin.NAME in result - assert result[AnalysisPlugin.NAME] == {} + assert PLUGIN_NAME in result + assert result[PLUGIN_NAME] == {} diff --git a/src/plugins/analysis/qemu_exec/view/qemu_exec.html b/src/plugins/analysis/qemu_exec/view/qemu_exec.html index 573cefdbf..a04d4583f 100644 --- a/src/plugins/analysis/qemu_exec/view/qemu_exec.html +++ b/src/plugins/analysis/qemu_exec/view/qemu_exec.html @@ -14,18 +14,18 @@ {% block analysis_result_details %} - {% if 'files' in analysis_result %} + {% if analysis_result.included_file_results %} Results for Included Files - {% for uid, result in analysis_result['files'].items() %} + {% for file_result in analysis_result.included_file_results %} - + - {{ result['path'] }} - {% if result['executable'] %} + {{ file_result.path }} + {% if file_result.is_executable %} {% else %} @@ -35,61 +35,67 @@ -
+
- - + + - {% if result['results'] %} + {% if file_result.extended_results %}
Executable in QEMU:{{ result['executable'] }}Executable in QEMU:{{ file_result.is_executable }}
Individual Results: - {% for arch in result['results'] %} + {% for arch_result in file_result.extended_results %} - {% if "strace" in result['results'][arch] %} - - {% for option in result['results'][arch] %} - {% if option != 'strace' %} - {% set option_result = result['results'][arch][option] %} - - - - - - - - - - - {% endif %} + {% for option_result in arch_result.parameter_results %} + + + + + + + + + + + + + {% endfor %} - {% if 'strace' in result['results'][arch] %} - - - + {% if arch_result.strace != None %} + + + {% else %} - + {% endif %} {% endfor %}
- {% else %} - + {% set rowspan = (arch_result.parameter_results | length) * 3 %} + {% if arch_result.strace != None %} + {% set rowspan = rowspan + 1 %} {% endif %} - {{ arch }} + + {{ arch_result.architecture }} {{ option }}stdout - {% if option_result['stdout'] %} -
{{ '$ .' + result['path'] + ' ' + option + '\n' + option_result['stdout'] }}
- {% else %}Empty{% endif %} -
stderr - {% if option_result['stderr'] %} -
{{ '$ .' + result['path'] + ' ' + option + '\n' + option_result['stderr'] }}
- {% else %}Empty{% endif %} -
return code{{ option_result['return_code'] }}
{{ option_result.parameters }}stdout + {% if option_result.stdout %} +
{{ '$ .' + file_result.path + ' ' + option_result.parameters + '\n' + option_result.stdout }}
+ {% else %} + Empty + {% endif %} +
stderr + {% if option_result['stderr'] %} +
{{ '$ .' + file_result.path + ' ' + option_result.parameters + '\n' + option_result.stderr }}
+ {% else %} + Empty + {% endif %} +
return code{{ option_result.return_code }}
strace - {% if result['results'][arch]['strace'] %} -
{{ result['results'][arch]['strace'] | decompress }}
- {% else %}Empty{% endif %} -
strace + {% if arch_result.strace %} +
{{ arch_result.strace | decompress }}
+ {% else %} + Empty + {% endif %} +