Skip to content

Commit

Permalink
plugin users&pws: ported to new base class
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke authored and maringuu committed Dec 5, 2024
1 parent 960280f commit f2b9250
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 258 deletions.
195 changes: 55 additions & 140 deletions src/plugins/analysis/users_and_passwords/code/password_file_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,152 +1,67 @@
from __future__ import annotations

import logging
import re
from base64 import b64decode
from contextlib import suppress
from itertools import chain
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

from docker.types import Mount
import pydantic
from pydantic import Field

from analysis.PluginBase import AnalysisBasePlugin
from helperFunctions.docker import run_docker_container
from helperFunctions.fileSystem import get_src_dir
from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.tag import TagColor
from plugins.analysis.users_and_passwords.internal.credentials_finder import (
CredentialResult,
HtpasswdCredentialFinder,
MosquittoCredentialFinder,
UnixCredentialFinder,
)
from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE

if TYPE_CHECKING:
from collections.abc import Callable

from objects.file import FileObject

JOHN_PATH = Path(__file__).parent.parent / 'bin' / 'john'
JOHN_POT = Path(__file__).parent.parent / 'bin' / 'john.pot'
WORDLIST_PATH = Path(get_src_dir()) / 'bin' / 'passwords.txt'
USER_NAME_REGEX = rb'[a-zA-Z][a-zA-Z0-9_-]{2,15}'
UNIX_REGEXES = [
USER_NAME_REGEX + rb':[^:]?:\d+:\d*:[^:]*:[^:]*:[^\n ]*',
USER_NAME_REGEX + rb':\$[1256][ay]?\$[a-zA-Z0-9\./+]*\$[a-zA-Z0-9\./+]{16,128}={0,2}', # MD5 / Blowfish / SHA
USER_NAME_REGEX + rb':[a-zA-Z0-9\./=]{13}:\d*:\d*:', # DES
]
HTPASSWD_REGEXES = [
USER_NAME_REGEX + rb':\$apr1\$[a-zA-Z0-9\./+=]+\$[a-zA-Z0-9\./+]{22}', # MD5 apr1
USER_NAME_REGEX + rb':\{SHA\}[a-zA-Z0-9\./+]{27}=', # SHA-1
]
MOSQUITTO_REGEXES = [rb'[a-zA-Z][a-zA-Z0-9_-]{2,15}\:\$6\$[a-zA-Z0-9+/=]+\$[a-zA-Z0-9+/]{86}==']
RESULTS_DELIMITER = '=== Results: ==='


class AnalysisPlugin(AnalysisBasePlugin):
"""
This plug-in tries to find and crack passwords
"""

NAME = 'users_and_passwords'
DEPENDENCIES = [] # noqa: RUF012
MIME_BLACKLIST = MIME_BLACKLIST_NON_EXECUTABLE
DESCRIPTION = 'search for UNIX, httpd, and mosquitto password files, parse them and try to crack the passwords'
VERSION = '0.5.4'
FILE = __file__

def process_object(self, file_object: FileObject) -> FileObject:
if self.NAME not in file_object.processed_analysis:
file_object.processed_analysis[self.NAME] = {}
file_object.processed_analysis[self.NAME]['summary'] = []
self.find_password_entries(file_object, UNIX_REGEXES, generate_unix_entry)
self.find_password_entries(file_object, HTPASSWD_REGEXES, generate_htpasswd_entry)
self.find_password_entries(file_object, MOSQUITTO_REGEXES, generate_mosquitto_entry)
return file_object

def find_password_entries(self, file_object: FileObject, regex_list: list[bytes], entry_gen_function: Callable):
for passwd_regex in regex_list:
passwd_entries = re.findall(passwd_regex, file_object.binary)
for entry in passwd_entries:
self.update_file_object(file_object, entry_gen_function(entry))

def _add_found_password_tag(self, file_object: FileObject, result: dict):
for password_entry in result:
if 'password' in result[password_entry]:
username = password_entry.split(':', 1)[0]
password = result[password_entry]['password']
self.add_analysis_tag(
file_object, f'{username}_{password}', f'Password: {username}:{password}', TagColor.RED, True
)

def update_file_object(self, file_object: FileObject, result_entry: dict):
file_object.processed_analysis[self.NAME].update(result_entry)
file_object.processed_analysis[self.NAME]['summary'].extend(list(result_entry))
self._add_found_password_tag(file_object, result_entry)


def generate_unix_entry(entry: bytes) -> dict:
user_name, pw_hash, *_ = entry.split(b':')
result_entry = {'type': 'unix', 'entry': _to_str(entry)}
try:
if pw_hash.startswith(b'$') or _is_des_hash(pw_hash):
result_entry['password-hash'] = _to_str(pw_hash)
result_entry['cracked'] = crack_hash(b':'.join((user_name, pw_hash)), result_entry)
except (IndexError, AttributeError, TypeError):
logging.warning(f'Unsupported password format: {entry}', exc_info=True)
return {f'{_to_str(user_name)}:unix': result_entry}


