Skip to content

Commit

Permalink
crypto material plugin: converted to new base class
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Dec 9, 2024
1 parent ad4d6d8 commit e33954f
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 143 deletions.
155 changes: 94 additions & 61 deletions src/plugins/analysis/crypto_material/code/crypto_material.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,117 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, NamedTuple
from typing import TYPE_CHECKING, List, NamedTuple

from analysis.YaraPluginBase import YaraBasePlugin
from pydantic import BaseModel, Field

from analysis.plugin import AnalysisPluginV0, Tag, addons, compat
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.hash import get_md5
from helperFunctions.tag import TagColor
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED

from ..internal.key_parser import read_asn1_key, read_pkcs_cert, read_ssl_cert

if TYPE_CHECKING:
import io
from collections.abc import Callable

STARTEND = [
'PgpPublicKeyBlock',
'PgpPrivateKeyBlock',
'PgpPublicKeyBlock_GnuPG',
'genericPublicKey',
'SshRsaPrivateKeyBlock',
'SshEncryptedRsaPrivateKeyBlock',
'SSLPrivateKey',
]
STARTONLY = ['SshRsaPublicKeyBlock']
PKCS8 = 'Pkcs8PrivateKey'
PKCS12 = 'Pkcs12Certificate'
SSLCERT = 'SSLCertificate'


class Match(NamedTuple):
offset: int
label: str
matched_string: str


class AnalysisPlugin(YaraBasePlugin):
def _read_from_file(file_handle: io.FileIO, start: int, end: int) -> bytes:
file_handle.seek(start)
return file_handle.read(end - start)


class CryptoMaterialMatch(BaseModel):
rule: str = Field(description='The YARA rule that matched this crypto material')
material: List[str] = Field(description='An array with the contents of the matched keys/certificates')
count: int = Field(description='The number of matched keys/certificates')
hashes: List[str] = Field(description='The MD5 hashes of the keys/certificates (in the same order as `material`)')


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
"""
Searches for known Crypto material (e.g., public and private keys)
"""

NAME = 'crypto_material'
DESCRIPTION = 'detects crypto material like SSH keys and SSL certificates'
VERSION = '0.5.2'
MIME_BLACKLIST = ['filesystem'] # noqa: RUF012
FILE = __file__

STARTEND = [ # noqa: RUF012
'PgpPublicKeyBlock',
'PgpPrivateKeyBlock',
'PgpPublicKeyBlock_GnuPG',
'genericPublicKey',
'SshRsaPrivateKeyBlock',
'SshEncryptedRsaPrivateKeyBlock',
'SSLPrivateKey',
]
STARTONLY = ['SshRsaPublicKeyBlock'] # noqa: RUF012
PKCS8 = 'Pkcs8PrivateKey'
PKCS12 = 'Pkcs12Certificate'
SSLCERT = 'SSLCertificate'

def process_object(self, file_object):
file_object = super().process_object(file_object)
yara_results = file_object.processed_analysis[self.NAME]
analysis_result = self.convert_yara_result(yara_results, file_object.binary)
analysis_result['summary'] = list(analysis_result)

file_object.processed_analysis[self.NAME] = analysis_result
self._add_private_key_tag(file_object, analysis_result)
return file_object

def convert_yara_result(self, yara_results, binary):
analysis_result = {}
for matching_rule in yara_results.get('summary', []):
matches = [Match(*t) for t in yara_results[matching_rule]['strings']]
class Schema(BaseModel):
matches: List[CryptoMaterialMatch] = Field(description='A list of matched crypto material')

def __init__(self):
metadata = self.MetaData(
name='crypto_material',
description='detects crypto material like SSH keys and SSL certificates',
version='1.0.0',
mime_blacklist=['filesystem', *MIME_BLACKLIST_COMPRESSED],
Schema=self.Schema,
)
super().__init__(metadata=metadata)
self._yara = addons.Yara(plugin=self)

def analyze(self, file_handle: io.FileIO, virtual_file_path: str, analyses: dict) -> Schema:
del virtual_file_path, analyses
raw_yara_results = [compat.yara_match_to_dict(m) for m in self._yara.match(file_handle)]
return self.Schema(matches=self.convert_yara_result(raw_yara_results, file_handle))

