diff --git a/src/api.py b/src/api.py index 92ef033..804b78b 100644 --- a/src/api.py +++ b/src/api.py @@ -6,9 +6,12 @@ def parse(family: str, config: Dict[str, Any]) -> IocCollection: iocs = IocCollection() + + # Handlers for special-cased families if family in modules.modules: iocs = modules.modules[family](config) + # Generic parser for the rest of the fields modules.parse(config, iocs) return iocs diff --git a/src/model.py b/src/model.py index 1425eda..8d45086 100644 --- a/src/model.py +++ b/src/model.py @@ -1,3 +1,4 @@ +import logging import re from base64 import b64encode from enum import Enum @@ -10,6 +11,8 @@ from .errors import IocExtractError +log = logging.getLogger(__name__) + PUBKEY_PEM_TEMPLATE = ( "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----" ) @@ -230,7 +233,7 @@ def try_add_rsa_from_pem(self, pem: str) -> None: if pem: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - pass + log.warning("Failed to parse a RSA key from PEM") def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: pem = PUBKEY_PEM_TEMPLATE.format(b64encode(blob).decode()) @@ -238,13 +241,13 @@ def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None: try: self.add_rsa_key(RsaKey.parse_pem(pem)) except IocExtractError: - pass + log.warning("Failed to parse a RSA key from ASN1") def try_add_rsa_from_base64(self, pem: str) -> None: try: self.add_rsa_key(RsaKey.parse_base64(pem)) except IocExtractError: - pass + log.warning("Failed to parse a RSA key from base64") def add_network_location(self, netloc: NetworkLocation) -> None: self.network_locations.append(netloc) @@ -253,13 +256,17 @@ def add_host_port( self, host: str, port: Union[str, int], schema: str = "unknown" ) -> None: if isinstance(port, str): - port_val = int(port) + try: + port_val = int(port) + except ValueError: + log.warning("Failed to add URL from host_port") + return else: port_val = port try: self.try_add_url(f"{schema}://{host}:{port_val}") except IocExtractError: - pass + log.warning("Failed to add URL from host_port") def try_add_url( self, url: str, location_type: LocationType = LocationType.CNC @@ -271,7 +278,7 @@ def try_add_url( NetworkLocation(url, location_type=location_type) ) except IocExtractError: - pass + log.warning("Failed to add URL directly") def add_password(self, password: str) -> None: self.passwords.append(password) diff --git a/src/modules.py b/src/modules.py index 63ee69f..0cc8bdf 100644 --- a/src/modules.py +++ b/src/modules.py @@ -1,14 +1,31 @@ +import logging import string from base64 import b64decode from typing import Any, Dict, List -from .errors import ModuleAlreadyRegisteredError +from .errors import IocExtractError, ModuleAlreadyRegisteredError from .model import EcdsaCurve, IocCollection, LocationType, RsaKey modules: Dict[str, Any] = {} +log = logging.getLogger(__name__) -# Utils +class CantFindHostForDomain(IocExtractError): + """Can't find a host for the domain when adding url.""" + + pass + + +class InvalidDomainObject(IocExtractError): + """Adding URL from something other than string or a dict.""" + + pass + + +class UnknownRsaKeyType(IocExtractError): + """Can't guess the RSA key format.""" + + pass def module(name): @@ -50,11 +67,9 @@ def add_url(iocs: IocCollection, config: Dict[str, Any], key: str) -> None: iocs.try_add_url(domain[hostkey]) break else: - raise NotImplementedError("Can't find a host for the domain") + raise CantFindHostForDomain() else: - raise NotImplementedError( - "The domain has to be either a string or a list" - ) + raise InvalidDomainObject() def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None: @@ -87,7 +102,7 @@ def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None: iocs.try_add_rsa_from_asn1_bytes(enc_bytes.rstrip(b"\x00")) continue - raise NotImplementedError("Unknown RSA key type") + raise UnknownRsaKeyType() def add_key(iocs: IocCollection, config: Dict, key: str, keytype: str) -> None: @@ -106,7 +121,10 @@ def add_mutex(iocs: IocCollection, config: Dict, key: str) -> None: def parse(config: Dict[str, Any], iocs: IocCollection) -> None: for name in ["publickey", "rsapub", "rsakey", "pubkey", "privkey"]: - add_rsa_key(iocs, config, name) + try: + add_rsa_key(iocs, config, name) + except UnknownRsaKeyType: + log.warning("Unknown RSA key type") for name in [ "urls", @@ -372,7 +390,7 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection: iocs.add_rsa_key(RsaKey(n=n, e=e)) del config["rsa_pub"] except Exception: - pass + log.warning("Failed to parse a lockbit key") return iocs diff --git a/src/run.py b/src/run.py index dea59f8..07fe7da 100644 --- a/src/run.py +++ b/src/run.py @@ -1,4 +1,5 @@ import argparse +import logging from mwdblib import MWDB # type: ignore @@ -14,8 +15,16 @@ def main(): parser.add_argument( "config_id", help="Config to parse", default=None, nargs="?" ) + parser.add_argument( + "-v", "--verbose", help="Print debug logs", action="store_true" + ) args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + mwdb = MWDB() mwdb.login(args.mwdb_user, args.mwdb_pass) @@ -28,9 +37,10 @@ def main(): for cfg in mwdb.recent_configs(): if cfg.type != "static": continue - print(cfg.id) + print(cfg.family, cfg.id) iocs = parse(cfg.family, cfg.cfg) print(iocs.prettyprint()) + print() continue diff --git a/tests/download_test_data.py b/tests/download_test_data.py index d2c22b5..06a69ef 100644 --- a/tests/download_test_data.py +++ b/tests/download_test_data.py @@ -1,6 +1,7 @@ import argparse -import os import json +import os + from mwdblib import Malwarecage, MalwarecageConfig # type: ignore diff --git a/tests/reparse_file.py b/tests/reparse_file.py index 892a4a4..5f13dad 100644 --- a/tests/reparse_file.py +++ b/tests/reparse_file.py @@ -1,6 +1,7 @@ import argparse -import os import json +import os + from src.api import parse diff --git a/tests/test_parse_regression.py b/tests/test_parse_regression.py index fab2cc7..0fbb9d6 100644 --- a/tests/test_parse_regression.py +++ b/tests/test_parse_regression.py @@ -1,7 +1,8 @@ -import unittest import json -from src.api import parse import os +import unittest + +from src.api import parse class TestParseRegression(unittest.TestCase):