def generate_htpasswd_entry(entry: bytes) -> dict:
user_name, pw_hash = entry.split(b':')
result_entry = {'type': 'htpasswd', 'entry': _to_str(entry), 'password-hash': _to_str(pw_hash)}
result_entry['cracked'] = crack_hash(entry, result_entry)
return {f'{_to_str(user_name)}:htpasswd': result_entry}


def generate_mosquitto_entry(entry: bytes) -> dict:
entry_decoded = _to_str(entry)
user, _, _, salt_hash, passwd_hash, *_ = re.split(r'[:$]', entry_decoded)
passwd_entry = f'{user}:$dynamic_82${b64decode(passwd_hash).hex()}$HEX${b64decode(salt_hash).hex()}'
result_entry = {'type': 'mosquitto', 'entry': entry_decoded, 'password-hash': passwd_hash}
result_entry['cracked'] = crack_hash(passwd_entry.encode(), result_entry, '--format=dynamic_82')
return {f'{user}:mosquitto': result_entry}


def _is_des_hash(pw_hash: str) -> bool:
return len(pw_hash) == 13 # noqa: PLR2004


def crack_hash(passwd_entry: bytes, result_entry: dict, format_term: str = '') -> bool:
with NamedTemporaryFile() as fp:
fp.write(passwd_entry)
fp.seek(0)
john_process = run_docker_container(
'fact/john:alpine-3.18',
command=f'/work/input_file {format_term}',
mounts=[
Mount('/work/input_file', fp.name, type='bind'),
Mount('/root/.john/john.pot', str(JOHN_POT), type='bind'),
],
logging_label='users_and_passwords',
from io import FileIO


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(pydantic.BaseModel):
unix: List[CredentialResult] = Field(description='The list of found UNIX credentials.')
http: List[CredentialResult] = Field(description='The list of found HTTP basic auth credentials.')
mosquitto: List[CredentialResult] = Field(description='The list of found Mosquitto MQTT broker credentials.')

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
name='users_and_passwords',
description=(
'search for UNIX, httpd, and mosquitto password files, parse them and try to crack the passwords'
),
version='1.0.0',
Schema=self.Schema,
mime_blacklist=MIME_BLACKLIST_NON_EXECUTABLE,
),
)
result_entry['log'] = john_process.stdout
if 'No password hashes loaded' in john_process.stdout:
result_entry['ERROR'] = 'hash type is not supported'
return False
output = parse_john_output(john_process.stdout)
if output:
if any('0 password hashes cracked' in line for line in output):
result_entry['ERROR'] = 'password cracking not successful'
return False
with suppress(IndexError):
result_entry['password'] = output[0].split(':')[1]
return True
return False


def parse_john_output(john_output: str) -> list[str]:
if RESULTS_DELIMITER in john_output:
start_offset = john_output.find(RESULTS_DELIMITER) + len(RESULTS_DELIMITER) + 1 # +1 is '\n' after delimiter
return [line for line in john_output[start_offset:].split('\n') if line]
return []

def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
del virtual_file_path, analyses
file_contents = Path(file_handle.name).read_bytes()
return self.Schema(
unix=UnixCredentialFinder.find_credentials(file_contents),
http=HtpasswdCredentialFinder.find_credentials(file_contents),
mosquitto=MosquittoCredentialFinder.find_credentials(file_contents),
)

def _to_str(byte_str: bytes) -> str:
"""result entries must be converted from `bytes` to `str` in order to be saved as JSON"""
return byte_str.decode(errors='replace')
def summarize(self, result: Schema) -> list[str]:
return [f'{entry.username}:{entry.type}' for entry in chain(result.unix, result.http, result.mosquitto)]

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del summary
return [
Tag(
name=f'{entry.username}_{entry.password}',
value=f'Password: {entry.username}:{entry.password}',
color=TagColor.RED,
propagate=True,
)
for entry in chain(result.unix, result.http, result.mosquitto)
if entry.password
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

from contextlib import suppress
from pathlib import Path
from tempfile import NamedTemporaryFile

from docker.types import Mount

from helperFunctions.docker import run_docker_container

JOHN_POT = Path(__file__).parent.parent / 'bin' / 'john.pot'
RESULTS_DELIMITER = '=== Results: ==='


def crack_hash(passwd_entry: bytes, format_term: str = '') -> tuple[str | None, str | None]:
with NamedTemporaryFile() as fp:
fp.write(passwd_entry)
fp.seek(0)
john_process = run_docker_container(
'fact/john:alpine-3.18',
command=f'/work/input_file {format_term}',
mounts=[
Mount('/work/input_file', fp.name, type='bind'),
Mount('/root/.john/john.pot', str(JOHN_POT), type='bind'),
],
logging_label='users_and_passwords',
)
if 'No password hashes loaded' in john_process.stdout:
return None, 'hash type is not supported'
output = _parse_john_output(john_process.stdout)
if output:
if any('0 password hashes cracked' in line for line in output):
return None, 'password cracking not successful'
with suppress(IndexError):
return output[0].split(':')[1], None
return None, None


def _parse_john_output(john_output: str) -> list[str]:
if RESULTS_DELIMITER in john_output:
start_offset = john_output.find(RESULTS_DELIMITER) + len(RESULTS_DELIMITER) + 1 # +1 is '\n' after delimiter
return [line for line in john_output[start_offset:].split('\n') if line]
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from __future__ import annotations

import abc
import logging
import re
from base64 import b64decode
from typing import Optional

import pydantic
from pydantic import Field

from .crack_password import crack_hash

USER_NAME_REGEX = rb'[a-zA-Z][a-zA-Z0-9_-]{2,15}'
DES_HASH_LENGTH = 13


class CredentialResult(pydantic.BaseModel):
username: str = Field(description='The username.')
full_entry: str = Field(description='The full entry in unparsed form.')
type: str = Field(description='The type of credential (UNIX, htpasswd, etc.).')
password_hash: Optional[str] = Field(description='The password in hashed form.', default=None)
password: Optional[str] = Field(description='The password (if the hash was available and cracked).', default=None)
error: Optional[str] = Field(
description='Error message (if cracking the password hash was not successful).',
default=None,
)


class CredentialFinder(abc.ABC):
REGEX_LIST: tuple[re.Pattern]

@classmethod
def find_credentials(cls, file_contents: bytes) -> list[CredentialResult]:
return [
cls._parse_entry(pw_entry)
for passwd_regex in cls.REGEX_LIST
for pw_entry in passwd_regex.findall(file_contents)
]

@staticmethod
@abc.abstractmethod
def _parse_entry(entry: bytes) -> CredentialResult:
...


class UnixCredentialFinder(CredentialFinder):
REGEX_LIST = (
re.compile(USER_NAME_REGEX + rb':[^:]?:\d+:\d*:[^:]*:[^:]*:[^\n ]*'),
# MD5 / Blowfish / SHA
re.compile(USER_NAME_REGEX + rb':\$[1256][ay]?\$[a-zA-Z0-9./+]*\$[a-zA-Z0-9./+]{16,128}={0,2}'),
re.compile(USER_NAME_REGEX + rb':[a-zA-Z0-9./=]{13}:\d*:\d*:'), # DES
)

@staticmethod
def _parse_entry(entry: bytes) -> CredentialResult:
user_name, pw_hash, *_ = entry.split(b':')
password, error = None, None
try:
if pw_hash.startswith(b'$') or _is_des_hash(pw_hash):
password, error = crack_hash(b':'.join((user_name, pw_hash)))
except (IndexError, AttributeError, TypeError):
error = f'Unsupported password format: {entry}'
logging.warning(error, exc_info=True)
return CredentialResult(
username=_to_str(user_name),
full_entry=_to_str(entry),
type='unix',
password_hash=_to_str(pw_hash),
password=password,
error=error,
)


def _is_des_hash(pw_hash: str) -> bool:
return len(pw_hash) == DES_HASH_LENGTH


class HtpasswdCredentialFinder(CredentialFinder):
REGEX_LIST = (
re.compile(USER_NAME_REGEX + rb':\$apr1\$[a-zA-Z0-9./+=]+\$[a-zA-Z0-9./+]{22}'), # MD5 apr1
re.compile(USER_NAME_REGEX + rb':\{SHA}[a-zA-Z0-9./+]{27}='), # SHA-1
)

@staticmethod
def _parse_entry(entry: bytes) -> CredentialResult:
user_name, pw_hash = entry.split(b':')
password, error = crack_hash(entry)
return CredentialResult(
username=_to_str(user_name),
full_entry=_to_str(entry),
type='http',
password_hash=_to_str(pw_hash),
password=password,
error=error,
)


class MosquittoCredentialFinder(CredentialFinder):
REGEX_LIST = (re.compile(rb'[a-zA-Z][a-zA-Z0-9_-]{2,15}:\$6\$[a-zA-Z0-9+/=]+\$[a-zA-Z0-9+/]{86}=='),)

@staticmethod
def _parse_entry(entry: bytes) -> CredentialResult:
user, _, _, salt_hash, passwd_hash, *_ = re.split(r'[:$]', _to_str(entry))
passwd_entry = f'{user}:$dynamic_82${b64decode(passwd_hash).hex()}$HEX${b64decode(salt_hash).hex()}'
password, error = crack_hash(passwd_entry.encode(), '--format=dynamic_82')
return CredentialResult(
username=user,
full_entry=_to_str(entry),
type='mosquitto',
password_hash=passwd_entry,
password=password,
error=error,
)


def _to_str(byte_str: bytes) -> str:
"""
result entries must be converted from `bytes` to `str` in order to be saved as JSON
"""
return byte_str.decode(errors='replace')
Loading

0 comments on commit f2b9250

Please sign in to comment.