Skip to content

Commit

Permalink
feat: converted software components plugin to new base class
Browse files Browse the repository at this point in the history
also adjusted known_vuln. and cve_lookup plugins to work with the new result structure
  • Loading branch information
jstucke committed Jan 23, 2025
1 parent 57e7c49 commit 0785fc3
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 259 deletions.
6 changes: 3 additions & 3 deletions src/plugins/analysis/cve_lookup/code/cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def process_object(self, file_object: FileObject) -> FileObject:
cves = {'cve_results': {}}
connection = DbConnection(f'sqlite:///{DB_PATH}')
lookup = Lookup(file_object, connection, match_any=self.match_any)
for value in file_object.processed_analysis['software_components']['result'].values():
product = value['meta']['software_name']
version = value['meta']['version'][0]
for sw_dict in file_object.processed_analysis['software_components']['result'].get('software_components', []):
product = sw_dict['name']
version = sw_dict['versions'][0] if sw_dict['versions'] else None
if product and version:
vulnerabilities = lookup.lookup_vulnerabilities(product, version)
if vulnerabilities:
Expand Down
17 changes: 8 additions & 9 deletions src/plugins/analysis/cve_lookup/test/test_cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@

SOFTWARE_COMPONENTS_ANALYSIS_RESULT = {
'result': {
'dnsmasq': {'meta': {'software_name': 'Dnsmasq', 'version': ['2.40']}},
'OpenSSL': {
'matches': True,
'meta': {
'software_components': [
{'name': 'Dnsmasq', 'versions': ['2.40']},
{
'name': 'OpenSSL',
'description': 'SSL library',
'open_source': True,
'software_name': 'OpenSSL',
'version': [''],
'versions': [],
'website': 'https://www.openssl.org',
'rule': 'OpenSSL',
'matching_strings': [{'offset': 7194, 'identifier': '$a', 'string': 'T1BFTlNTTA=='}],
},
'rule': 'OpenSSL',
'strings': [[7194, '$a', 'T1BFTlNTTA==']],
},
],
},
'analysis_date': 1563453634.37708,
'plugin_version': '0.3.2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AnalysisPlugin(YaraBasePlugin):
NAME = 'known_vulnerabilities'
DESCRIPTION = 'Rule based detection of known vulnerabilities like Heartbleed'
DEPENDENCIES = ['file_hashes', 'software_components'] # noqa: RUF012
VERSION = '0.3.0'
VERSION = '0.3.1'
FILE = __file__

def process_object(self, file_object):
Expand All @@ -47,15 +47,18 @@ def process_object(self, file_object):

def get_matched_vulnerabilities(self, yara_result: list[tuple[str, dict]], file_object) -> list[tuple[str, dict]]:
software_components_results = file_object.processed_analysis.get('software_components', {}).get('result', {})
software_by_name = {
sw_dict['name']: sw_dict for sw_dict in software_components_results.get('software_components', [])
}
matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)

# CVE-2021-45608 NetUSB
if 'NetUSB' in software_components_results:
if 'NetUSB' in software_by_name:
matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.file_path))

# CVE-2024-3094 XZ Backdoor secondary detection
if 'liblzma' in software_components_results and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_components_results))
if 'liblzma' in software_by_name and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_by_name['liblzma']))
return matched_vulnerabilities

def add_tags(self, file_object, vulnerability_list):
Expand Down Expand Up @@ -131,7 +134,7 @@ def _check_netusb_vulnerability(self, file_path: str) -> list[tuple[str, dict]]:


