Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: converted software components plugin to new base class #1332

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/plugins/analysis/cve_lookup/code/cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def process_object(self, file_object: FileObject) -> FileObject:
cves = {'cve_results': {}}
connection = DbConnection(f'sqlite:///{DB_PATH}')
lookup = Lookup(file_object, connection, match_any=self.match_any)
for value in file_object.processed_analysis['software_components']['result'].values():
product = value['meta']['software_name']
version = value['meta']['version'][0]
for sw_dict in file_object.processed_analysis['software_components']['result'].get('software_components', []):
product = sw_dict['name']
version = sw_dict['versions'][0] if sw_dict['versions'] else None
if product and version:
vulnerabilities = lookup.lookup_vulnerabilities(product, version)
if vulnerabilities:
Expand Down
17 changes: 8 additions & 9 deletions src/plugins/analysis/cve_lookup/test/test_cve_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@

SOFTWARE_COMPONENTS_ANALYSIS_RESULT = {
'result': {
'dnsmasq': {'meta': {'software_name': 'Dnsmasq', 'version': ['2.40']}},
'OpenSSL': {
'matches': True,
'meta': {
'software_components': [
{'name': 'Dnsmasq', 'versions': ['2.40']},
{
'name': 'OpenSSL',
'description': 'SSL library',
'open_source': True,
'software_name': 'OpenSSL',
'version': [''],
'versions': [],
'website': 'https://www.openssl.org',
'rule': 'OpenSSL',
'matching_strings': [{'offset': 7194, 'identifier': '$a', 'string': 'T1BFTlNTTA=='}],
},
'rule': 'OpenSSL',
'strings': [[7194, '$a', 'T1BFTlNTTA==']],
},
],
},
'analysis_date': 1563453634.37708,
'plugin_version': '0.3.2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AnalysisPlugin(YaraBasePlugin):
NAME = 'known_vulnerabilities'
DESCRIPTION = 'Rule based detection of known vulnerabilities like Heartbleed'
DEPENDENCIES = ['file_hashes', 'software_components'] # noqa: RUF012
VERSION = '0.3.0'
VERSION = '0.3.1'
FILE = __file__

def process_object(self, file_object):
Expand All @@ -47,15 +47,18 @@ def process_object(self, file_object):

def get_matched_vulnerabilities(self, yara_result: list[tuple[str, dict]], file_object) -> list[tuple[str, dict]]:
software_components_results = file_object.processed_analysis.get('software_components', {}).get('result', {})
software_by_name = {
sw_dict['name']: sw_dict for sw_dict in software_components_results.get('software_components', [])
}
matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)

# CVE-2021-45608 NetUSB
if 'NetUSB' in software_components_results:
if 'NetUSB' in software_by_name:
matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.file_path))

# CVE-2024-3094 XZ Backdoor secondary detection
if 'liblzma' in software_components_results and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_components_results))
if 'liblzma' in software_by_name and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_by_name['liblzma']))
return matched_vulnerabilities

def add_tags(self, file_object, vulnerability_list):
Expand Down Expand Up @@ -131,7 +134,7 @@ def _check_netusb_vulnerability(self, file_path: str) -> list[tuple[str, dict]]:


