-
Notifications
You must be signed in to change notification settings - Fork 227
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
binwalk plugin: ported plugin to new base class
also updated blacklist
- Loading branch information
Showing 4 changed files with 124 additions and 83 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,83 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import string | ||
import subprocess | ||
from base64 import b64encode | ||
from pathlib import Path | ||
from subprocess import PIPE, STDOUT | ||
from tempfile import TemporaryDirectory | ||
from typing import TYPE_CHECKING, List | ||
|
||
import binwalk | ||
from pydantic import BaseModel, Field | ||
|
||
import config | ||
from analysis.PluginBase import AnalysisBasePlugin | ||
from analysis.plugin import AnalysisPluginV0 | ||
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin | ||
from analysis.plugin.plugin import AnalysisFailedError | ||
from helperFunctions.install import OperateInDirectory | ||
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED | ||
|
||
if TYPE_CHECKING: | ||
import io | ||
|
||
class AnalysisPlugin(AnalysisBasePlugin): | ||
NAME = 'binwalk' | ||
DESCRIPTION = 'binwalk signature and entropy analysis' | ||
DEPENDENCIES = [] # noqa: RUF012 | ||
MIME_BLACKLIST = ['audio', 'image', 'video'] # noqa: RUF012 | ||
VERSION = '0.5.5' | ||
FILE = __file__ | ||
from binwalk.modules.entropy import Entropy | ||
from binwalk.modules.signature import Signature | ||
|
||
def process_object(self, file_object): | ||
result = {} | ||
with TemporaryDirectory(prefix='fact_analysis_binwalk_', dir=config.backend.temp_dir_path) as tmp_dir: | ||
cmd_process = subprocess.run( | ||
f'(cd {tmp_dir} && xvfb-run -a binwalk -BEJ {file_object.file_path})', | ||
shell=True, | ||
stdout=PIPE, | ||
stderr=STDOUT, | ||
text=True, | ||
check=False, | ||
) | ||
signature_analysis_result = cmd_process.stdout | ||
try: | ||
pic_path = Path(tmp_dir) / f'{Path(file_object.file_path).name}.png' | ||
result['entropy_analysis_graph'] = b64encode(pic_path.read_bytes()).decode() | ||
result['signature_analysis'] = signature_analysis_result | ||
result['summary'] = list(set(self._extract_summary(signature_analysis_result))) | ||
except FileNotFoundError: | ||
result = {'failed': 'Binwalk analysis failed'} | ||
logging.error(f'Binwalk analysis on {file_object.uid} failed:\n{signature_analysis_result}') | ||
|
||
file_object.processed_analysis[self.NAME] = result | ||
return file_object | ||
class SignatureScanResult(BaseModel): | ||
offset: int | ||
description: str | ||
|
||
def _extract_summary(self, binwalk_output: str) -> list[str]: | ||
summary = [] | ||
for line in self._iterate_valid_signature_lines(binwalk_output.splitlines()): | ||
signature_description = self._extract_description_from_signature_line(line.split()) | ||
if 'entropy edge' in signature_description: | ||
continue | ||
if ',' in signature_description: | ||
summary.append(signature_description.split(',', maxsplit=1)[0]) | ||
elif signature_description: | ||
summary.append(signature_description) | ||
|
||
return summary | ||
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): | ||
class Schema(BaseModel): | ||
entropy_analysis_graph: str = Field( | ||
description='An entropy analysis graph generated by binwalk as base64 string.', | ||
) | ||
signature_analysis: List[SignatureScanResult] = Field( | ||
description='The result of the signature analysis from binwalk.', | ||
) | ||
|
||
def __init__(self): | ||
super().__init__( | ||
metadata=AnalysisPluginV0.MetaData( | ||
name='binwalk', | ||
description='binwalk signature and entropy analysis', | ||
version='1.0.0', | ||
Schema=self.Schema, | ||
mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED], | ||
), | ||
) | ||
|
||
@staticmethod | ||
def _extract_description_from_signature_line(separated_by_spaces): | ||
return ' '.join(separated_by_spaces[2:]) if len(separated_by_spaces) > 2 else '' # noqa: PLR2004 | ||
def analyze(self, file_handle: io.FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema: | ||
del virtual_file_path, analyses | ||
|
||
@staticmethod | ||
def _iterate_valid_signature_lines(output_lines): | ||
return (line for line in output_lines if line and line[0] in string.digits) | ||
# FixMe: fix formatting once Python 3.8 is deprecated (2024-10-31) | ||
with TemporaryDirectory( | ||
prefix='fact_analysis_binwalk_', dir=config.backend.temp_dir_path | ||
) as tmp_dir, OperateInDirectory(tmp_dir): | ||
output: tuple[Signature, Entropy] = binwalk.scan( | ||
file_handle.name, | ||
signature=True, | ||
entropy=True, | ||
save=True, | ||
quiet=True, | ||
) | ||
signature_result, entropy_result = output | ||
if not entropy_result.output_file or not (pic_path := Path(entropy_result.output_file)).is_file(): | ||
raise AnalysisFailedError('Entropy output file is missing') | ||
return self.Schema( | ||
entropy_analysis_graph=b64encode(pic_path.read_bytes()).decode(), | ||
signature_analysis=[ | ||
SignatureScanResult(offset=i.offset, description=i.description) for i in signature_result.results | ||
], | ||
) | ||
|
||
def summarize(self, result: Schema) -> list: | ||
summary = [] | ||
for item in result.signature_analysis: # type: SignatureScanResult | ||
if 'entropy edge' in item.description: | ||
continue | ||
if ',' in item.description: | ||
summary.append(item.description.split(',', maxsplit=1)[0]) | ||
elif item.description: | ||
summary.append(item.description) | ||
return summary |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,32 @@ | ||
import string | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from objects.file import FileObject | ||
from test.common_helper import get_test_data_dir | ||
|
||
from ..code.binwalk import AnalysisPlugin | ||
|
||
TEST_OUTPUT = """ | ||
DECIMAL HEXADECIMAL DESCRIPTION | ||
-------------------------------------------------------------------------------- | ||
0 0x0 Microsoft executable, portable (PE) | ||
106008 0x19E18 XML document, version: "1.0" | ||
113771 0x1BC6B Zip archive data, at least v2.0 to extract, compressed size: 47799, uncompressed size: 119688, name: PH1BXRM_AM_000803003938.dat | ||
2752561 0x2A0031 Falling entropy edge (0.026681) | ||
12226608 0xBA9030 End of Zip archive, footer length: 22 | ||
""" # noqa: E501 | ||
TEST_FILE = Path(get_test_data_dir()) / 'container' / 'test.zip' | ||
|
||
|
||
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) | ||
class TestPluginBinwalk: | ||
def test_signature_analysis(self, analysis_plugin): | ||
test_file = FileObject(file_path=f'{get_test_data_dir()}/container/test.zip') | ||
processed_file = analysis_plugin.process_object(test_file) | ||
results = processed_file.processed_analysis[analysis_plugin.NAME] | ||
assert len(results['signature_analysis']) > 0, 'no binwalk signature analysis found' | ||
assert 'DECIMAL' in results['signature_analysis'], 'no valid binwalk signature analysis' | ||
assert TEST_FILE.is_file(), 'test file is missing' | ||
with TEST_FILE.open() as fp: | ||
result = analysis_plugin.analyze(fp, {}, {}) | ||
assert len(result.signature_analysis) > 0, 'no binwalk signature analysis found' | ||
assert 'Zip archive data' in result.signature_analysis[0].description, 'no valid binwalk signature analysis' | ||
|
||
def test_entropy_graph(self, analysis_plugin): | ||
test_file = FileObject(file_path=f'{get_test_data_dir()}/container/test.zip') | ||
processed_file = analysis_plugin.process_object(test_file) | ||
results = processed_file.processed_analysis[analysis_plugin.NAME] | ||
assert len(results['entropy_analysis_graph']) > 0, 'no binwalk entropy graph found' | ||
assert TEST_FILE.is_file(), 'test file is missing' | ||
with TEST_FILE.open() as fp: | ||
result = analysis_plugin.analyze(fp, {}, {}) | ||
assert len(result.entropy_analysis_graph) > 0, 'no binwalk entropy graph found' | ||
|
||
def test_summary(self, analysis_plugin): | ||
summary = analysis_plugin._extract_summary(TEST_OUTPUT) | ||
for x in summary: | ||
assert x in ['Microsoft executable', 'XML document', 'Zip archive data', 'End of Zip archive'] | ||
|
||
def test_iterate_valid_signature_lines(self, analysis_plugin): | ||
result = list(analysis_plugin._iterate_valid_signature_lines(TEST_OUTPUT.splitlines())) | ||
assert len(result) == 5 # noqa: PLR2004 | ||
assert all(line[0] in string.digits for line in result) | ||
assert result[0] == '0 0x0 Microsoft executable, portable (PE)' | ||
with TEST_FILE.open() as fp: | ||
test_result = analysis_plugin.analyze(fp, {}, {}) | ||
summary = analysis_plugin.summarize(test_result) | ||
for line in summary: | ||
assert line in {'Zip archive data', 'End of Zip archive'} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters