Skip to content

Commit

Permalink
Xz vuln detection (#1271)
Browse files Browse the repository at this point in the history
* Added xz backdoor (known_vulnerabilities) and added capabilities to retrieve versions from functions (software_components)
  • Loading branch information
jstucke authored Oct 1, 2024
1 parent d96db1d commit adfbfe8
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 70 deletions.
119 changes: 91 additions & 28 deletions src/plugins/analysis/ipc/docker/ipc_analyzer/ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
from collections import OrderedDict

from ghidra.app.decompiler import DecompileOptions, DecompInterface
from ghidra.program.model.symbol import RefType
from ghidra.util.task import ConsoleTaskMonitor

from ipc_analysis.analyze import analyze_function_call_site
from ipc_analysis.helper_functions import get_call_site_pcode_ops, get_relevant_sources
from resolve_format_strings.format_strings import (
filter_relevant_indices,
get_format_specifier_indices,
get_format_string_version,
get_format_types,
get_key_strings,
load_input_data,
)


Expand Down Expand Up @@ -88,7 +90,7 @@ def set_up_decompiler(self, current_program):

def get_result_path():
"""
:return: str
:rtype: str
"""
script_args = getScriptArgs()
if len(script_args) == 1:
Expand Down Expand Up @@ -209,7 +211,7 @@ def openflags_to_symbols(openflags):
"""
Converts open flags to symbol
:param openflags: int
:type openflags: int
:return: str
"""
open_symbols = []
Expand Down Expand Up @@ -251,10 +253,10 @@ def add_json_call(ipc, func_name, arg_values):
"""
Adds an ipc call to the JSON file
:param ipc: dict
:param func_name: str
:param arg_values: list
:return: None
:type ipc: dict
:type func_name: str
:type arg_values: list
:rtype: None
"""
first_arg = arg_values[0]
rest_args = arg_values[1:]
Expand All @@ -273,10 +275,10 @@ def write_to_file(file_name, result, result_path):
"""
Writes the json to file
:param file_name: dict
:param result: str
:param result_path: str
:return: None
:type file_name: str
:type result: list[str]
:type result_path: str
:rtype: None
"""
print('\nWriting {}'.format(result_path + '/' + file_name))
with open(result_path + '/' + file_name, 'wb') as output_file:
Expand All @@ -288,8 +290,8 @@ def write_to_file(file_name, result, result_path):
def resolve_version_format_string(ghidra_analysis, key_string_list):
"""
:param ghidra_analysis: instance of GhidraAnalysis
:param key_string_list: list[str]
:return: list
:type key_string_list: set[str]
:rtype: list[str]
"""
result = []
call_args = {}
Expand All @@ -300,13 +302,48 @@ def resolve_version_format_string(ghidra_analysis, key_string_list):
return result


def find_function_ref_strings(function_name):
"""
Get all strings that are referenced in function `function_name`.
:param function_name: The name of the function.
:type function_name: str
:return: a list of strings referenced in the function
:rtype: list[str]
"""
listing = currentProgram.getListing()

try:
function = getGlobalFunctions(function_name)[0]
except (IndexError, TypeError):
print("Error: Function {} not found.".format(function_name))
return []

# string constants are usually in the .rodata section
rodata_section = getMemoryBlock(".rodata")
if not rodata_section:
print("Error: .rodata section not found.")
return []

# iterate over all instructions of the function and find referenced strings
instructions = listing.getInstructions(function.getBody(), True)
strings = []
while instructions.hasNext():
instruction = instructions.next()
for ref in instruction.getReferencesFrom():
if ref.getReferenceType() == RefType.DATA and rodata_section.contains(ref.getToAddress()):
strings.append(listing.getDataAt(ref.getToAddress()).getValue())
return strings


def get_fstring_from_functions(ghidra_analysis, key_string, call_args, called_fstrings):
"""
:param ghidra_analysis: instance of GhidraAnalysis
:param key_string: str
:param call_args: dict
:param called_fstrings: dict
:return: list[str]
:param key_string: the format string we are looking for
:type key_string: str
:type call_args: dict
:type called_fstrings: dict
:rtype: list[str]
"""
result = []
for func, calls in called_fstrings.items():
Expand All @@ -328,11 +365,11 @@ def get_fstring_from_functions(ghidra_analysis, key_string, call_args, called_fs
def get_fstring_from_call(ghidra_analysis, key_string, call_args, func, call, sources):
"""
:param ghidra_analysis: instance of GhidraAnalysis
:param key_string: str
:param call_args: dict
:param func: ghidra.program.database.function.FunctionDB
:param call: ghidra.program.model.pcode.PcodeOpAST
:param sources: ghidra.program.model.pcode.PcodeOpAST
:type key_string: str
:type call_args: dict
:type func: ghidra.program.database.function.FunctionDB
:type call: ghidra.program.model.pcode.PcodeOpAST
:type sources: ghidra.program.model.pcode.PcodeOpAST
"""
if call in call_args:
arg_values = call_args[call]
Expand All @@ -353,6 +390,35 @@ def get_fstring_from_call(ghidra_analysis, key_string, call_args, func, call, so
return filter_relevant_indices(start, arg_values, indices, format_types)


def find_version_strings(input_data, ghidra_analysis, result_path):
"""
:type input_data: dict
:param ghidra_analysis: instance of GhidraAnalysis
:param result_path: the path of the output data file
:type result_path: str
:rtype: int
"""
mode = input_data.get('mode')
if mode == 'format_string':
key_string_list = input_data.get('key_string_list')
if not key_string_list:
print("Error: key_string_list not found.")
return 1
result_list = resolve_version_format_string(ghidra_analysis, set(key_string_list))
elif mode == 'version_function':
# the elf file contains a special function just for returning its version
function_name = input_data.get('function_name')
if not function_name:
print("Error: Function name not found.")
return 1
result_list = find_function_ref_strings(function_name)
else:
print("Error: Invalid mode.")
return 1
write_to_file('ghidra_output.json', result_list, result_path)
return 0


def main():
"""
:return: int
Expand All @@ -361,12 +427,9 @@ def main():
ghidra_analysis = GhidraAnalysis()

# Resolve version format string
key_string_list = get_key_strings(result_path)
if key_string_list:
key_string_list = set(key_string_list)
result_list = resolve_version_format_string(ghidra_analysis, key_string_list)
write_to_file('ghidra_output.json', result_list, result_path)
return 0
input_data = load_input_data(result_path)
if input_data:
return find_version_strings(input_data, ghidra_analysis, result_path)

# IPC Analysis
ipc = {'ipcCalls': {}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@
from ipc_analysis.helper_functions import iter_array, string_is_printable


def get_key_strings(path):
def load_input_data(path):
"""
Tries to open the key_file for the software_component plugin
:path: str
:return: None/list
:type path: str
:rtype: None | dict
"""
try:
with open(path + '/key_file', 'r') as key_file:
key_strings = json.loads(key_file.read())
data = json.loads(key_file.read())
except IOError:
logging.info('key string file not found')
logging.info('input data file not found')
return None
logging.info('key: {}'.format(repr(key_strings)))
return key_strings
logging.info('input data: {}'.format(repr(data)))
return data


def get_format_string_version(ghidra_analysis, key_string):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
from contextlib import suppress
from pathlib import Path
Expand All @@ -20,7 +22,7 @@ class AnalysisPlugin(YaraBasePlugin):
NAME = 'known_vulnerabilities'
DESCRIPTION = 'Rule based detection of known vulnerabilities like Heartbleed'
DEPENDENCIES = ['file_hashes', 'software_components'] # noqa: RUF012
VERSION = '0.2.1'
VERSION = '0.3.0'
FILE = __file__

def process_object(self, file_object):
Expand All @@ -30,11 +32,7 @@ def process_object(self, file_object):
file_object.processed_analysis[self.NAME] = {}

binary_vulnerabilities = self._post_process_yara_results(yara_results)
matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)

# CVE-2021-45608 NetUSB
if 'NetUSB' in file_object.processed_analysis.get('software_components', {}).get('result', {}):
matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.binary))
matched_vulnerabilities = self.get_matched_vulnerabilities(binary_vulnerabilities, file_object)

for name, vulnerability in binary_vulnerabilities + matched_vulnerabilities:
file_object.processed_analysis[self.NAME][name] = vulnerability
Expand All @@ -47,6 +45,19 @@ def process_object(self, file_object):

return file_object

def get_matched_vulnerabilities(self, yara_result: list[tuple[str, dict]], file_object) -> list[tuple[str, dict]]:
software_components_results = file_object.processed_analysis.get('software_components', {}).get('result', {})
matched_vulnerabilities = self._check_vulnerabilities(file_object.processed_analysis)

# CVE-2021-45608 NetUSB
if 'NetUSB' in software_components_results:
matched_vulnerabilities.extend(self._check_netusb_vulnerability(file_object.binary))

# CVE-2024-3094 XZ Backdoor secondary detection
if 'liblzma' in software_components_results and not any(vuln == 'xz_backdoor' for vuln, _ in yara_result):
matched_vulnerabilities.extend(_check_xz_backdoor(software_components_results))
return matched_vulnerabilities

def add_tags(self, file_object, vulnerability_list):
for name, details in vulnerability_list:
if details['score'] == 'none':
Expand Down Expand Up @@ -86,7 +97,7 @@ def _check_vulnerabilities(processed_analysis):

return matched_vulnerabilities

def _check_netusb_vulnerability(self, input_file_data: bytes):
def _check_netusb_vulnerability(self, input_file_data: bytes) -> list[tuple[str, dict]]:
with TemporaryDirectory(prefix='known_vulns_', dir=config.backend.docker_mount_base_dir) as tmp_dir:
tmp_dir_path = Path(tmp_dir)
ghidra_input_file = tmp_dir_path / 'ghidra_input'
Expand Down Expand Up @@ -118,3 +129,23 @@ def _check_netusb_vulnerability(self, input_file_data: bytes):
]
except (json.JSONDecodeError, FileNotFoundError):
return []


def _check_xz_backdoor(software_results: dict) -> list[tuple[str, dict]]:
if any(v in software_results['liblzma']['meta']['version'] for v in ['5.6.0', '5.6.1']):
return [
(
'XZ Backdoor',
{
'description': 'CVE-2024-3094: a malicious backdoor was planted into the xz compression library',
'score': 'high',
# the vulnerability is only contained in certain versions built for debian; a more reliable
# yara rule is in the signatures files
'reliability': 20,
'link': 'https://nvd.nist.gov/vuln/detail/CVE-2024-3094',
'short_name': 'XZ Backdoor',
'additional_data': {},
},
)
]
return []
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,20 @@ rule WPA_Key_Hardcoded {
strings:
$a = /\swpa_passphrase=\S+/ nocase ascii wide
condition: $a
}
}

rule xz_backdoor {
meta:
description = "CVE-2024-3094: a malicious backdoor was planted into the xz compression library"
reliability = "80"
score = "high"
link = "https://nvd.nist.gov/vuln/detail/CVE-2024-3094"
strings:
$a = {f30f1efa554889f54c89ce5389fb81e7000000804883ec28488954241848894c2410}
$b = {488d7c2408f3ab488d4424084889d14c89c74889c2e8????????89c2}
$c = {4d8b6c2408458b3c244c8b6310898578f1ffff31c083bd78f1ffff00f3ab7907}
$d = {31c04989ffb9160000004d89c5488d7c24484d89cef3ab488d442448}
$e = "yolAbejyiejuvnup=Evjtgvsh5okmkAvj"
condition:
any of them
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
�ZR�[~s��0~Qz/yolAbejyiejuvnup=Evjtgvsh5okmkAvj��׸����@��cWH�|$�H�D$H��L��H����SQ�MՉ�W��l�X���4�L�
Loading

0 comments on commit adfbfe8

Please sign in to comment.