def _check_xz_backdoor(software_results: dict) -> list[tuple[str, dict]]:
if any(v in software_results['liblzma']['meta']['version'] for v in ['5.6.0', '5.6.1']):
if any(v in software_results['versions'] for v in ['5.6.0', '5.6.1']):
return [
(
'XZ Backdoor',
Expand Down
29 changes: 24 additions & 5 deletions src/plugins/analysis/known_vulnerabilities/internal/rulebook.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from copy import deepcopy

RELATIONS = {
Expand Down Expand Up @@ -36,7 +38,7 @@ def _make_type_assertions(self, link, rule):
(self.score in ['low', 'medium', 'high'], 'score has to be one of low, medium or high'),
(isinstance(self.description, str), 'description must be a string'),
(
isinstance(self.rule, (SingleRule, MetaRule, SubPathRule)),
isinstance(self.rule, (SingleRule, MetaRule, SubPathRule, SoftwareRule)),
f'rule must be of type in [SingleRule, MetaRule, SubPathRule]. Has type {type(rule)}',
),
(isinstance(self.link, str) or not link, 'if link is set it has to be a string'),
Expand Down Expand Up @@ -95,6 +97,22 @@ def __init__(self, base_path, meta_rule):
self.meta_rule = meta_rule


class SoftwareRule:
def __init__(self, software_name: str, affected_versions: set[str]):
self.software_name = software_name
self.affected_versions = affected_versions

def evaluate(self, processed_analysis: dict[str, dict]) -> bool:
software_components = (
processed_analysis.get('software_components', {}).get('result', {}).get('software_components', [])
)
return any(
sw['name'] == self.software_name and v in self.affected_versions
for sw in software_components
for v in sw['versions']
)


def evaluate(analysis, rule):
try:
if isinstance(rule, MetaRule):
Expand All @@ -103,6 +121,8 @@ def evaluate(analysis, rule):
result = _evaluate_single_rule(analysis, rule)
elif isinstance(rule, SubPathRule):
result = _evaluate_sub_path_rule(analysis, rule)
elif isinstance(rule, SoftwareRule):
result = rule.evaluate(analysis)
else:
raise TypeError('rule must be of one in types [SingleRule, MetaRule, SubPathRule]')
return result
Expand Down Expand Up @@ -153,10 +173,9 @@ def _get_dotted_path_from_dictionary(dictionary, dotted_path):


def vulnerabilities():
heartbleed_rule = SingleRule(
value_path=['software_components.result.OpenSSL.meta.version'],
relation='intersection',
comparison=[f'1.0.1{minor}' for minor in 'abcde'],
heartbleed_rule = SoftwareRule(
software_name='OpenSSL',
affected_versions={f'1.0.1{minor}' for minor in 'abcde'},
)
heartbleed_vulnerability = Vulnerability(
rule=heartbleed_rule,
Expand Down
81 changes: 38 additions & 43 deletions src/plugins/analysis/known_vulnerabilities/test/data/sc.json
Original file line number Diff line number Diff line change
@@ -1,61 +1,56 @@
{
"analysis_date": 1517491696.332023,
"plugin_version": "0.3",
"analysis_date": 1737551071.128211,
"plugin_version": "1.0.0",
"result": {
"OpenSSL": {
"matches": true,
"meta": {
"software_components": [
{
"description": "SSL library",
"matching_strings": [
{
"identifier": "$a",
"offset": 13696143,
"string": "T3BlblNzbA=="
},
{
"identifier": "$a",
"offset": 14147446,
"string": "T3BlblNTTCAwLjkuOA=="
}
],
"name": "OpenSSL",
"open_source": true,
"software_name": "OpenSSL",
"version": [
"",
"rule": "OpenSSL",
"versions": [
"1.0.1a"
],
"website": "https://www.openssl.org"
},
"rule": "OpenSSL",
"strings": [
[
13696143,
"$a",
"T3BlblNzbA=="
],
[
14147446,
"$a",
"T3BlblNTTCAwLjkuOA=="
]
]
},
"VxWorks": {
"matches": true,
"meta": {
{
"description": "Operating system for embedded devices",
"matching_strings": [
{
"identifier": "$b",
"offset": 12748719,
"string": "dnh3b3Jrcy02LjU="
},
{
"identifier": "$b",
"offset": 12748792,
"string": "VnhXb3JrcyA2LjU="
}
],
"name": "VxWorks",
"open_source": false,
"software_name": "VxWorks",
"version": [
"rule": "VxWorks",
"versions": [
"6.5"
],
"website": "http://www.windriver.com/products/vxworks/"
},
"rule": "VxWorks",
"strings": [
[
12748719,
"$b",
"dnh3b3Jrcy02LjU="
],
[
12748792,
"$b",
"VnhXb3JrcyA2LjU="
]
]
}
}
]
},
"summary": [
"OpenSSL ",
"OpenSSL",
"OpenSSL 0.9.8",
"VxWorks 6.5"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def test_process_object_software(self, analysis_plugin):
def test_process_object_software_wrong_version(self, analysis_plugin):
test_file = FileObject(file_path=str(TEST_DATA_DIR / 'empty'))
test_file.processed_analysis['file_hashes'] = {'result': {'sha256': '1234'}}
self._software_components_result['result']['OpenSSL']['meta']['version'] = ['0.9.8', '1.0.0', '']
test_file.processed_analysis['software_components'] = self._software_components_result
software_components_result = {**self._software_components_result}
software_components_result['result']['software_components'][0]['versions'] = ['0.9.8', '1.0.0']
test_file.processed_analysis['software_components'] = software_components_result

results = analysis_plugin.process_object(test_file).processed_analysis[analysis_plugin.NAME]

Expand Down Expand Up @@ -94,7 +95,9 @@ def test_xz_backdoor_1st(self, analysis_plugin):
def test_xz_backdoor_2nd(self, analysis_plugin):
test_file = FileObject(file_path=str(TEST_DATA_DIR / 'empty'))
assert test_file.binary is not None
test_file.processed_analysis['software_components'] = {'result': {'liblzma': {'meta': {'version': ['5.6.1']}}}}
test_file.processed_analysis['software_components'] = {
'result': {'software_components': [{'name': 'liblzma', 'versions': ['5.6.1']}]}
}
fo = analysis_plugin.process_object(test_file)
result = fo.processed_analysis['known_vulnerabilities']
assert 'XZ Backdoor' in result
Expand Down
28 changes: 28 additions & 0 deletions src/plugins/analysis/known_vulnerabilities/test/test_rulebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
RELATIONS,
MetaRule,
SingleRule,
SoftwareRule,
SubPathRule,
_evaluate_meta_rule,
_evaluate_single_rule,
Expand All @@ -13,6 +14,7 @@
evaluate,
)

# FixMe: result structure changed when IP&URI plugin was ported to new base class
IPS = {
'ip_and_uri_finder': {
'summary': ['1', '2', '3', '4', 'a', 'b', 'c'],
Expand Down Expand Up @@ -107,6 +109,32 @@ def test_evaluate_base_rule():
assert not _evaluate_sub_path_rule(IPS, sub_path_no_match)


def test_software_rule():
rule = SoftwareRule(software_name='foo', affected_versions={'v1.2.2', 'v1.2.3'})
processed_analysis_match = {
'software_components': {
'result': {
'software_components': [
{'name': 'foo', 'versions': ['v1.2.2']},
]
}
}
}
processed_analysis_no_match = {
'software_components': {
'result': {
'software_components': [
{'name': 'foo', 'versions': ['v1.2.1', 'v1.2.4']},
{'name': 'bar', 'versions': ['v1.2.2']},
]
}
}
}

assert rule.evaluate(processed_analysis_match) is True
assert rule.evaluate(processed_analysis_no_match) is False


def test_evaluate_bad_type():
with pytest.raises(TypeError):
evaluate({}, object())
Loading
Loading