From 1a39c885d403137f09019dde01cf5f51c3081e1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 5 Sep 2024 12:36:55 +0200 Subject: [PATCH] feat: converted hash plugin to new base class also moved methods from helperFunctions.hash that were used exclusively in the plugin into the plugin --- src/helperFunctions/hash.py | 81 +-------- src/plugins/analysis/hash/code/hash.py | 159 ++++++++++++++---- .../analysis/hash/test/test_plugin_hash.py | 73 ++++++-- src/plugins/analysis/hash/view/hash.html | 14 ++ src/test/unit/helperFunctions/test_hash.py | 49 ------ 5 files changed, 201 insertions(+), 175 deletions(-) create mode 100644 src/plugins/analysis/hash/view/hash.html diff --git a/src/helperFunctions/hash.py b/src/helperFunctions/hash.py index a64e8451e8..acbd870b94 100644 --- a/src/helperFunctions/hash.py +++ b/src/helperFunctions/hash.py @@ -1,108 +1,39 @@ from __future__ import annotations -import contextlib -import logging -import sys -from hashlib import md5, new -from typing import TYPE_CHECKING +from hashlib import new -import lief -import ssdeep import tlsh from helperFunctions.data_conversion import make_bytes -if TYPE_CHECKING: - from objects.file import FileObject -ELF_MIME_TYPES = [ - 'application/x-executable', - 'application/x-object', - 'application/x-pie-executable', - 'application/x-sharedlib', -] - - -def get_hash(hash_function, binary): +def get_hash(hash_function: str, binary: bytes | str) -> str: """ Hashes binary with hash_function. :param hash_function: The hash function to use. 
See hashlib for more :param binary: The data to hash, either as string or array of Integers - :return: The hash as hexstring + :return: The hash as hex string """ - binary = make_bytes(binary) raw_hash = new(hash_function) - raw_hash.update(binary) + raw_hash.update(make_bytes(binary)) return raw_hash.hexdigest() -def get_sha256(code): +def get_sha256(code: bytes | str) -> str: return get_hash('sha256', code) -def get_md5(code): +def get_md5(code: bytes | str) -> str: return get_hash('md5', code) -def get_ssdeep(code): - binary = make_bytes(code) - raw_hash = ssdeep.Hash() - raw_hash.update(binary) - return raw_hash.digest() - - -def get_tlsh(code): - tlsh_hash = tlsh.hash(make_bytes(code)) - return tlsh_hash if tlsh_hash != 'TNULL' else '' - - def get_tlsh_comparison(first, second): return tlsh.diff(first, second) -def get_imphash(file_object: FileObject) -> str | None: - """ - Generates and returns the md5 hash of the (sorted) imported functions of an ELF file represented by `file_object`. - Returns `None` if there are no imports or if an exception occurs. 
- - :param file_object: The FileObject of which the imphash shall be computed - """ - if _is_elf_file(file_object): - try: - with _suppress_stdout(): - functions = [f.name for f in lief.ELF.parse(file_object.file_path).imported_functions] - if functions: - return md5(','.join(sorted(functions)).encode()).hexdigest() - except Exception: - logging.exception(f'Could not compute imphash for {file_object.file_path}') - return None - - -def _is_elf_file(file_object: FileObject) -> bool: - return file_object.processed_analysis['file_type']['result']['mime'] in ELF_MIME_TYPES - - def normalize_lief_items(functions): """ Shorthand to convert a list of objects to a list of strings """ return [str(function) for function in functions] - - -class _StandardOutWriter: - def write(self, _): - pass - - -@contextlib.contextmanager -def _suppress_stdout(): - """A context manager that suppresses any output to stdout and stderr.""" - writer = _StandardOutWriter() - - stdout, stderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = writer, writer - - yield - - sys.stdout, sys.stderr = stdout, stderr diff --git a/src/plugins/analysis/hash/code/hash.py b/src/plugins/analysis/hash/code/hash.py index bab8a5ecb4..5278e93111 100644 --- a/src/plugins/analysis/hash/code/hash.py +++ b/src/plugins/analysis/hash/code/hash.py @@ -1,43 +1,134 @@ +from __future__ import annotations + +import contextlib import logging +import sys from hashlib import algorithms_guaranteed +from typing import TYPE_CHECKING, Optional + +import lief +import ssdeep +import tlsh +from pydantic import BaseModel, Field +from semver import Version import config -from analysis.PluginBase import AnalysisBasePlugin -from helperFunctions.hash import get_hash, get_imphash, get_ssdeep, get_tlsh +from analysis.plugin import AnalysisPluginV0 +from analysis.plugin.compat import AnalysisBasePluginAdapterMixin +from helperFunctions.hash import get_hash, get_md5 + +if TYPE_CHECKING: + from io import FileIO + +ELF_MIME_TYPES = [ + 
'application/x-executable', + 'application/x-object', + 'application/x-pie-executable', + 'application/x-sharedlib', +] + + +class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): + class Schema(BaseModel): + # The supported hashes are the ones from this plugin module and hashlib (except shake which does not make + # much sense with its variable length). If they are not supported on the platform or not selected in the + # configuration of the plugin, the value will be `None`. Only the md5 and sha256 hashes are guaranteed to be + # available (since they are needed down the line) + + # from hashlib + md5: str = Field(description="md5 hash of the file's content") + sha256: str = Field(description="sha256 hash of the file's content") + sha1: Optional[str] = Field(description="sha1 hash of the file's content", default=None) + sha224: Optional[str] = Field(description="sha224 hash of the file's content", default=None) + sha384: Optional[str] = Field(description="sha384 hash of the file's content", default=None) + sha512: Optional[str] = Field(description="sha512 hash of the file's content", default=None) + blake2b: Optional[str] = Field(description="blake2b hash of the file's content", default=None) + blake2s: Optional[str] = Field(description="blake2s hash of the file's content", default=None) + sha3_224: Optional[str] = Field(description="sha3_224 hash of the file's content", default=None) + sha3_256: Optional[str] = Field(description="sha3_256 hash of the file's content", default=None) + sha3_384: Optional[str] = Field(description="sha3_384 hash of the file's content", default=None) + sha3_512: Optional[str] = Field(description="sha3_512 hash of the file's content", default=None) + + # computed by helper functions in this plugin module + ssdeep: Optional[str] = Field(description="ssdeep hash of the file's content", default=None) + tlsh: Optional[str] = Field(description="tlsh hash of the file's content", default=None) + imphash: Optional[str] = Field( + description="import 
hash of the executable's imported functions (only for ELF files)", + default=None, + ) + + def __init__(self): + super().__init__( + metadata=self.MetaData( + name='file_hashes', + description='calculate different hash values of the file', + version=Version(1, 3, 0), + dependencies=['file_type'], + Schema=self.Schema, + ), + ) + configured_hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', []) + self.hashes_to_create = set(configured_hashes).union({'sha256', 'md5'}) + + def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema: + del virtual_file_path + result = {} + + file_handle.seek(0) + file_contents = file_handle.read() + for hash_ in self.hashes_to_create.intersection(algorithms_guaranteed): + result[hash_] = get_hash(hash_, file_contents) + result['ssdeep'] = get_ssdeep(file_contents) + result['imphash'] = get_imphash(file_handle, analyses.get('file_type')) + result['tlsh'] = get_tlsh(file_contents) + + return self.Schema(**result) -class AnalysisPlugin(AnalysisBasePlugin): +def get_imphash(file: FileIO, type_analysis: BaseModel | None) -> str | None: """ - This Plugin creates several hashes of the file + Generates and returns the md5 hash of the (sorted) imported functions of an ELF file represented by `file`. + Returns `None` if there are no imports or if an exception occurs. 
""" + if type_analysis is not None and _is_elf_file(type_analysis): + try: + with suppress_stdout(): + functions = [f.name for f in lief.ELF.parse(file).imported_functions] + if functions: + return get_md5(','.join(sorted(functions))) + except Exception: + logging.exception(f'Could not compute imphash for {file}') + return None + + +def _is_elf_file(type_analysis: BaseModel) -> bool: + return type_analysis.mime in ELF_MIME_TYPES + + +class _StandardOutWriter: + def write(self, _): + pass + + +@contextlib.contextmanager +def suppress_stdout(): + """A context manager that suppresses any output to stdout and stderr.""" + writer = _StandardOutWriter() + + stdout, stderr = sys.stdout, sys.stderr + sys.stdout, sys.stderr = writer, writer + + yield + + sys.stdout, sys.stderr = stdout, stderr + + +def get_ssdeep(file_contents: bytes) -> str: + raw_hash = ssdeep.Hash() + raw_hash.update(file_contents) + return raw_hash.digest() + - NAME = 'file_hashes' - DEPENDENCIES = ['file_type'] # noqa: RUF012 - DESCRIPTION = 'calculate different hash values of the file' - VERSION = '1.2' - FILE = __file__ - - def additional_setup(self): - hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', ['sha256']) - self.hashes_to_create = hashes - - def process_object(self, file_object): - """ - This function must be implemented by the plugin. 
- Analysis result must be a dict stored in file_object.processed_analysis[self.NAME] - If you want to propagate results to parent objects store a list of strings 'summary' entry of your result dict - """ - file_object.processed_analysis[self.NAME] = {} - for hash_ in self.hashes_to_create: - if hash_ in algorithms_guaranteed: - file_object.processed_analysis[self.NAME][hash_] = get_hash(hash_, file_object.binary) - else: - logging.debug(f'algorithm {hash_} not available') - file_object.processed_analysis[self.NAME]['ssdeep'] = get_ssdeep(file_object.binary) - file_object.processed_analysis[self.NAME]['imphash'] = get_imphash(file_object) - - tlsh_hash = get_tlsh(file_object.binary) - if tlsh_hash: - file_object.processed_analysis[self.NAME]['tlsh'] = get_tlsh(file_object.binary) - - return file_object +def get_tlsh(file_contents: bytes) -> str | None: + tlsh_hash = tlsh.hash(file_contents) + return tlsh_hash if tlsh_hash != 'TNULL' else None diff --git a/src/plugins/analysis/hash/test/test_plugin_hash.py b/src/plugins/analysis/hash/test/test_plugin_hash.py index eb72084563..cfd2a27ba8 100644 --- a/src/plugins/analysis/hash/test/test_plugin_hash.py +++ b/src/plugins/analysis/hash/test/test_plugin_hash.py @@ -1,13 +1,21 @@ import os +from pathlib import Path import pytest -from common_helper_files import get_dir_of_file -from test.common_helper import MockFileObject +from ..code.hash import AnalysisPlugin, get_imphash, get_ssdeep, get_tlsh, suppress_stdout -from ..code.hash import AnalysisPlugin +TEST_DATA_DIR = Path(__file__).parent / 'data' +TEST_FILE = TEST_DATA_DIR / 'ls' +MD5_LEN = 32 +TEST_STRING = b'test string' -TEST_DATA_DIR = os.path.join(get_dir_of_file(__file__), 'data') # noqa: PTH118 + +class MockTypeResultSchema: + mime = 'application/x-executable' + + +ANALYSIS_RESULT = {'file_type': MockTypeResultSchema()} @pytest.mark.backend_config_overwrite( @@ -23,20 +31,51 @@ @pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) class 
TestAnalysisPluginHash: def test_all_hashes(self, analysis_plugin): - result = analysis_plugin.process_object(MockFileObject()).processed_analysis[analysis_plugin.NAME] + with TEST_FILE.open('rb') as fp: + result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT) - assert 'md5' in result, 'md5 not in result' - assert 'sha1' in result, 'sha1 not in result' - assert 'foo' not in result, 'foo in result but not available' - assert result['md5'] == '6f8db599de986fab7a21625b7916589c', 'hash not correct' - assert 'ssdeep' in result, 'ssdeep not in result' - assert 'imphash' in result, 'imphash not in result' + assert isinstance(result.md5, str) + assert isinstance(result.sha1, str), 'sha1 not in result' + assert isinstance(result.ssdeep, str), 'ssdeep not in result' + assert isinstance(result.imphash, str), 'imphash not in result' + assert not hasattr(result, 'foo') + assert result.md5 == '87b02c9bea4be534649d3ab0b6f040a0', 'hash not correct' def test_imphash(self, analysis_plugin): - file_path = os.path.join(TEST_DATA_DIR, 'ls') # noqa: PTH118 - result = analysis_plugin.process_object(MockFileObject(file_path=file_path)).processed_analysis[ - analysis_plugin.NAME - ] + with TEST_FILE.open('rb') as fp: + result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT) + + assert isinstance(result.imphash, str), 'imphash should be a string' + assert len(result.imphash) == MD5_LEN, 'imphash does not look like an md5' + + +def test_get_ssdeep(): + assert get_ssdeep(TEST_STRING) == '3:Hv2:HO', 'not correct from string' + + +def test_imphash_bad_file(): + this_file = Path(__file__) + with this_file.open('rb') as fp: + assert get_imphash(fp, MockTypeResultSchema()) is None + + +def print_foo(): + print('foo', end='') # noqa: T201 + + +def test_suppress_stdout(capsys): + print_foo() + + without_decorator = capsys.readouterr() + assert without_decorator.out == 'foo' + + with suppress_stdout(): + print_foo() + + with_decorator = capsys.readouterr() + assert not with_decorator.out + - 
assert isinstance(result['imphash'], str), 'imphash should be a string' - assert len(result['imphash']) == 32, 'imphash does not look like an md5' # noqa: PLR2004 +def test_get_tlsh(): + assert get_tlsh(b'foobar') is None # make sure the result is not 'TNULL' + assert get_tlsh(os.urandom(2**7)) not in [None, ''] # the new tlsh version should work for smaller inputs diff --git a/src/plugins/analysis/hash/view/hash.html b/src/plugins/analysis/hash/view/hash.html new file mode 100644 index 0000000000..410ca89b53 --- /dev/null +++ b/src/plugins/analysis/hash/view/hash.html @@ -0,0 +1,14 @@ +{% extends "analysis_plugins/general_information.html" %} + +{% block analysis_result_details %} + + {% for key, value in analysis_result.items() | sort %} + {% if value %} + + {{ key }} + {{ value }} + + {% endif %} + {% endfor %} + +{% endblock %} diff --git a/src/test/unit/helperFunctions/test_hash.py b/src/test/unit/helperFunctions/test_hash.py index 723e935e73..19a7e88968 100644 --- a/src/test/unit/helperFunctions/test_hash.py +++ b/src/test/unit/helperFunctions/test_hash.py @@ -1,21 +1,12 @@ -import os -from pathlib import Path - from helperFunctions.hash import ( - _suppress_stdout, - get_imphash, get_md5, get_sha256, - get_ssdeep, - get_tlsh, normalize_lief_items, ) -from test.common_helper import create_test_file_object, get_test_data_dir TEST_STRING = 'test string' TEST_SHA256 = 'd5579c46dfcc7f18207013e65b44e4cb4e2c2298f4ac457ba8f82743f31e930b' TEST_MD5 = '6f8db599de986fab7a21625b7916589c' -TEST_SSDEEP = '3:Hv2:HO' def test_get_sha256(): @@ -26,24 +17,6 @@ def test_get_md5(): assert get_md5(TEST_STRING) == TEST_MD5, 'not correct from string' -def test_get_ssdeep(): - assert get_ssdeep(TEST_STRING) == TEST_SSDEEP, 'not correct from string' - - -def test_imphash(): - fo = create_test_file_object(bin_path=str(Path(get_test_data_dir(), 'test_executable'))) - fo.processed_analysis = {'file_type': {'result': {'mime': 'application/x-executable'}}} - imphash = get_imphash(fo) - 
assert isinstance(imphash, str), 'imphash should be a string' - assert len(imphash) == 32, 'imphash does not seem to be an md5' # noqa: PLR2004 - - -def test_imphash_bad_file(): - fo = create_test_file_object() - fo.processed_analysis = {'file_type': {'result': {'mime': 'application/x-executable'}}} - assert get_imphash(fo) is None - - def test_normalize_items_from_strings(): functions = ['printf', '__libc_start_main'] assert normalize_lief_items(functions) == functions @@ -63,25 +36,3 @@ def __str__(self): def test_normalize_items_empty_list(): assert normalize_lief_items([]) == [] - - -def print_foo(): - print('foo', end='') # noqa: T201 - - -def test_suppress_stdout(capsys): - print_foo() - - without_decorator = capsys.readouterr() - assert without_decorator.out == 'foo' - - with _suppress_stdout(): - print_foo() - - with_decorator = capsys.readouterr() - assert not with_decorator.out - - -def test_get_tlsh(): - assert get_tlsh(b'foobar') == '' # make sure the result is not 'TNULL' - assert get_tlsh(os.urandom(2**7)) != '' # the new tlsh version should work for smaller inputs