Skip to content

Commit

Permalink
feat: converted software components plugin to new base class
Browse files Browse the repository at this point in the history
also adjusted known_vuln. and cve_lookup plugins to work with the new result structure
  • Loading branch information
jstucke committed Jan 22, 2025
1 parent 57e7c49 commit 913fd86
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 195 deletions.
6 changes: 3 additions & 3 deletions src/plugins/analysis/cve_lookup/code/cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def process_object(self, file_object: FileObject) -> FileObject:
cves = {'cve_results': {}}
connection = DbConnection(f'sqlite:///{DB_PATH}')
lookup = Lookup(file_object, connection, match_any=self.match_any)
for value in file_object.processed_analysis['software_components']['result'].values():
product = value['meta']['software_name']
version = value['meta']['version'][0]
for sw_dict in file_object.processed_analysis['software_components']['result'].get('software_components', []):
product = sw_dict['name']
version = sw_dict['versions'][0] if sw_dict['versions'] else None
if product and version:
vulnerabilities = lookup.lookup_vulnerabilities(product, version)
if vulnerabilities:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ def process_object(self, file_object):

def get_matched_vulnerabilities(self, yara_result: list[tuple[str, dict]], file_object) -> list[tuple[str, dict]]:
software_components_results = file_object.processed_analysis.get('software_components', {}).get('result', {})
software_by_name = {
sw_dict['name']: sw_dict for sw_dict in software_components_results.get('software_components', [])
}
matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)

# CVE-2021-45608 NetUSB
if 'NetUSB' in software_components_results:
if 'NetUSB' in software_by_name:
matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.file_path))

# CVE-2024-3094 XZ Backdoor secondary detection
if 'liblzma' in software_components_results and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_components_results))
if 'liblzma' in software_by_name and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_by_name['liblzma']))
return matched_vulnerabilities

def add_tags(self, file_object, vulnerability_list):
Expand Down Expand Up @@ -131,7 +134,7 @@ def _check_netusb_vulnerability(self, file_path: str) -> list[tuple[str, dict]]:


