Enhance alert function
ex0dus-0x committed Jan 7, 2022
1 parent 8d58e6d commit d8c7f47
Showing 5 changed files with 79 additions and 45 deletions.
62 changes: 45 additions & 17 deletions alert/main.py
@@ -1,26 +1,28 @@
#!/usr/bin/env python3
"""
main.py (WIP)
main.py
Alert Function triggered by PubSub to output results to any
variety of services that exist.
Outputs we can ideally dispatch out to:
* GitHub Comments (using authorization token)
* Slack Channel
* Threat Intelligence Products
Alert Function triggered by PubSub to output detected malicious forks to the issue tracker.
"""
import json
import base64
from github import Github

from github import Github, RateLimitExceededException

def handler(request):
    """Responds to any HTTP request.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values.
    """
    try:
        _handler(request)

    # in the edge case where we've exhausted the rate limit from
    # analysis earlier, back off creating issue alerts until the next hour.
    except RateLimitExceededException as err:
        return ("", 500)

    return ("", 204)


def _handler(request):
    """
    An alert may already be generated for a fork, so do a search for existing
    issues before raising a new one.
    """
    envelope = request.get_json()
    if not envelope:
@@ -34,5 +36,31 @@ def handler(request):
    data = envelope["message"]["data"]
    payload = base64.b64decode(data).decode("utf-8")
    payload = json.loads(payload)
    print(payload)
    return ""

    # create client for processing
    gh = Github(payload["token"])

    parent = payload["parent"]
    child = payload["name"]
    repo = gh.get_repo(parent)

    # get or create a Fork Sentry label for searching and tagging issues

    # get all Fork Sentry issues and detect if this fork has already been picked up previously
    issues = repo.get_issues()
    for issue in issues:

        # if found, create a comment under the issue
        pass

    # create content for the issue
    title = f":warning: Fork Sentry: {child} is potentially malicious"
    body = """## Suspicious Files & Indicators\n"""

    if payload["typosquatting"]:
        body += "The fork appears to be __typosquatting__ your repository, creating opportunities to trick victims who misspell your repo's name.\n"

    for path, indicators in payload["suspicious"].items():
        pass

    repo.create_issue(title=title, body=body)
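
The label lookup, issue de-duplication, and per-file indicator rendering above are still `pass` placeholders. A minimal sketch of how they might be filled in with PyGithub follows; the `fork-sentry` label name and the assumption that `payload["suspicious"]` maps file paths to lists of indicator strings are illustrative, not part of this commit.

```python
from github import GithubException

LABEL = "fork-sentry"  # assumed label name, not defined in this commit


def _get_or_create_label(repo):
    """Fetch the Fork Sentry label, creating it if it doesn't exist yet."""
    try:
        return repo.get_label(LABEL)
    except GithubException:
        return repo.create_label(name=LABEL, color="d93f0b", description="Fork Sentry alerts")


def _find_existing_issue(repo, label, child):
    """Return an open Fork Sentry issue already covering this fork, if any."""
    for issue in repo.get_issues(state="open", labels=[label]):
        if child in issue.title:
            return issue
    return None


def _render_indicators(suspicious):
    """Render a {path: [indicators]} mapping into a markdown bullet list."""
    lines = []
    for path, indicators in suspicious.items():
        lines.append(f"* `{path}`: {', '.join(indicators)}")
    return "\n".join(lines)
```

With helpers like these, `_handler` could tag new issues with the label and add a comment via `existing.create_comment(body)` when `_find_existing_issue` returns a hit, instead of filing a duplicate.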
2 changes: 1 addition & 1 deletion analyzer/main.py
@@ -58,7 +58,7 @@ def handler():
# rate limit reached, back off by pushing to a separate queue
except github.RateLimitExceededException as err:
analysis.backoff_queue(payload)
return ("", 204)
return ("", 500)

# handle all other runtime errors
except Exception as err:
54 changes: 30 additions & 24 deletions analyzer/repo_analysis.py
@@ -23,6 +23,7 @@
import ssdeep
import requests
from github import Github
from sqlalchemy import create_engine

from google.cloud import pubsub_v1, storage
from google.cloud import logging as cloudlogging
@@ -45,8 +46,9 @@
_handler = _client.get_default_handler()
logger.addHandler(_handler)

# url for serverless redis host
redis_host = os.environ.get("REDISHOST")
# database url
db_url = os.getenv("DATABASE_URL")
engine = create_engine(db_url, echo=True, future=True)

# static analysis scanner
scanner = clamd.ClamdUnixSocket()
@@ -86,16 +88,6 @@ def __init__(
if vt_token:
self.vt_client = vt.Client(vt_token)

# Minhash for binary similarity
self.lsh = MinHashLSH(
threshold=0.75,
num_perm=128,
storage_config={
"type": "redis",
"redis": {"host": redis_host, "port": 6379},
},
)

def _analyze_artifact(self, path) -> t.Optional[str]:
"""
Heuristics to check if a modified file is suspicious and should be enqueued for further analysis.
@@ -122,7 +114,7 @@ def _analyze_artifact(self, path) -> t.Optional[str]:
iocs += ["binary"]
targets += [path]

# do binary similarity analysis with samples we've seen
# do binary similarity analysis with samples we've seen
results = self._detect_sims(path)
if results is not None:
return results
@@ -144,16 +136,22 @@ def _analyze_artifact(self, path) -> t.Optional[str]:
# threat detection time
for target in targets:

# trigger ClamAV scan first
with open(target, "rb") as fd:
contents = io.BytesIO(fd.read())
results = scanner.instream(contents)
for path, tags in results.items():
found, name = tags[0], tags[1]
if found == "FOUND":
iocs += [f"clamav:{name}"]
contents = fd.read()
iobuf = io.BytesIO(contents)

# trigger scan with VTotal enterprise
# trigger ClamAV scan first
results = scanner.instream(iobuf)
for path, tags in results.items():
found, name = tags[0], tags[1]
if found == "FOUND":
iocs += [f"clamav:{name}"]

# trigger scan with VTotal if API key is supplied
if self.vt_client:
analysis = self.vt_client.scan_file(contents, wait_for_completion=True)
vtotal_mal = analysis.last_analysis_stats["malicious"]
iocs += [f"virustotal:{vtotal_mal}"]

# diffed file is not suspicious
if len(iocs) == 0:
@@ -165,14 +163,21 @@ def _analyze_artifact(self, path) -> t.Optional[str]:

return {
"sha256": hashlib.sha256(contents).hexdigest(),
# "ssdeep": ssdeep.hash(contents),
"iocs": iocs,
}

def _detect_sims(self, path: str):
    """
    Given a binary, generate a fuzzy hash, and query for matching items against our database.
    """
    pass
    with open(path, "rb") as fd:
        fhash = ssdeep.hash(fd.read())

    # recover attributes from fuzzy hash
    chunksize, chunk, double_chunk = fhash.split(":")
    chunksize = int(chunksize)
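
`_detect_sims` now derives the ssdeep chunksize but stops before the database lookup. A rough sketch of that query against the SQLAlchemy engine created above is below; the `samples` table, its columns, and the match threshold are assumptions, since no schema ships with this commit. ssdeep scores are only comparable between hashes whose chunksizes are equal or adjacent (half/double), which is why the candidate set is filtered on chunksize first.

```python
from sqlalchemy import text
import ssdeep


def find_similar(engine, fhash: str, threshold: int = 60):
    """Return (sha256, score) pairs for stored samples similar to `fhash`.

    Assumes a hypothetical `samples` table with `sha256`, `ssdeep_hash`,
    and `chunksize` columns; none of this is defined in the commit.
    """
    chunksize = int(fhash.split(":")[0])

    # only hashes with the same or neighboring chunksize can score against
    # each other, so narrow the candidates before running ssdeep.compare
    query = text(
        "SELECT sha256, ssdeep_hash FROM samples "
        "WHERE chunksize IN (:half, :same, :double)"
    )
    params = {"half": chunksize // 2, "same": chunksize, "double": chunksize * 2}

    matches = []
    with engine.connect() as conn:
        for sha256, candidate in conn.execute(query, params):
            score = ssdeep.compare(fhash, candidate)
            if score >= threshold:
                matches.append((sha256, score))
    return matches
```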



def detect_suspicious(self):
"""
@@ -182,6 +187,7 @@ def detect_suspicious(self):
logger.debug(f"Analyzing {self.repo_name}")
results = {
# metadata needed for further processing
"parent": self.orig_name,
"name": self.repo_name,
"token": self.token,
# actually malicious indicators
@@ -365,7 +371,7 @@ def _generate_alerts(self, results) -> None:
def backoff_queue(self, results) -> None:
    """
    Re-enqueue a repository to a separate queue if we hit the rate limit. That one will
    push all requests back to this analyzer scheduled in the next hour.
    push all requests back to this analyzer scheduled in the next hour. (TODO)
    """
    msg = json.dumps(results, indent=2).encode("utf-8")
    publisher.publish(topic, msg)
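
The second half of the backoff path, replaying queued repositories an hour later, is still marked TODO. One possible shape is a small scheduler-triggered function that drains a backoff subscription and re-publishes to the analyzer topic; the project, subscription, and topic names below are placeholders, not resources defined in this repo.

```python
from google.cloud import pubsub_v1

PROJECT = "my-project"            # assumed project id
BACKOFF_SUB = "fork-backoff"      # assumed backoff subscription, not defined in this commit
ANALYZER_TOPIC = "fork-analysis"  # assumed topic the analyzer listens on


def drain_backoff(event=None, context=None):
    """Hourly Cloud Scheduler entrypoint: replay backed-off repos to the analyzer."""
    subscriber = pubsub_v1.SubscriberClient()
    publisher = pubsub_v1.PublisherClient()
    sub_path = subscriber.subscription_path(PROJECT, BACKOFF_SUB)
    topic_path = publisher.topic_path(PROJECT, ANALYZER_TOPIC)

    # pull a batch of backed-off payloads and re-publish them for analysis
    response = subscriber.pull(request={"subscription": sub_path, "max_messages": 50})
    ack_ids = []
    for msg in response.received_messages:
        publisher.publish(topic_path, msg.message.data)
        ack_ids.append(msg.ack_id)

    if ack_ids:
        subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
```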
4 changes: 2 additions & 2 deletions analyzer/requirements.txt
@@ -11,12 +11,12 @@ PyGithub
Flask
requests
gunicorn
sqlalchemy
sentry-sdk[flask]
python-dotenv

# analysis
lief
clamd
ssdeep
vt-py
#datasketch
vt-py
2 changes: 1 addition & 1 deletion terraform/pubsub.tf
@@ -15,7 +15,7 @@ resource "google_pubsub_subscription" "fork_analyzer_sub" {

ack_deadline_seconds = 10

// Make push subscription to the CLoud Run listener endpoint
// Make push subscription to the Cloud Run listener endpoint
push_config {
push_endpoint = google_cloud_run_service.analyzer.status[0].url

