Port the software_components analysis plugin from YaraBasePlugin to the
AnalysisPluginV0 API and adapt the plugins that read its result
(cve_lookup, known_vulnerabilities) to the new layout: the result now holds a
'software_components' list whose entries carry the keys 'name' and 'versions'
(see the SoftwareMatch model added below).

diff --git a/src/plugins/analysis/cve_lookup/code/cve_lookup.py b/src/plugins/analysis/cve_lookup/code/cve_lookup.py
index 61fca583e..e9b8dc5bd 100644
--- a/src/plugins/analysis/cve_lookup/code/cve_lookup.py
+++ b/src/plugins/analysis/cve_lookup/code/cve_lookup.py
@@ -46,9 +46,9 @@ def process_object(self, file_object: FileObject) -> FileObject:
         cves = {'cve_results': {}}
         connection = DbConnection(f'sqlite:///{DB_PATH}')
         lookup = Lookup(file_object, connection, match_any=self.match_any)
-        for value in file_object.processed_analysis['software_components']['result'].values():
-            product = value['meta']['software_name']
-            version = value['meta']['version'][0]
+        for sw_dict in file_object.processed_analysis['software_components']['result'].get('software_components', []):
+            product = sw_dict['name']
+            version = sw_dict['versions'][0] if sw_dict['versions'] else None
             if product and version:
                 vulnerabilities = lookup.lookup_vulnerabilities(product, version)
                 if vulnerabilities:
diff --git a/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py b/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py
index 9b310480f..8c5f3a934 100644
--- a/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py
+++ b/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py
@@ -47,15 +47,18 @@ def process_object(self, file_object):
 
     def get_matched_vulnerabilities(self, yara_result: list[tuple[str, dict]], file_object) -> list[tuple[str, dict]]:
         software_components_results = file_object.processed_analysis.get('software_components', {}).get('result', {})
+        software_by_name = {
+            sw_dict['name']: sw_dict for sw_dict in software_components_results.get('software_components', [])
+        }
         matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)
 
         # CVE-2021-45608 NetUSB
-        if 'NetUSB' in software_components_results:
+        if 'NetUSB' in software_by_name:
             matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.file_path))
 
         # CVE-2024-3094 XZ Backdoor secondary detection
-        if 'liblzma' in software_components_results and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
-            matched_vulnerabilities.extend(_check_xz_backdoor(software_components_results))
+        if 'liblzma' in software_by_name and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
+            matched_vulnerabilities.extend(_check_xz_backdoor(software_by_name['liblzma']))
         return matched_vulnerabilities
 
     def add_tags(self, file_object, vulnerability_list):
@@ -131,7 +134,7 @@ def _check_netusb_vulnerability(self, file_path: str) -> list[tuple[str, dict]]:
 
 
 def _check_xz_backdoor(software_results: dict) -> list[tuple[str, dict]]:
