Hash plugin v1 #1262

Merged · 2 commits · Nov 27, 2024
10 changes: 5 additions & 5 deletions src/analysis/plugin/plugin.py
@@ -55,11 +55,11 @@ class MetaData(BaseModel):
#: It MUST be a `semver <https://semver.org/>`_ version.
#: Here is a quick summary how semver relates to plugins.
#: * MAJOR: The plugin schema changed.
#: * MINOR: The schema din't change but might contain more data.
#: * MINOR: The schema didn't change but might contain more data.
#: * PATCH: A bug was fixed e.g. a crash on some files.
#:
#: Note that any version change leads to rescheduling the analysis.
#: But backwards compatible results will still be shown in the fronfrontend.
#: But backwards compatible results will still be shown in the frontend.
version: semver.Version
#: The version of the backing analysis system.
#: E.g. for yara plugins this would be the yara version.
@@ -88,10 +88,9 @@ def __init__(self, metadata: MetaData):
# The type MetaData.Schema
Schema = typing.TypeVar('Schema')

@abc.abstractmethod
def summarize(self, result: Schema) -> list[str]:
def summarize(self, result: Schema) -> list[str]: # noqa: ARG002
"""
The summary is a list of categories in which the result can be grouped.
The summary is an optional list of categories in which the result can be grouped.
In the FACT_core frontend if you view the analysis of a container the
summary is used to group files included in it.

@@ -105,6 +104,7 @@ def summarize(self, result: Schema) -> list[str]:

:param result: The analysis as returned by :py:func:`analyze`
"""
return []

@abc.abstractmethod
def analyze(
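Since `summarize` is no longer abstract and defaults to an empty list, a plugin only overrides it when it wants its results grouped in the frontend. A minimal sketch of such an override (the plugin class, schema, and mime-based grouping below are made-up examples, not part of this PR):

```python
from pydantic import BaseModel

from analysis.plugin import AnalysisPluginV0


class ExampleSchema(BaseModel):
    # Hypothetical result model used only for this sketch.
    mime: str


class ExamplePlugin(AnalysisPluginV0):
    # __init__() and analyze() are omitted; only the optional hook is shown.
    def summarize(self, result: ExampleSchema) -> list[str]:
        # Group contained files by mime type in the frontend; plugins that
        # do not need grouping simply keep the inherited default of [].
        return [result.mime]
```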
81 changes: 6 additions & 75 deletions src/helperFunctions/hash.py
@@ -1,108 +1,39 @@
from __future__ import annotations

import contextlib
import logging
import sys
from hashlib import md5, new
from typing import TYPE_CHECKING
from hashlib import new

import lief
import ssdeep
import tlsh

from helperFunctions.data_conversion import make_bytes

if TYPE_CHECKING:
from objects.file import FileObject

ELF_MIME_TYPES = [
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
]


def get_hash(hash_function, binary):
def get_hash(hash_function: str, binary: bytes | str) -> str:
"""
Hashes binary with hash_function.

:param hash_function: The hash function to use. See hashlib for more
:param binary: The data to hash, either as string or array of Integers
:return: The hash as hexstring
:return: The hash as hex string
"""
binary = make_bytes(binary)
raw_hash = new(hash_function)
raw_hash.update(binary)
raw_hash.update(make_bytes(binary))
return raw_hash.hexdigest()


def get_sha256(code):
def get_sha256(code: bytes | str) -> str:
return get_hash('sha256', code)


def get_md5(code):
def get_md5(code: bytes | str) -> str:
return get_hash('md5', code)


def get_ssdeep(code):
binary = make_bytes(code)
raw_hash = ssdeep.Hash()
raw_hash.update(binary)
return raw_hash.digest()


def get_tlsh(code):
tlsh_hash = tlsh.hash(make_bytes(code))
return tlsh_hash if tlsh_hash != 'TNULL' else ''


def get_tlsh_comparison(first, second):
return tlsh.diff(first, second)


def get_imphash(file_object: FileObject) -> str | None:
"""
Generates and returns the md5 hash of the (sorted) imported functions of an ELF file represented by `file_object`.
Returns `None` if there are no imports or if an exception occurs.

:param file_object: The FileObject of which the imphash shall be computed
"""
if _is_elf_file(file_object):
try:
with _suppress_stdout():
functions = [f.name for f in lief.ELF.parse(file_object.file_path).imported_functions]
if functions:
return md5(','.join(sorted(functions)).encode()).hexdigest()
except Exception:
logging.exception(f'Could not compute imphash for {file_object.file_path}')
return None


def _is_elf_file(file_object: FileObject) -> bool:
return file_object.processed_analysis['file_type']['result']['mime'] in ELF_MIME_TYPES


def normalize_lief_items(functions):
"""
Shorthand to convert a list of objects to a list of strings
"""
return [str(function) for function in functions]


class _StandardOutWriter:
def write(self, _):
pass


@contextlib.contextmanager
def _suppress_stdout():
"""A context manager that suppresses any output to stdout and stderr."""
writer = _StandardOutWriter()

stdout, stderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = writer, writer

yield

sys.stdout, sys.stderr = stdout, stderr
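With the imphash, ssdeep, and tlsh helpers moved into the plugin, `helperFunctions.hash` is reduced to thin wrappers around `hashlib.new`. A quick usage sketch, assuming a FACT_core checkout with `src/` on the import path:

```python
from helperFunctions.hash import get_hash, get_md5, get_sha256

data = b'hello world'
# get_hash() accepts any algorithm name that hashlib.new() understands.
assert get_md5(data) == '5eb63bbbe01eeed093cb22bb8f5acdc3'
assert get_sha256(data) == get_hash('sha256', data)
print(get_hash('sha1', data))  # 2aae6c35c94fcfb415dbe95f408b9ce91ee846ed
```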
138 changes: 104 additions & 34 deletions src/plugins/analysis/hash/code/hash.py
@@ -1,43 +1,113 @@
from __future__ import annotations

import logging
from hashlib import algorithms_guaranteed
from typing import TYPE_CHECKING, Optional

import lief
import ssdeep
import tlsh
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.PluginBase import AnalysisBasePlugin
from helperFunctions.hash import get_hash, get_imphash, get_ssdeep, get_tlsh
from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.hash import get_hash, get_md5

if TYPE_CHECKING:
from io import FileIO

ELF_MIME_TYPES = [
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
]


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
# The supported hashes are the ones from helperFunctions.hash and hashlib (except "shake" which is of
# little use considering its variable length).
# If they are not supported on the platform or not selected in the configuration of the plugin, the value will
# be `None`.
# Only the md5 and sha256 hashes are guaranteed to be available (since they are required down the line)

# from hashlib
md5: str = Field(description="md5 hash of the file's content")
sha256: str = Field(description="sha256 hash of the file's content")
sha1: Optional[str] = Field(description="sha1 hash of the file's content", default=None)
sha224: Optional[str] = Field(description="sha224 hash of the file's content", default=None)
sha384: Optional[str] = Field(description="sha384 hash of the file's content", default=None)
sha512: Optional[str] = Field(description="sha512 hash of the file's content", default=None)
blake2b: Optional[str] = Field(description="blake2b hash of the file's content", default=None)
blake2s: Optional[str] = Field(description="blake2s hash of the file's content", default=None)
sha3_224: Optional[str] = Field(description="sha3_224 hash of the file's content", default=None)
sha3_256: Optional[str] = Field(description="sha3_256 hash of the file's content", default=None)
sha3_384: Optional[str] = Field(description="sha3_384 hash of the file's content", default=None)
sha3_512: Optional[str] = Field(description="sha3_512 hash of the file's content", default=None)

ssdeep: Optional[str] = Field(description="ssdeep hash of the file's content", default=None)
tlsh: Optional[str] = Field(description="tlsh hash of the file's content", default=None)
imphash: Optional[str] = Field(
description='import hash: the MD5 hash of the sorted imported functions (ELF files only)',
default=None,
)

def __init__(self):
super().__init__(
metadata=self.MetaData(
name='file_hashes',
description='calculate different hash values of the file',
version=Version(1, 3, 0),
dependencies=['file_type'],
Schema=self.Schema,
),
)
configured_hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', [])
self.hashes_to_create = set(configured_hashes).union({'sha256', 'md5'})

def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema:
del virtual_file_path
result = {}

class AnalysisPlugin(AnalysisBasePlugin):
file_handle.seek(0)
file_contents = file_handle.read()
for hash_ in self.hashes_to_create.intersection(algorithms_guaranteed):
result[hash_] = get_hash(hash_, file_contents)
result['ssdeep'] = get_ssdeep(file_contents)
result['imphash'] = get_imphash(file_handle, analyses.get('file_type'))
result['tlsh'] = get_tlsh(file_contents)

return self.Schema(**result)


def get_imphash(file: FileIO, type_analysis: BaseModel | None) -> str | None:
"""
This Plugin creates several hashes of the file
Generates and returns the md5 hash for the (sorted) imported functions of an ELF file.
Returns `None` if there are no imports or if an exception occurs.
"""
if type_analysis is not None and _is_elf_file(type_analysis):
try:
if (parsed_elf := lief.ELF.parse(file.name)) is not None and len(parsed_elf.imported_functions) > 0:
functions = [f.name for f in parsed_elf.imported_functions]
return get_md5(','.join(sorted(functions)))
except Exception as error:
logging.warning(f'Could not compute imphash for {file}: {error}')
return None


def _is_elf_file(type_analysis: BaseModel) -> bool:
return type_analysis.mime in ELF_MIME_TYPES


def get_ssdeep(file_contents: bytes) -> str:
raw_hash = ssdeep.Hash()
raw_hash.update(file_contents)
return raw_hash.digest()


NAME = 'file_hashes'
DEPENDENCIES = ['file_type'] # noqa: RUF012
DESCRIPTION = 'calculate different hash values of the file'
VERSION = '1.2'
FILE = __file__

def additional_setup(self):
hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', ['sha256'])
self.hashes_to_create = hashes

def process_object(self, file_object):
"""
This function must be implemented by the plugin.
Analysis result must be a dict stored in file_object.processed_analysis[self.NAME]
If you want to propagate results to parent objects store a list of strings 'summary' entry of your result dict
"""
file_object.processed_analysis[self.NAME] = {}
for hash_ in self.hashes_to_create:
if hash_ in algorithms_guaranteed:
file_object.processed_analysis[self.NAME][hash_] = get_hash(hash_, file_object.binary)
else:
logging.debug(f'algorithm {hash_} not available')
file_object.processed_analysis[self.NAME]['ssdeep'] = get_ssdeep(file_object.binary)
file_object.processed_analysis[self.NAME]['imphash'] = get_imphash(file_object)

tlsh_hash = get_tlsh(file_object.binary)
if tlsh_hash:
file_object.processed_analysis[self.NAME]['tlsh'] = get_tlsh(file_object.binary)

return file_object
def get_tlsh(file_contents: bytes) -> str | None:
tlsh_hash = tlsh.hash(file_contents)
return tlsh_hash if tlsh_hash != 'TNULL' else None
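The rewritten plugin no longer touches `FileObject`s directly: `analyze()` works on a plain file handle plus the `file_type` dependency result, which only needs a `mime` attribute. A rough sketch of calling it outside the scheduler, mirroring what the new tests do (the binary path, the `SimpleNamespace` stub, and the absolute import path are placeholders, and a loaded FACT backend configuration is assumed for `__init__`):

```python
from types import SimpleNamespace

from plugins.analysis.hash.code.hash import AnalysisPlugin

plugin = AnalysisPlugin()  # reads the configured hash list, always adding md5 and sha256
file_type_stub = SimpleNamespace(mime='application/x-executable')

with open('/bin/ls', 'rb') as fp:  # placeholder ELF binary
    result = plugin.analyze(fp, '', {'file_type': file_type_stub})

print(result.sha256)   # always present, like md5
print(result.imphash)  # set only for ELF files with imported functions
```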
56 changes: 39 additions & 17 deletions src/plugins/analysis/hash/test/test_plugin_hash.py
@@ -1,13 +1,21 @@
import os
from pathlib import Path

import pytest
from common_helper_files import get_dir_of_file

from test.common_helper import MockFileObject
from ..code.hash import AnalysisPlugin, get_imphash, get_ssdeep, get_tlsh

from ..code.hash import AnalysisPlugin
TEST_DATA_DIR = Path(__file__).parent / 'data'
TEST_FILE = TEST_DATA_DIR / 'ls'
MD5_LEN = 32
TEST_STRING = b'test string'

TEST_DATA_DIR = os.path.join(get_dir_of_file(__file__), 'data') # noqa: PTH118

class MockTypeResultSchema:
mime = 'application/x-executable'


ANALYSIS_RESULT = {'file_type': MockTypeResultSchema()}


@pytest.mark.backend_config_overwrite(
@@ -23,20 +31,34 @@
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginHash:
def test_all_hashes(self, analysis_plugin):
result = analysis_plugin.process_object(MockFileObject()).processed_analysis[analysis_plugin.NAME]
with TEST_FILE.open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT)

assert 'md5' in result, 'md5 not in result'
assert 'sha1' in result, 'sha1 not in result'
assert 'foo' not in result, 'foo in result but not available'
assert result['md5'] == '6f8db599de986fab7a21625b7916589c', 'hash not correct'
assert 'ssdeep' in result, 'ssdeep not in result'
assert 'imphash' in result, 'imphash not in result'
assert result.md5 is not None
assert result.sha1 is not None
assert result.ssdeep is not None
assert result.imphash is not None
assert result.md5 == '87b02c9bea4be534649d3ab0b6f040a0', 'hash not correct'

def test_imphash(self, analysis_plugin):
file_path = os.path.join(TEST_DATA_DIR, 'ls') # noqa: PTH118
result = analysis_plugin.process_object(MockFileObject(file_path=file_path)).processed_analysis[
analysis_plugin.NAME
]
with TEST_FILE.open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT)

assert isinstance(result.imphash, str), 'imphash should be a string'
assert len(result.imphash) == MD5_LEN, 'imphash does not look like an md5'
assert result.imphash == 'd9eccd5f72564ac07601458b26040259'


def test_get_ssdeep():
assert get_ssdeep(TEST_STRING) == '3:Hv2:HO', 'not correct from string'


def test_imphash_bad_file():
this_file = Path(__file__)
with this_file.open('rb') as fp:
assert get_imphash(fp, MockTypeResultSchema()) is None


assert isinstance(result['imphash'], str), 'imphash should be a string'
assert len(result['imphash']) == 32, 'imphash does not look like an md5'
def test_get_tlsh():
assert get_tlsh(b'foobar') is None # make sure the result is not 'TNULL'
assert get_tlsh(os.urandom(2**7)) not in [None, ''] # the new tlsh version should work for smaller inputs
14 changes: 14 additions & 0 deletions src/plugins/analysis/hash/view/hash.html
@@ -0,0 +1,14 @@
{% extends "analysis_plugins/general_information.html" %}

{% block analysis_result_details %}

{% for key, value in analysis_result.items() | sort %}
{% if value %}
<tr>
<td>{{ key }}</td>
<td style="font-family: monospace">{{ value }}</td>
</tr>
{% endif %}
{% endfor %}

{% endblock %}