def _check_xz_backdoor(software_results: dict) -> list[tuple[str, dict]]:
if any(v in software_results['liblzma']['meta']['version'] for v in ['5.6.0', '5.6.1']):
if any(v in software_results['versions'] for v in ['5.6.0', '5.6.1']):
return [
(
'XZ Backdoor',
Expand Down
29 changes: 24 additions & 5 deletions src/plugins/analysis/known_vulnerabilities/internal/rulebook.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from copy import deepcopy

RELATIONS = {
Expand Down Expand Up @@ -36,7 +38,7 @@ def _make_type_assertions(self, link, rule):
(self.score in ['low', 'medium', 'high'], 'score has to be one of low, medium or high'),
(isinstance(self.description, str), 'description must be a string'),
(
isinstance(self.rule, (SingleRule, MetaRule, SubPathRule)),
isinstance(self.rule, (SingleRule, MetaRule, SubPathRule, SoftwareRule)),
f'rule must be of type in [SingleRule, MetaRule, SubPathRule]. Has type {type(rule)}',
),
(isinstance(self.link, str) or not link, 'if link is set it has to be a string'),
Expand Down Expand Up @@ -95,6 +97,22 @@ def __init__(self, base_path, meta_rule):
self.meta_rule = meta_rule


class SoftwareRule:
def __init__(self, software_name: str, affected_versions: set[str]):
self.software_name = software_name
self.affected_versions = affected_versions

def evaluate(self, processed_analysis: dict[str, dict]) -> bool:
software_components = (
processed_analysis.get('software_components', {}).get('result', {}).get('software_components', [])
)
return any(
sw['name'] == self.software_name and v in self.affected_versions
for sw in software_components
for v in sw['versions']
)


def evaluate(analysis, rule):
try:
if isinstance(rule, MetaRule):
Expand All @@ -103,6 +121,8 @@ def evaluate(analysis, rule):
result = _evaluate_single_rule(analysis, rule)
elif isinstance(rule, SubPathRule):
result = _evaluate_sub_path_rule(analysis, rule)
elif isinstance(rule, SoftwareRule):
result = rule.evaluate(analysis)
else:
raise TypeError('rule must be of one in types [SingleRule, MetaRule, SubPathRule]')
return result
Expand Down Expand Up @@ -153,10 +173,9 @@ def _get_dotted_path_from_dictionary(dictionary, dotted_path):


def vulnerabilities():
heartbleed_rule = SingleRule(
value_path=['software_components.result.OpenSSL.meta.version'],
relation='intersection',
comparison=[f'1.0.1{minor}' for minor in 'abcde'],
heartbleed_rule = SoftwareRule(
software_name='OpenSSL',
affected_versions={f'1.0.1{minor}' for minor in 'abcde'},
)
heartbleed_vulnerability = Vulnerability(
rule=heartbleed_rule,
Expand Down
81 changes: 38 additions & 43 deletions src/plugins/analysis/known_vulnerabilities/test/data/sc.json
Original file line number Diff line number Diff line change
@@ -1,61 +1,56 @@
{
"analysis_date": 1517491696.332023,
"plugin_version": "0.3",
"analysis_date": 1737551071.128211,
"plugin_version": "1.0.0",
"result": {
"OpenSSL": {
"matches": true,
"meta": {
"software_components": [
{
"description": "SSL library",
"matching_strings": [
{
"identifier": "$a",
"offset": 13696143,
"string": "T3BlblNzbA=="
},
{
"identifier": "$a",
"offset": 14147446,
"string": "T3BlblNTTCAwLjkuOA=="
}
],
"name": "OpenSSL",
"open_source": true,
"software_name": "OpenSSL",
"version": [
"",
"rule": "OpenSSL",
"versions": [
"1.0.1a"
],
"website": "https://www.openssl.org"
},
"rule": "OpenSSL",
"strings": [
[
13696143,
"$a",
"T3BlblNzbA=="
],
[
14147446,
"$a",
"T3BlblNTTCAwLjkuOA=="
]
]
},
"VxWorks": {
"matches": true,
"meta": {
{
"description": "Operating system for embedded devices",
"matching_strings": [
{
"identifier": "$b",
"offset": 12748719,
"string": "dnh3b3Jrcy02LjU="
},
{
"identifier": "$b",
"offset": 12748792,
"string": "VnhXb3JrcyA2LjU="
}
],
"name": "VxWorks",
"open_source": false,
"software_name": "VxWorks",
"version": [
"rule": "VxWorks",
"versions": [
"6.5"
],
"website": "http://www.windriver.com/products/vxworks/"
},
"rule": "VxWorks",
"strings": [
[
12748719,
"$b",
"dnh3b3Jrcy02LjU="
],
[
12748792,
"$b",
"VnhXb3JrcyA2LjU="
]
]
}
}
]
},
"summary": [
"OpenSSL ",
"OpenSSL",
"OpenSSL 0.9.8",
"VxWorks 6.5"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def test_process_object_software(self, analysis_plugin):
def test_process_object_software_wrong_version(self, analysis_plugin):
test_file = FileObject(file_path=str(TEST_DATA_DIR / 'empty'))
test_file.processed_analysis['file_hashes'] = {'result': {'sha256': '1234'}}
self._software_components_result['result']['OpenSSL']['meta']['version'] = ['0.9.8', '1.0.0', '']
test_file.processed_analysis['software_components'] = self._software_components_result
software_components_result = {**self._software_components_result}
software_components_result['result']['software_components'][0]['versions'] = ['0.9.8', '1.0.0']
test_file.processed_analysis['software_components'] = software_components_result

results = analysis_plugin.process_object(test_file).processed_analysis[analysis_plugin.NAME]

Expand Down Expand Up @@ -94,7 +95,9 @@ def test_xz_backdoor_1st(self, analysis_plugin):
def test_xz_backdoor_2nd(self, analysis_plugin):
test_file = FileObject(file_path=str(TEST_DATA_DIR / 'empty'))
assert test_file.binary is not None
test_file.processed_analysis['software_components'] = {'result': {'liblzma': {'meta': {'version': ['5.6.1']}}}}
test_file.processed_analysis['software_components'] = {
'result': {'software_components': [{'name': 'liblzma', 'versions': ['5.6.1']}]}
}
fo = analysis_plugin.process_object(test_file)
result = fo.processed_analysis['known_vulnerabilities']
assert 'XZ Backdoor' in result
Expand Down
28 changes: 28 additions & 0 deletions src/plugins/analysis/known_vulnerabilities/test/test_rulebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
RELATIONS,
MetaRule,
SingleRule,
SoftwareRule,
SubPathRule,
_evaluate_meta_rule,
_evaluate_single_rule,
Expand All @@ -13,6 +14,7 @@
evaluate,
)

# FixMe: result structure changed when IP&URI plugin was ported to new base class
IPS = {
'ip_and_uri_finder': {
'summary': ['1', '2', '3', '4', 'a', 'b', 'c'],
Expand Down Expand Up @@ -107,6 +109,32 @@ def test_evaluate_base_rule():
assert not _evaluate_sub_path_rule(IPS, sub_path_no_match)


def test_software_rule():
rule = SoftwareRule(software_name='foo', affected_versions={'v1.2.2', 'v1.2.3'})
processed_analysis_match = {
'software_components': {
'result': {
'software_components': [
{'name': 'foo', 'versions': ['v1.2.2']},
]
}
}
}
processed_analysis_no_match = {
'software_components': {
'result': {
'software_components': [
{'name': 'foo', 'versions': ['v1.2.1', 'v1.2.4']},
{'name': 'bar', 'versions': ['v1.2.2']},
]
}
}
}

assert rule.evaluate(processed_analysis_match) is True
assert rule.evaluate(processed_analysis_no_match) is False


def test_evaluate_bad_type():
with pytest.raises(TypeError):
evaluate({}, object())
Loading

0 comments on commit 0785fc3

Please sign in to comment.