-    if any(v in software_results['liblzma']['meta']['version'] for v in ['5.6.0', '5.6.1']):
+    if any(v in software_results['versions'] for v in ['5.6.0', '5.6.1']):
         return [
             (
                 'XZ Backdoor',
diff --git a/src/plugins/analysis/software_components/code/software_components.py b/src/plugins/analysis/software_components/code/software_components.py
index 9c59bc267..cefa760dc 100644
--- a/src/plugins/analysis/software_components/code/software_components.py
+++ b/src/plugins/analysis/software_components/code/software_components.py
@@ -2,11 +2,14 @@
 
 import re
 import string
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Optional
+
+from pydantic import BaseModel, Field
+from semver import Version
 
 import config
-from analysis.YaraPluginBase import YaraBasePlugin
-from helperFunctions.data_conversion import make_unicode_string
+from analysis.plugin import AnalysisPluginV0, Tag, addons
+from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
 from helperFunctions.tag import TagColor
 from plugins.analysis.software_components.bin import OS_LIST
 from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE
@@ -14,113 +17,152 @@
 from ..internal.resolve_version_format_string import extract_data_from_ghidra
 
 if TYPE_CHECKING:
-    from objects.file import FileObject
+    from io import FileIO
 
+    import yara
 
-class AnalysisPlugin(YaraBasePlugin):
-    """
-    This plugin identifies software components
 
-    Credits:
-    OS Tagging functionality created by Roman Konertz during Firmware Bootcamp WT17/18 at University of Bonn
-    Maintained by Fraunhofer FKIE
-    """
+class SoftwareMatch(BaseModel):
+    name: str
+    versions: List[str]
+    rule: str = Field(description='Matching YARA rule name')
+    matching_strings: List[MatchingString]
+    description: Optional[str] = None
+    open_source: Optional[bool] = None
+    website: Optional[str] = Field(None, description='Website URL of the software')
 
-    NAME = 'software_components'
-    DESCRIPTION = 'identify software components'
-    MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE
-    VERSION = '0.5.1'
-    FILE = __file__
 
-    def process_object(self, file_object):
-        file_object = super().process_object(file_object)
-        analysis = file_object.processed_analysis[self.NAME]
-        if len(analysis) > 1:
-            analysis = self.add_version_information(analysis, file_object)
-        analysis['summary'] = self._get_summary(analysis)
+class MatchingString(BaseModel):
+    string: str
+    offset: int
+    identifier: str = Field(description='Identifier of the rule that this string matched (e.g. "$a")')
 
-        self.add_os_key(file_object)
-        return file_object
 
-    def get_version(self, input_string: str, meta_dict: dict) -> str:
-        if 'version_regex' in meta_dict:
-            regex = meta_dict['version_regex'].replace('\\\\', '\\')
-        else:
-            regex = r'\d+.\d+(.\d+)?(\w)?'
-        pattern = re.compile(regex)
-        version = pattern.search(input_string)
-        if version is not None:
-            return self._strip_leading_zeroes(version.group(0))
-        return ''
-
-    @staticmethod
-    def _get_summary(results: dict) -> list[str]:
-        summary = set()
-        for key, result in results.items():
-            if key != 'summary':
-                software = result['meta']['software_name']
-                for version in result['meta']['version']:
-                    summary.add(f'{software} {version}')
-        return sorted(summary)
+class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
+    class Schema(BaseModel):
+        software_components: List[SoftwareMatch]
 
-    def add_version_information(self, results, file_object: FileObject):
-        for item in results:
-            if item != 'summary':
-                results[item] = self.get_version_for_component(results[item], file_object)
-        return results
-
-    def get_version_for_component(self, result, file_object: FileObject):
-        versions = set()
-        for matched_string in result['strings']:
-            match = matched_string[2].strip()
-            match = make_unicode_string(match)
-            versions.add(self.get_version(match, result['meta']))
-        if any(k in result['meta'] for k in ('format_string', '_version_function')):
-            if result['meta'].get('format_string'):
-                input_data = {
-                    'mode': 'format_string',
-                    'key_string_list': [s for _, _, s in result['strings'] if '%s' in s],
-                }
-            else:
-                input_data = {
-                    'mode': 'version_function',
-                    'function_name': result['meta']['_version_function'],
-                }
-            versions.update(
-                extract_data_from_ghidra(file_object.file_path, input_data, config.backend.docker_mount_base_dir)
+    def __init__(self):
+        super().__init__(
+            metadata=(
+                self.MetaData(
+                    name='software_components',
+                    description='identify software components',
+                    mime_blacklist=MIME_BLACKLIST_NON_EXECUTABLE,
+                    version=Version(1, 0, 0),
+                    Schema=self.Schema,
+                )
             )
-        if '' in versions and len(versions) > 1:  # if there are actual version results, remove the "empty" result
-            versions.remove('')
-        result['meta']['version'] = list(versions)
-        return result
+        )
+        self._yara = addons.Yara(plugin=self)
+
+    def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
+        del virtual_file_path, analyses
+        return self.Schema(
+            software_components=[
+                SoftwareMatch(
+                    name=match.meta.get('software_name'),
+                    rule=match.rule,
+                    matching_strings=_get_matching_strings(match),
+                    versions=get_version_for_component(match, file_handle),
+                    description=match.meta.get('description'),
+                    website=match.meta.get('website'),
+                    open_source=match.meta.get('open_source'),
+                )
+                for match in self._yara.match(file_handle)
+            ]
+        )
+
+    def summarize(self, result: Schema) -> list[str]:
+        summary = set()
+        for software in result.software_components:
+            if software.versions:
+                for version in software.versions:
+                    summary.add(f'{software.name} {version}')
+            else:
+                summary.add(software.name)
+        return sorted(summary)
 
-    def add_os_key(self, file_object):
-        for entry in file_object.processed_analysis[self.NAME]['summary']:
+    def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
+        del result
+        tags = []
+        for entry in summary:
             for os_ in OS_LIST:
                 if entry.find(os_) != -1:
-                    if self._entry_has_no_trailing_version(entry, os_):
-                        self.add_analysis_tag(file_object, 'OS', entry, TagColor.GREEN, True)
+                    if _entry_has_no_trailing_version(entry, os_):
+                        tags.append(Tag(name='OS', value=entry, color=TagColor.GREEN, propagate=True))
                     else:
-                        self.add_analysis_tag(file_object, 'OS', os_, TagColor.GREEN, False)
-                        self.add_analysis_tag(file_object, 'OS Version', entry, TagColor.GREEN, True)
-
-    @staticmethod
-    def _entry_has_no_trailing_version(entry, os_string):
-        return os_string.strip() == entry.strip()
-
-    @staticmethod
-    def _strip_leading_zeroes(version_string: str) -> str:
-        prefix, suffix = '', ''
-        while version_string and version_string[0] not in string.digits:
-            prefix += version_string[0]
-            version_string = version_string[1:]
-        while version_string and version_string[-1] not in string.digits:
-            suffix = version_string[-1] + suffix
-            version_string = version_string[:-1]
-        elements = []
-        for element in version_string.split('.'):
-            try:
-                elements.append(str(int(element)))
-            except ValueError:
-                elements.append(element)
-        return prefix + '.'.join(elements) + suffix
+                        tags.append(Tag(name='OS', value=os_, color=TagColor.GREEN, propagate=False))
+                        tags.append(Tag(name='OS Version', value=entry, color=TagColor.GREEN, propagate=True))
+        return tags
+
+
+def _get_matching_strings(match: yara.Match) -> list[MatchingString]:
+    return [
+        MatchingString(
+            string=instance.matched_data.decode(errors='replace'),
+            offset=instance.offset,
+            identifier=_string.identifier,
+        )
+        for _string in match.strings  # type: yara.StringMatch
+        for instance in _string.instances  # type: yara.StringMatchInstance
+    ]
+
+
+def get_version_for_component(match: yara.Match, file: FileIO) -> list[str]:
+    matching_strings = _get_strings_from_match(match)
+    versions = {get_version(matching_str, match.meta) for matching_str in matching_strings}
+    if any(k in match.meta for k in ('format_string', '_version_function')):
+        if match.meta.get('format_string'):
+            input_data = {
+                'mode': 'format_string',
+                'key_string_list': [s for s in matching_strings if '%s' in s],
+            }
+        else:
+            input_data = {
+                'mode': 'version_function',
+                'function_name': match.meta['_version_function'],
+            }
+        versions.update(extract_data_from_ghidra(file.name, input_data, config.backend.docker_mount_base_dir))
+    return [v for v in versions if v]
+
+
+def get_version(input_string: str, meta_dict: dict) -> str | None:
+    if 'version_regex' in meta_dict:
+        regex = meta_dict['version_regex'].replace('\\\\', '\\')
+    else:
+        regex = r'\d+.\d+(.\d+)?(\w)?'
+    pattern = re.compile(regex)
+    version = pattern.search(input_string)
+    if version is not None:
+        return _strip_leading_zeroes(version.group(0))
+    return None
+
+
+def _get_strings_from_match(match: yara.Match) -> list[str]:
+    return [
+        instance.matched_data.decode(errors='replace').strip()
+        for string_match in match.strings
+        for instance in string_match.instances
+    ]
+
+
+def _entry_has_no_trailing_version(entry, os_string):
+    return os_string.strip() == entry.strip()
+
+
+def _strip_leading_zeroes(version_string: str) -> str:
+    prefix, suffix = '', ''
+    while version_string and version_string[0] not in string.digits:
+        prefix += version_string[0]
+        version_string = version_string[1:]
+    while version_string and version_string[-1] not in string.digits:
+        suffix = version_string[-1] + suffix
+        version_string = version_string[:-1]
+    elements = []
+    for element in version_string.split('.'):
+        try:
+            elements.append(str(int(element)))
+        except ValueError:
+            elements.append(element)
+    return prefix + '.'.join(elements) + suffix
diff --git a/src/plugins/analysis/software_components/test/test_plugin_software_components.py b/src/plugins/analysis/software_components/test/test_plugin_software_components.py
index 439532383..beb5d8a2d 100644
--- a/src/plugins/analysis/software_components/test/test_plugin_software_components.py
+++ b/src/plugins/analysis/software_components/test/test_plugin_software_components.py
@@ -2,46 +2,44 @@
 
 import pytest
 
-from objects.file import FileObject
+from ..code.software_components import AnalysisPlugin, _entry_has_no_trailing_version, get_version
 
-from ..code.software_components import AnalysisPlugin
-
-YARA_TEST_FILE = str(Path(__file__).parent / 'data' / 'yara_test_file')
+YARA_TEST_FILE = Path(__file__).parent / 'data' / 'yara_test_file'
 
 
 @pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
 class TestAnalysisPluginsSoftwareComponents:
     def test_process_object(self, analysis_plugin):
-        test_file = FileObject(file_path=YARA_TEST_FILE)
-
-        processed_file = analysis_plugin.process_object(test_file)
-        results = processed_file.processed_analysis[analysis_plugin.NAME]
-        assert len(results) == 2, 'incorrect number of software components found'
-        assert 'MyTestRule' in results, 'test Rule match not found'
-        assert (
-            results['MyTestRule']['meta']['software_name'] == 'Test Software'
-        ), 'incorrect software name from yara meta'
-        assert (
-            results['MyTestRule']['meta']['website'] == 'http://www.fkie.fraunhofer.de'
-        ), 'incorrect website from yara meta'
-        assert (
-            results['MyTestRule']['meta']['description'] == 'This is a test rule'
-        ), 'incorrect description from yara meta'
-        assert results['MyTestRule']['meta']['open_source'], 'incorrect open-source flag from yara meta'
-        assert (10, '$a', 'MyTestRule 0.1.3.') in results['MyTestRule']['strings'], 'string not found'
-        assert '0.1.3' in results['MyTestRule']['meta']['version'], 'Version not detected'
-        assert len(results['MyTestRule']['strings']) == 1, 'to much strings found'
-        assert len(results['summary']) == 1, 'Number of summary results not correct'
-        assert 'Test Software 0.1.3' in results['summary']
+        with YARA_TEST_FILE.open('rb') as fp:
+            results = analysis_plugin.analyze(fp, {}, {})
+
+        assert len(results.software_components) == 1, 'incorrect number of software components found'
+        software_result = results.software_components[0]
+        assert software_result.rule == 'MyTestRule', 'incorrect yara rule name'
+        assert software_result.name == 'Test Software', 'incorrect software name from yara meta'
+        assert software_result.website == 'http://www.fkie.fraunhofer.de', 'incorrect website from yara meta'
+        assert software_result.description == 'This is a test rule', 'incorrect description from yara meta'
+        assert software_result.open_source, 'incorrect open-source flag from yara meta'
+
+        assert len(software_result.matching_strings) == 1, 'too many strings found'
+        string_match = software_result.matching_strings[0]
+        assert string_match.string == 'MyTestRule 0.1.3.', 'string not found'
+        assert string_match.offset == 10
+        assert string_match.identifier == '$a'
+        assert '0.1.3' in software_result.versions, 'Version not detected'
+
+        summary = analysis_plugin.summarize(results)
+        assert len(summary) == 1, 'Number of summary results not correct'
+        assert 'Test Software 0.1.3' in summary
 
     @pytest.mark.parametrize(
         ('version', 'expected_output', 'meta_dict'),
         [
-            ('', '', {}),
+            ('', None, {}),
             ('Foo 15.14.13', '15.14.13', {}),
             ('Foo 1.0', '1.0', {}),
             ('Foo 1.1.1b', '1.1.1b', {}),
-            ('Foo', '', {}),
+            ('Foo', None, {}),
             ('Foo 01.02.03', '1.2.3', {}),
             ('Foo 00.1.', '0.1', {}),
             ('\x001.22.333\x00', '1.22.333', {}),
@@ -54,43 +52,21 @@ def test_process_object(self, analysis_plugin):
         ],
     )
     def test_get_version(self, analysis_plugin, version, expected_output, meta_dict):
-        assert analysis_plugin.get_version(version, meta_dict) == expected_output, f'{version} not found correctly'
+        assert get_version(version, meta_dict) == expected_output, f'{version} not found correctly'
 
     def test_get_version_from_meta(self, analysis_plugin):
         version = 'v15.14.1a'
         assert (
-            analysis_plugin.get_version(f'Foo {version}', {'version_regex': 'v\\d\\d\\.\\d\\d\\.\\d[a-z]'}) == version
+            get_version(f'Foo {version}', {'version_regex': 'v\\d\\d\\.\\d\\d\\.\\d[a-z]'}) == version
         ), 'version not found correctly'
 
     def test_entry_has_no_trailing_version(self, analysis_plugin):
-        assert not analysis_plugin._entry_has_no_trailing_version('Linux', 'Linux 4.15.0-22')
-        assert analysis_plugin._entry_has_no_trailing_version('Linux', 'Linux')
-        assert analysis_plugin._entry_has_no_trailing_version(' Linux', 'Linux ')
-
-    def test_add_os_key_fail(self, analysis_plugin):
-        test_file = FileObject(file_path=YARA_TEST_FILE)
-        with pytest.raises(KeyError):
-            analysis_plugin.add_os_key(test_file)
-
-        test_file.processed_analysis[analysis_plugin.NAME] = {'summary': ['OpenSSL']}
-        analysis_plugin.add_os_key(test_file)
-        assert 'tags' not in test_file.processed_analysis[analysis_plugin.NAME]
-
-    def test_add_os_key_success(self, analysis_plugin):
-        test_file = FileObject(file_path=YARA_TEST_FILE)
-        test_file.processed_analysis[analysis_plugin.NAME] = {'summary': ['Linux Kernel']}
-        analysis_plugin.add_os_key(test_file)
-        assert 'tags' in test_file.processed_analysis[analysis_plugin.NAME]
-        assert test_file.processed_analysis[analysis_plugin.NAME]['tags']['OS']['value'] == 'Linux Kernel'
-
-    def test_update_os_key(self, analysis_plugin):
-        test_file = FileObject(file_path=YARA_TEST_FILE)
-
-        test_file.processed_analysis[analysis_plugin.NAME] = {
-            'summary': ['Linux Kernel'],
-            'tags': {'OS': {'value': 'Fire OS'}},
-        }
-
-        assert test_file.processed_analysis[analysis_plugin.NAME]['tags']['OS']['value'] == 'Fire OS'
-        analysis_plugin.add_os_key(test_file)
-        assert test_file.processed_analysis[analysis_plugin.NAME]['tags']['OS']['value'] == 'Linux Kernel'
+        assert not _entry_has_no_trailing_version('Linux', 'Linux 4.15.0-22')
+        assert _entry_has_no_trailing_version('Linux', 'Linux')
+        assert _entry_has_no_trailing_version(' Linux', 'Linux ')
+
+    def test_get_tags(self, analysis_plugin):
+        assert analysis_plugin.get_tags({}, ['OpenSSL']) == []
+        tags = analysis_plugin.get_tags({}, ['Linux Kernel'])
+        assert tags != []
+        assert tags[0].value == 'Linux Kernel'
diff --git a/src/plugins/analysis/software_components/view/software_components.html b/src/plugins/analysis/software_components/view/software_components.html
index 2a6f76d18..9bbc328ef 100644
--- a/src/plugins/analysis/software_components/view/software_components.html
+++ b/src/plugins/analysis/software_components/view/software_components.html
@@ -2,7 +2,7 @@
 
 {% block analysis_result_details %}
 
-    {% for key in analysis_result %}
+    {% for software_data in analysis_result.software_components %}
identifier | +offset | +string | +
---|---|---|
{{ string_data.identifier }} | +{{ string_data.offset }} | +{{ string_data.string }} | +