def _check_xz_backdoor(software_results: dict) -> list[tuple[str, dict]]:
if any(v in software_results['liblzma']['meta']['version'] for v in ['5.6.0', '5.6.1']):
if any(v in software_results['version'] for v in ['5.6.0', '5.6.1']):
return [
(
'XZ Backdoor',
Expand Down
242 changes: 142 additions & 100 deletions src/plugins/analysis/software_components/code/software_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,167 @@

import re
import string
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Optional

from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.YaraPluginBase import YaraBasePlugin
from helperFunctions.data_conversion import make_unicode_string
from analysis.plugin import AnalysisPluginV0, Tag, addons
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.tag import TagColor
from plugins.analysis.software_components.bin import OS_LIST
from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE

from ..internal.resolve_version_format_string import extract_data_from_ghidra

if TYPE_CHECKING:
from objects.file import FileObject
from io import FileIO

import yara

class AnalysisPlugin(YaraBasePlugin):
"""
This plugin identifies software components

Credits:
OS Tagging functionality created by Roman Konertz during Firmware Bootcamp WT17/18 at University of Bonn
Maintained by Fraunhofer FKIE
"""
class SoftwareMatch(BaseModel):
name: str
versions: List[str]
rule: str = Field(description='Matching YARA rule name')
matching_strings: List[MatchingString]
description: Optional[str] = None
open_source: Optional[bool] = None
website: Optional[str] = Field(None, description='Website URL of the software')

NAME = 'software_components'
DESCRIPTION = 'identify software components'
MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE
VERSION = '0.5.1'
FILE = __file__

def process_object(self, file_object):
file_object = super().process_object(file_object)
analysis = file_object.processed_analysis[self.NAME]
if len(analysis) > 1:
analysis = self.add_version_information(analysis, file_object)
analysis['summary'] = self._get_summary(analysis)
class MatchingString(BaseModel):
string: str
offset: int
identifier: str = Field(description='Identifier of the rule that this string matched (e.g. "$a")')

self.add_os_key(file_object)
return file_object

def get_version(self, input_string: str, meta_dict: dict) -> str:
if 'version_regex' in meta_dict:
regex = meta_dict['version_regex'].replace('\\\\', '\\')
else:
regex = r'\d+.\d+(.\d+)?(\w)?'
pattern = re.compile(regex)
version = pattern.search(input_string)
if version is not None:
return self._strip_leading_zeroes(version.group(0))
return ''

@staticmethod
def _get_summary(results: dict) -> list[str]:
summary = set()
for key, result in results.items():
if key != 'summary':
software = result['meta']['software_name']
for version in result['meta']['version']:
summary.add(f'{software} {version}')
return sorted(summary)
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
software_components: List[SoftwareMatch]

def add_version_information(self, results, file_object: FileObject):
for item in results:
if item != 'summary':
results[item] = self.get_version_for_component(results[item], file_object)
return results

def get_version_for_component(self, result, file_object: FileObject):
versions = set()
for matched_string in result['strings']:
match = matched_string[2].strip()
match = make_unicode_string(match)
versions.add(self.get_version(match, result['meta']))
if any(k in result['meta'] for k in ('format_string', '_version_function')):
if result['meta'].get('format_string'):
input_data = {
'mode': 'format_string',
'key_string_list': [s for _, _, s in result['strings'] if '%s' in s],
}
else:
input_data = {
'mode': 'version_function',
'function_name': result['meta']['_version_function'],
}
versions.update(
extract_data_from_ghidra(file_object.file_path, input_data, config.backend.docker_mount_base_dir)
def __init__(self):
super().__init__(
metadata=(
self.MetaData(
name='software_components',
description='identify software components',
mime_blacklist=MIME_BLACKLIST_NON_EXECUTABLE,
version=Version(1, 0, 0),
Schema=self.Schema,
)
)
if '' in versions and len(versions) > 1: # if there are actual version results, remove the "empty" result
versions.remove('')
result['meta']['version'] = list(versions)
return result
)
self._yara = addons.Yara(plugin=self)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del virtual_file_path, analyses
return self.Schema(
software_components=[
SoftwareMatch(
name=match.meta.get('software_name'),
rule=match.rule,
matching_strings=_get_matching_strings(match),
versions=get_version_for_component(match, file_handle),
description=match.meta.get('description'),
website=match.meta.get('website'),
open_source=match.meta.get('open_source'),
)
for match in self._yara.match(file_handle)
]
)

def summarize(self, result: Schema) -> list[str]:
summary = set()
for software in result.software_components:
if software.versions:
for version in software.versions:
summary.add(f'{software.name} {version}')
else:
summary.add(software.name)
return sorted(summary)

def add_os_key(self, file_object):
for entry in file_object.processed_analysis[self.NAME]['summary']:
def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del result
tags = []
for entry in summary:
for os_ in OS_LIST:
if entry.find(os_) != -1:
if self._entry_has_no_trailing_version(entry, os_):
self.add_analysis_tag(file_object, 'OS', entry, TagColor.GREEN, True)
if _entry_has_no_trailing_version(entry, os_):
tags.append(Tag(name='OS', value=entry, color=TagColor.GREEN, propagate=True))
else:
self.add_analysis_tag(file_object, 'OS', os_, TagColor.GREEN, False)
self.add_analysis_tag(file_object, 'OS Version', entry, TagColor.GREEN, True)

@staticmethod
def _entry_has_no_trailing_version(entry, os_string):
return os_string.strip() == entry.strip()

@staticmethod
def _strip_leading_zeroes(version_string: str) -> str:
prefix, suffix = '', ''
while version_string and version_string[0] not in string.digits:
prefix += version_string[0]
version_string = version_string[1:]
while version_string and version_string[-1] not in string.digits:
suffix = version_string[-1] + suffix
version_string = version_string[:-1]
elements = []
for element in version_string.split('.'):
try:
elements.append(str(int(element)))
except ValueError:
elements.append(element)
return prefix + '.'.join(elements) + suffix
tags.append(Tag(name='OS', value=os_, color=TagColor.GREEN, propagate=False))
tags.append(Tag(name='OS Version', value=entry, color=TagColor.GREEN, propagate=True))
return tags


def _get_matching_strings(match: yara.Match) -> list[MatchingString]:
    """Flatten a YARA match into one ``MatchingString`` per matched instance.

    Every instance of every entry in ``match.strings`` produces one result that
    carries the matched data decoded to text (undecodable bytes are replaced),
    the offset of the instance and the identifier of the string it belongs to.

    :param match: The YARA match whose string hits should be collected.
    :return: The collected hits, in the order in which they are iterated.
    """
    collected = []
    for string_match in match.strings:  # each entry is a yara.StringMatch
        for hit in string_match.instances:  # each entry is a yara.StringMatchInstance
            collected.append(
                MatchingString(
                    string=hit.matched_data.decode(errors='replace'),
                    offset=hit.offset,
                    identifier=string_match.identifier,
                )
            )
    return collected


def get_version_for_component(match: yara.Match, file: FileIO) -> list[str]:
    """Collect all version strings that can be derived for a matched component.

    A version is searched in every string matched by the rule (see ``get_version``).
    If the rule's meta data requests it, further candidates are obtained from
    ``extract_data_from_ghidra``:

    * a truthy ``format_string`` entry selects the ``format_string`` mode and passes
      on all matched strings that contain ``%s``
    * otherwise, an existing ``_version_function`` entry selects the
      ``version_function`` mode and passes on the function name

    :param match: The YARA match of the software component.
    :param file: The analyzed file; only its ``name`` attribute is used here.
    :return: The non-empty versions without duplicates, sorted lexicographically.
    """
    matching_strings = _get_strings_from_match(match)
    versions = {get_version(matching_str, match.meta) for matching_str in matching_strings}

    input_data = None
    if match.meta.get('format_string'):
        input_data = {
            'mode': 'format_string',
            'key_string_list': [s for s in matching_strings if '%s' in s],
        }
    elif '_version_function' in match.meta:
        # explicit membership test: a falsy "format_string" entry without a
        # "_version_function" entry must not end in a KeyError
        input_data = {
            'mode': 'version_function',
            'function_name': match.meta['_version_function'],
        }
    if input_data is not None:
        versions.update(extract_data_from_ghidra(file.name, input_data, config.backend.docker_mount_base_dir))

    # The iteration order of a set of strings is not stable between interpreter runs.
    # Sorting keeps the result reproducible for consumers that pick an element by index.
    # NOTE: the order is purely lexicographic and does not rank versions semantically.
    return sorted(v for v in versions if v)


def get_version(input_string: str, meta_dict: dict) -> str | None:
    """Search ``input_string`` for a version and return it in normalized form.

    If ``meta_dict`` contains a ``version_regex`` entry, that pattern is used after
    collapsing doubled backslashes into single ones. Otherwise a generic default
    pattern is applied.

    :param input_string: The text in which a version is searched.
    :param meta_dict: The meta data of the matching rule.
    :return: The first hit, passed through ``_strip_leading_zeroes``, or ``None``
        if the pattern does not match anywhere.
    """
    if 'version_regex' not in meta_dict:
        # NOTE(review): the dots in the default pattern are not escaped and
        # therefore match any character, not only a literal "."
        regex = r'\d+.\d+(.\d+)?(\w)?'
    else:
        regex = meta_dict['version_regex'].replace('\\\\', '\\')
    found = re.search(regex, input_string)
    if found is None:
        return None
    return _strip_leading_zeroes(found.group(0))


def _get_strings_from_match(match: yara.Match) -> list[str]:
    """Return the text of every matched string instance of a YARA match.

    The matched data of each instance is decoded (undecodable bytes are replaced)
    and stripped of surrounding whitespace.

    :param match: The YARA match whose string hits should be collected.
    :return: One decoded and stripped string per instance, in iteration order.
    """
    texts = []
    for string_match in match.strings:
        for hit in string_match.instances:
            decoded = hit.matched_data.decode(errors='replace')
            texts.append(decoded.strip())
    return texts


def _entry_has_no_trailing_version(entry, os_string):
return os_string.strip() == entry.strip()


def _strip_leading_zeroes(version_string: str) -> str:
prefix, suffix = '', ''
while version_string and version_string[0] not in string.digits:
prefix += version_string[0]
version_string = version_string[1:]
while version_string and version_string[-1] not in string.digits:
suffix = version_string[-1] + suffix
version_string = version_string[:-1]
elements = []
for element in version_string.split('.'):
try:
elements.append(str(int(element)))
except ValueError:
elements.append(element)
return prefix + '.'.join(elements) + suffix
Loading

0 comments on commit 913fd86

Please sign in to comment.