def convert_yara_result(self, yara_results: list[dict], file_handle: io.FileIO) -> list[CryptoMaterialMatch]:
analysis_result = []
for matching_rule in yara_results:
matches = [Match(*t) for t in matching_rule['strings']]
matches.sort(key=lambda m: m.offset)
parsing_function = self._get_parsing_function(matching_rule)
parsing_function = self._get_parsing_function(matching_rule['rule'])
if not parsing_function:
continue
crypto_items = parsing_function(matches=matches, binary=binary)
crypto_items: list[str] = parsing_function(matches=matches, file_handle=file_handle)
hashes = [get_md5(item) for item in crypto_items]
if crypto_items:
analysis_result[matching_rule] = {'material': crypto_items, 'count': len(crypto_items)}
analysis_result.append(
CryptoMaterialMatch(
rule=matching_rule['rule'],
material=crypto_items,
count=len(crypto_items),
hashes=hashes,
)
)
return analysis_result

def _get_parsing_function(self, match: str) -> Callable | None:
if match in self.STARTEND:
if match in STARTEND:
return self.extract_labeled_keys
if match in self.STARTONLY:
if match in STARTONLY:
return self.extract_start_only_key
if match == self.PKCS8:
if match == PKCS8:
return self.get_pkcs8_key
if match == self.PKCS12:
if match == PKCS12:
return self.get_pkcs12_cert
if match == self.SSLCERT:
if match == SSLCERT:
return self.get_ssl_cert
logging.warning(f'Unknown crypto rule match: {match}')
return None

def extract_labeled_keys(self, matches: list[Match], binary, min_key_len=128) -> list[str]:
def extract_labeled_keys(self, matches: list[Match], file_handle: io.FileIO, min_key_len=128) -> list[str]:
return [
binary[start:end].decode(encoding='utf_8', errors='replace')
_read_from_file(file_handle, start, end).decode(encoding='utf_8', errors='replace')
for start, end in self.get_offset_pairs(matches)
if end - start > min_key_len
]
Expand All @@ -92,34 +121,33 @@ def extract_start_only_key(matches: list[Match], **_) -> list[str]:
return [match.matched_string for match in matches if match.label == '$start_string']

@staticmethod
def get_pkcs8_key(matches: list[Match], binary=None) -> list[str]:
def get_pkcs8_key(matches: list[Match], file_handle: io.FileIO) -> list[str]:
keys = []
for match in matches:
key = read_asn1_key(binary=binary, offset=match.offset)
key = read_asn1_key(file_handle=file_handle, offset=match.offset)
if key is not None:
keys.append(key)
return keys

@staticmethod
def get_pkcs12_cert(matches: list[Match], binary=None) -> list[str]:
def get_pkcs12_cert(matches: list[Match], file_handle: io.FileIO) -> list[str]:
keys = []
for match in matches:
text_cert = read_pkcs_cert(binary=binary, offset=match.offset)
text_cert = read_pkcs_cert(file_handle=file_handle, offset=match.offset)
if text_cert is not None:
keys.append(text_cert)
return keys

def get_ssl_cert(self, matches: list[Match], binary=None) -> list[str]:
def get_ssl_cert(self, matches: list[Match], file_handle: io.FileIO) -> list[str]:
contents = []
for pair in self.get_offset_pairs(matches):
start_index, end_index = pair
text_cert = read_ssl_cert(binary=binary, start=start_index, end=end_index)
for start_index, end_index in self.get_offset_pairs(matches):
text_cert = read_ssl_cert(file_handle=file_handle, start=start_index, end=end_index)
if text_cert is not None:
contents.append(text_cert)
return contents

@staticmethod
def get_offset_pairs(matches: list[Match]):
def get_offset_pairs(matches: list[Match]) -> list[tuple[int, int]]:
pairs = []
for index in range(len(matches) - 1):
if _is_consecutive_key_block(matches, index):
Expand All @@ -130,15 +158,20 @@ def get_offset_pairs(matches: list[Match]):
pairs.append((matches[index].offset, _calculate_end_index(matches[index + 3])))
return pairs

