diff --git a/alert/main.py b/alert/main.py
index 2140862..cbdc65b 100644
--- a/alert/main.py
+++ b/alert/main.py
@@ -1,26 +1,28 @@
 #!/usr/bin/env python3
 """
-main.py (WIP)
+main.py
 
-    Alert Function triggered by PubSub to output results to any
-    variety of services that exist.
-
-    Outputs we can ideally dispatch out to:
-    * GitHub Comments (using authorization token)
-    * Slack Channel
-    * Threat Intelligence Products
+    Alert Function triggered by PubSub that reports detected malicious forks to the repository's issue tracker.
 """
 
 import json
 import base64
 
-from github import Github
-
+from github import Github, RateLimitExceededException
 def handler(request):
-    """Responds to any HTTP request.
-    Args:
-        request (flask.Request): HTTP request object.
-    Returns:
-        The response text or any set of values
+    try:
+        _handler(request)
+
+    # in the edge case where we've exhausted the rate limit from analysis earlier,
+    # back off creating issue alerts until the next hour.
+    except RateLimitExceededException as err:
+        return ("", 500)
+
+    return ("", 204)
+
+def _handler(request):
+    """
+    Parse the PubSub payload and file an issue alert. An alert may already have been
+    created for a fork, so search existing issues before opening a new one.
     """
     envelope = request.get_json()
     if not envelope:
@@ -34,5 +36,31 @@ def handler(request):
     data = envelope["message"]["data"]
     payload = base64.b64decode(data).decode("utf-8")
     payload = json.loads(payload)
-    print(payload)
-    return ""
+
+    # create client for processing
+    gh = Github(payload["token"])
+
+    parent = payload["parent"]
+    child = payload["name"]
+    repo = gh.get_repo(parent)
+
+    # get or create a Fork Sentry label for searching and tagging issues
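+    # (illustrative sketch: the "fork-sentry" label name and color are assumptions,
+    #  not fixed by this change; the label lets later runs find and tag alert issues)
+    if "fork-sentry" not in {label.name for label in repo.get_labels()}:
+        repo.create_label("fork-sentry", "d93f0b")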
+
+    # get all Fork Sentry issues and detect if this fork has already been picked up previously
+    issues = repo.get_issues()
+    for issue in issues:
+
+        # if found, create a comment under the existing issue instead of filing a
+        # duplicate (alert titles embed the fork name, as set below)
+        if child in issue.title:
+            issue.create_comment(f"Fork Sentry detected new suspicious indicators in `{child}`.")
+            return
+
+    # create content for issue
+    title = f":warning: Fork Sentry: {child} is potentially malicious"
+    body = "## Suspicious Files & Indicators\n"
+
+    if payload["typosquatting"]:
+        body += "The fork appears to be __typosquatting__ your repository, creating opportunities to trick victims who misspell your repo's name.\n"
+
+    # append each suspicious path and the indicators reported by the analyzer
+    for path, indicators in payload["suspicious"].items():
+        body += f"\n- `{path}`: `{indicators}`\n"
+
+    repo.create_issue(title=title, body=body)
\ No newline at end of file
diff --git a/analyzer/main.py b/analyzer/main.py
index ce224c4..2937018 100644
--- a/analyzer/main.py
+++ b/analyzer/main.py
@@ -58,7 +58,7 @@ def handler():
     # rate limit reached, backoff by pushing to seperate queue
     except github.RateLimitExceededException as err:
         analysis.backoff_queue(payload)
-        return ("", 204)
+        return ("", 500)
 
     # handle all other runtime errors
     except Exception as err:
diff --git a/analyzer/repo_analysis.py b/analyzer/repo_analysis.py
index 23d5b0c..7040c84 100644
--- a/analyzer/repo_analysis.py
+++ b/analyzer/repo_analysis.py
@@ -23,6 +23,7 @@
 import ssdeep
 import requests
 from github import Github
+from sqlalchemy import create_engine, text
 from google.cloud import pubsub_v1, storage
 from google.cloud import logging as cloudlogging
 
@@ -45,8 +46,9 @@
 _handler = _client.get_default_handler()
 logger.addHandler(_handler)
 
-# url for serverless redis host
-redis_host = os.environ.get("REDISHOST")
+# database url
+db_url = os.getenv("DATABASE_URL")
+engine = create_engine(db_url, echo=True, future=True)
 
 # static analysis scanner
 scanner = clamd.ClamdUnixSocket()
 
@@ -86,16 +88,6 @@ def __init__(
         if vt_token:
             self.vt_client = vt.Client(vt_token)
 
-        # Minhash for binary similarity
-        self.lsh = MinHashLSH(
-            threshold=0.75,
-            num_perm=128,
-            storage_config={
-                "type": "redis",
-                "redis": {"host": redis_host, "port": 6379},
-            },
-        )
-
     def _analyze_artifact(self, path) -> t.Optional[str]:
         """
         Heuristics to check if a modified file is suspicious and should be enqueued for further analysis.
@@ -122,7 +114,7 @@ def _analyze_artifact(self, path) -> t.Optional[str]:
             iocs += ["binary"]
             targets += [path]
 
-            # do binary similarity analysis with samples we've seen 
+            # do binary similarity analysis with samples we've seen
            results = self._detect_sims(path)
             if not results is None:
                 return results
@@ -144,16 +136,22 @@ def _analyze_artifact(self, path) -> t.Optional[str]:
 
         # threat detection time
         for target in targets:
-            # trigger ClamAV scan first
             with open(target, "rb") as fd:
-                contents = io.BytesIO(fd.read())
-                results = scanner.instream(contents)
-                for path, tags in results.items():
-                    found, name = tags[0], tags[1]
-                    if found == "FOUND":
-                        iocs += [f"clamav:{name}"]
+                contents = fd.read()
+                iobuf = io.BytesIO(contents)
 
-            # trigger scan with VTotal enterprise
+            # trigger ClamAV scan first
+            results = scanner.instream(iobuf)
+            for path, tags in results.items():
+                found, name = tags[0], tags[1]
+                if found == "FOUND":
+                    iocs += [f"clamav:{name}"]
+
+            # trigger scan with VTotal if API key is supplied
+            if self.vt_client:
+                analysis = self.vt_client.scan_file(contents, wait_for_completion=True)
+                vtotal_mal = analysis.last_analysis_stats["malicious"]
+                iocs += [f"virustotal:{vtotal_mal}"]
 
         # diffed file is not suspicious
         if len(iocs) == 0:
@@ -165,14 +163,21 @@ def _analyze_artifact(self, path) -> t.Optional[str]:
 
         return {
             "sha256": hashlib.sha256(contents).hexdigest(),
-            # "ssdeep": ssdeep.hash(contents),
             "iocs": iocs,
         }
 
     def _detect_sims(self, path: str):
         """
+        Given a binary, generate a fuzzy hash and query the database for matching samples.
         """
-        pass
+        with open(path, "rb") as fd:
+            fhash = ssdeep.hash(fd.read())
+
+        # recover attributes from the fuzzy hash (format: "chunksize:chunk:double_chunk")
+        chunksize, chunk, double_chunk = fhash.split(":")
+        chunksize = int(chunksize)
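+
+        # Illustrative sketch of the lookup half of this helper, assuming an
+        # `artifacts` table with `repo`, `chunksize`, and `fuzzy` columns (the real
+        # schema is not defined in this change). ssdeep hashes can only match when
+        # their chunksizes are equal, double, or half, so pre-filter candidates by
+        # chunksize before scoring with ssdeep.compare; the 75 cutoff mirrors the
+        # 0.75 threshold used by the removed MinHashLSH configuration.
+        with engine.connect() as conn:
+            rows = conn.execute(
+                text("SELECT repo, fuzzy FROM artifacts WHERE chunksize IN (:c, :c2, :c3)"),
+                {"c": chunksize, "c2": chunksize * 2, "c3": chunksize // 2},
+            )
+            for repo_name, fuzzy in rows:
+                if ssdeep.compare(fhash, fuzzy) >= 75:
+                    return f"ssdeep_similarity:{repo_name}"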
+
+
     def detect_suspicious(self):
         """
@@ -182,6 +187,7 @@ def detect_suspicious(self):
         """
         logger.debug(f"Analyzing {self.repo_name}")
         results = {
             # metadata needed for further processing
+            "parent": self.orig_name,
             "name": self.repo_name,
             "token": self.token,
             # actually malicious indicators
@@ -365,7 +371,7 @@ def _generate_alerts(self, results) -> None:
     def backoff_queue(self) -> None:
         """
         Re-enqueue a repository to a seperate queue if we hit the rate limit. That one will
-        push all requests back to this analyzer scheduled in the next hour.
+        push all requests back to this analyzer scheduled in the next hour. (TODO)
         """
         msg = json.dumps(results, indent=2).encode("utf-8")
         publisher.publish(topic, alerts)
diff --git a/analyzer/requirements.txt b/analyzer/requirements.txt
index 8b7620b..c9e8969 100644
--- a/analyzer/requirements.txt
+++ b/analyzer/requirements.txt
@@ -11,6 +11,7 @@ PyGithub
 Flask
 requests
 gunicorn
+sqlalchemy
 sentry-sdk[flask]
 python-dotenv
 
@@ -18,5 +19,4 @@ python-dotenv
 lief
 clamd
 ssdeep
-vt-py
-#datasketch
\ No newline at end of file
+vt-py
\ No newline at end of file
diff --git a/terraform/pubsub.tf b/terraform/pubsub.tf
index 7a271c5..78a0a1d 100644
--- a/terraform/pubsub.tf
+++ b/terraform/pubsub.tf
@@ -15,7 +15,7 @@ resource "google_pubsub_subscription" "fork_analyzer_sub" {
 
   ack_deadline_seconds = 10
 
-  // Make push subscription to the CLoud Run listener endpoint
+  // Make push subscription to the Cloud Run listener endpoint
   push_config {
     push_endpoint = google_cloud_run_service.analyzer.status[0].url