def _add_private_key_tag(self, file_object, result):
if any('private' in key.lower() for key in result):
self.add_analysis_tag(
file_object=file_object,
tag_name='private_key_inside',
def summarize(self, result: AnalysisPlugin.Schema) -> list[str]:
return [match.rule for match in result.matches]

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del result
if any('private' in item.lower() for item in summary):
tag = Tag(
name='private_key_inside',
value='Private Key Found',
color=TagColor.ORANGE,
propagate=True,
)
return [tag]
return []


def _is_consecutive_key_block(matches: list[Match], index: int) -> bool:
Expand Down
94 changes: 56 additions & 38 deletions src/plugins/analysis/crypto_material/internal/key_parser.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,89 @@
from __future__ import annotations

import logging
from struct import unpack
from typing import TYPE_CHECKING

import OpenSSL
import OpenSSL.crypto as ssl
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12

from helperFunctions.data_conversion import make_unicode_string

TLV_KNOWN_STARTS = [0x30]
if TYPE_CHECKING:
import io

TLV_KNOWN_STARTS = {0x30}
LENGTH_TO_FORMAT = {
1: '>b',
2: '>h',
4: '>i',
}
DER_LIMIT = 0x80
DER_HEADER_SIZE = 2

def _get_start_and_size_of_der_field(binary=None, offset=None):
if binary[offset + 1] > 127: # noqa: PLR2004
length_of_length = binary[offset + 1] ^ 0x80
logging.debug(f'[LOG] - Length {length_of_length}')
form_string = _determine_format_string(length_of_length)
return (
offset + 2 + length_of_length,
unpack(form_string, binary[(offset + 2) : (offset + 2 + length_of_length)])[0],
)
return offset + 2, binary[offset + 1]

def _read_der_key(file_handle: io.FileIO, offset: int) -> bytes | None:
file_handle.seek(offset + 1)
value = int.from_bytes(file_handle.read(1), byteorder='little')
# The field at offset + 1 is the length field. If the value is > 0x80, the field contains only the size (+0x80)
# and the actual length is in the next field
if value >= DER_LIMIT:
value ^= DER_LIMIT
logging.debug(f'[LOG] - Length {value}')
length = unpack(_determine_format_string(value), file_handle.read(value))[0] + value
else:
length = value
# we need to reset the file pointer because we need the entire key (including the length field)
file_handle.seek(offset)
return file_handle.read(length + DER_HEADER_SIZE)

def _determine_format_string(length=None):
if length not in [1, 2, 4]:
logging.warning('Unregular format in DER encoding')
return None
formats = ['>b', '>h', None, '>i']
return formats[length - 1]

def _determine_format_string(length: int | None) -> str | None:
if length not in LENGTH_TO_FORMAT:
raise ValueError('Irregular format in DER encoding')
return LENGTH_TO_FORMAT[length]


def read_asn1_key(binary: bytes, offset: int):
if binary[offset] not in TLV_KNOWN_STARTS:
def read_asn1_key(file_handle: io.FileIO, offset: int):
file_handle.seek(offset)
start = int.from_bytes(file_handle.read(1), byteorder='little')
if start not in TLV_KNOWN_STARTS:
return None
start, size = _get_start_and_size_of_der_field(binary=binary, offset=offset)
try:
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_ASN1, binary[offset : start + size])
return make_unicode_string(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_TEXT, key))
except OpenSSL.crypto.Error:
file_handle.seek(offset)
key_data = _read_der_key(file_handle=file_handle, offset=offset)
key = ssl.load_privatekey(ssl.FILETYPE_ASN1, key_data)
return make_unicode_string(ssl.dump_privatekey(ssl.FILETYPE_TEXT, key))
except ssl.Error:
logging.debug('Found PKCS#8 key signature, but looks false positive')
return None
except TypeError:
logging.warning('Found PKCS#8 key signature but openssl binding could not decode it.')
return None


def read_pkcs_cert(binary: bytes, offset: int):
if binary[offset] not in TLV_KNOWN_STARTS:
def read_pkcs_cert(file_handle: io.FileIO, offset: int):
file_handle.seek(offset)
value = int.from_bytes(file_handle.read(1), byteorder='little')
if value not in TLV_KNOWN_STARTS:
return None
start, size = _get_start_and_size_of_der_field(binary=binary, offset=offset)
try:
private_key, certificate, additional_certificates = pkcs12.load_key_and_certificates(
binary[offset : start + size], None
)
x509_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, certificate.public_bytes(serialization.Encoding.PEM)
)
return make_unicode_string(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_TEXT, cert=x509_cert))
key_data = _read_der_key(file_handle=file_handle, offset=offset)
private_key, certificate, additional_certificates = pkcs12.load_key_and_certificates(key_data, None)
x509_cert = ssl.load_certificate(ssl.FILETYPE_PEM, certificate.public_bytes(serialization.Encoding.PEM))
return make_unicode_string(ssl.dump_certificate(type=ssl.FILETYPE_TEXT, cert=x509_cert))
except ValueError:
logging.debug('Found PKCS#12 certificate, but passphrase is missing or false positive.')
return None


def read_ssl_cert(binary: bytes, start: int, end: int):
def read_ssl_cert(file_handle: io.FileIO, start: int, end: int):
try:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, binary[start : end + 25])
return make_unicode_string(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, cert))
except OpenSSL.crypto.Error:
file_handle.seek(start)
key_data = file_handle.read(end - start + 25)
cert = ssl.load_certificate(ssl.FILETYPE_PEM, key_data)
return make_unicode_string(ssl.dump_certificate(ssl.FILETYPE_TEXT, cert))
except ssl.Error:
logging.debug('Found SSL certificate signature, but looks false positive')
return None
Loading

0 comments on commit e33954f

Please sign in to comment.