Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

converted IP&URI plugin to new base class #1288

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 91 additions & 78 deletions src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,118 @@
from __future__ import annotations

import logging
from contextlib import suppress
from itertools import product
from pathlib import Path
from re import search
from typing import TYPE_CHECKING, List, Optional

import geoip2.database
from common_analysis_ip_and_uri_finder import CommonAnalysisIPAndURIFinder
from geoip2.errors import AddressNotFoundError
from maxminddb.errors import InvalidDatabaseError
from pydantic import BaseModel

from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin

from analysis.PluginBase import AnalysisBasePlugin
if TYPE_CHECKING:
from io import FileIO

# Path of the GeoLite2 city database: `bin/GeoLite2-City/GeoLite2-City.mmdb` below the parent directory of the
# directory that contains this file.
GEOIP_DATABASE_PATH = Path(__file__).parent.parent / 'bin/GeoLite2-City/GeoLite2-City.mmdb'

# Regular expressions for addresses that are removed from the results (applied with `re.search`).
# NOTE(review): the dots in the IPv4 patterns are not escaped (they match any character) and the patterns are not
# anchored -- confirm that this is intended before tightening them, since it changes which addresses are dropped.
IP_V4_BLACKLIST = [r'127.0.[0-9]+.1', r'255.[0-9]+.[0-9]+.[0-9]+'] # localhost # subnet masks
IP_V6_BLACKLIST = [r'^[0-9A-Za-z]::$', r'^::[0-9A-Za-z]$', r'^[0-9A-Za-z]::[0-9A-Za-z]$', r'^::$'] # trivial addresses


class AnalysisPlugin(AnalysisBasePlugin):
NAME = 'ip_and_uri_finder'
DEPENDENCIES = [] # noqa: RUF012
MIME_WHITELIST = [ # noqa: RUF012
'text/plain',
'application/octet-stream',
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
'application/x-dosexec',
]
DESCRIPTION = 'Search file for IP addresses and URIs based on regular expressions.'
VERSION = '0.4.2'
FILE = __file__

def additional_setup(self):
class IpAddress(BaseModel):
    """An IP address found in a file together with its geo location (if any)."""

    # the address as a string
    address: str
    # geo location of the address; may be None
    location: Optional[Location]


class Location(BaseModel):
    """Geographic coordinates (longitude and latitude) associated with an IP address."""

    longitude: float
    latitude: float


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
    """Result model of the plugin: IPv4 addresses, IPv6 addresses and URIs."""

    ips_v4: List[IpAddress]
    ips_v6: List[IpAddress]
    uris: List[str]

def __init__(self):
self.ip_and_uri_finder = CommonAnalysisIPAndURIFinder()
try:
self.reader = geoip2.database.Reader(str(GEOIP_DATABASE_PATH))
except FileNotFoundError:
logging.error('could not load GeoIP database')
self.reader = None

def process_object(self, file_object):
    """
    Search the file of `file_object` for IP addresses and URIs and store the result on the object.

    The findings are de-duplicated, blacklisted IPs are dropped, geo locations are attached to the remaining
    IPs and the augmented result is saved in `file_object.processed_analysis` under the plugin name.

    :param file_object: The file object to analyze (its `file_path` is read).
    :return: The same file object with the analysis result added.
    """
    findings = self.ip_and_uri_finder.analyze_file(file_object.file_path, separate_ipv6=True)

    for category in ['uris', 'ips_v4', 'ips_v6']:
        findings[category] = self._remove_duplicates(findings[category])
    for category, blacklist in (('ips_v4', IP_V4_BLACKLIST), ('ips_v6', IP_V6_BLACKLIST)):
        findings[category] = self._remove_blacklisted(findings[category], blacklist)

    with_locations = self.add_geo_uri_to_ip(findings)
    file_object.processed_analysis[self.NAME] = self._get_augmented_result(with_locations)
    return file_object

def _get_augmented_result(self, result):
    """
    Add the entries `summary` and `system_version` to `result`.

    :param result: The analysis result; it is modified in place.
    :return: The same `result` object.
    """
    result['summary'] = self._get_summary(result)
    finder_version = self.ip_and_uri_finder.system_version
    result['system_version'] = finder_version
    return result

def add_geo_uri_to_ip(self, result):
    """
    Replace the IP lists in `result` with the output of `link_ips_with_geo_location`.

    :param result: The analysis result with the keys `ips_v4` and `ips_v6`; it is modified in place.
    :return: The same `result` object.
    """
    result['ips_v4'] = self.link_ips_with_geo_location(result['ips_v4'])
    result['ips_v6'] = self.link_ips_with_geo_location(result['ips_v6'])
    return result

def find_geo_location(self, ip_address):
    """
    Look up `ip_address` with the GeoIP reader and return its coordinates as a `'latitude, longitude'` string.

    Exceptions raised by the lookup are not handled here and propagate to the caller.
    """
    location = self.reader.city(ip_address).location
    return f'{location.latitude}, {location.longitude}'

def link_ips_with_geo_location(self, ip_addresses):
    """
    Pair each IP address with its geo location.

    :param ip_addresses: The IP addresses to look up.
    :return: A list of `(ip, location)` tuples in input order; `location` is an empty string if the lookup
        failed with one of the expected errors.
    """
    expected_errors = (
        AttributeError,
        AddressNotFoundError,
        FileNotFoundError,
        ValueError,
        InvalidDatabaseError,
    )
    ip_location_pairs = []
    for address in ip_addresses:
        try:
            geo_location = self.find_geo_location(address)
        except expected_errors as exception:
            logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True)
            geo_location = ''
        ip_location_pairs.append((address, geo_location))
    return ip_location_pairs

@staticmethod
def _get_summary(results):
summary = []
summary.extend(results['uris'])
for key in ['ips_v4', 'ips_v6']:
for ip, *_ in results[key]: # IP results come in tuples (ip, latitude, longitude)
summary.append(ip)
super().__init__(
metadata=self.MetaData(
name='ip_and_uri_finder',
description='Search file for IP addresses and URIs based on regular expressions.',
version='1.0.0',
Schema=self.Schema,
mime_whitelist=[
'text/plain',
'application/octet-stream',
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
'application/x-dosexec',
],
system_version=self.ip_and_uri_finder.system_version,
),
)

def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
    """
    Search the file for IP addresses and URIs.

    The findings are de-duplicated, blacklisted IPs are removed and each remaining IP is paired with the
    result of `find_geo_location`.

    :param file_handle: Handle of the file to analyze; only its `name` is used.
    :param virtual_file_path: Not used by this plugin.
    :param analyses: Not used by this plugin.
    :return: The IPv4 addresses, IPv6 addresses and URIs that were found.
    """
    del virtual_file_path, analyses
    findings = self.ip_and_uri_finder.analyze_file(file_handle.name, separate_ipv6=True)

    filtered_ips = {
        key: _remove_blacklisted(_remove_duplicates(findings[key]), blacklist)
        for key, blacklist in (('ips_v4', IP_V4_BLACKLIST), ('ips_v6', IP_V6_BLACKLIST))
    }
    unique_uris = _remove_duplicates(findings['uris'])

    def _with_location(addresses):
        return [IpAddress(address=ip, location=self.find_geo_location(ip)) for ip in addresses]

    return self.Schema(
        ips_v4=_with_location(filtered_ips['ips_v4']),
        ips_v6=_with_location(filtered_ips['ips_v6']),
        uris=unique_uris,
    )

def find_geo_location(self, ip_address: str) -> Location | None:
    """
    Look up the geo location of an IP address in the GeoIP database.

    :param ip_address: The IP address to look up.
    :return: The location of the address, or `None` if no database reader is available, the lookup fails
        with one of the expected errors, or the database entry has no coordinates.
    """
    if self.reader is None:
        return None
    try:
        geo_data = self.reader.city(ip_address).location
        if geo_data.latitude is None or geo_data.longitude is None:
            # A record without coordinates would make float() raise a TypeError, which is not among the
            # handled errors below -> treat it as "no location" instead of crashing the analysis.
            return None
        return Location(
            longitude=float(geo_data.longitude),
            latitude=float(geo_data.latitude),
        )
    except (
        AttributeError,
        AddressNotFoundError,
        FileNotFoundError,
        ValueError,
        InvalidDatabaseError,
    ) as exception:
        logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True)
        return None

def summarize(self, result: Schema) -> list:
    """
    Build the summary of an analysis result.

    :param result: The analysis result.
    :return: All URIs followed by the IPv4 and then the IPv6 addresses (without their locations).
    """
    addresses = [entry.address for entry in (*result.ips_v4, *result.ips_v6)]
    return [*result.uris, *addresses]

@staticmethod
def _remove_duplicates(input_list):
return list(set(input_list))

@staticmethod
def _remove_blacklisted(ip_list, blacklist):
for ip, blacklist_entry in product(ip_list, blacklist):
if search(blacklist_entry, ip):
with suppress(ValueError):
ip_list.remove(ip)
return ip_list

def _remove_duplicates(input_list: list[str]) -> list[str]:
    """
    Return a new list with the unique elements of `input_list`.

    The order of first occurrence is kept so that the result is the same on every run (a plain `set` would
    yield a run-dependent order for strings).
    """
    return list(dict.fromkeys(input_list))


def _remove_blacklisted(ip_list: list[str], blacklist: list[str]) -> list[str]:
    """
    Remove every entry of `ip_list` that matches at least one regular expression in `blacklist`.

    :param ip_list: The addresses to filter; the list is modified in place.
    :param blacklist: Regular expressions that are applied with `re.search`.
    :return: The same (filtered) list object.
    """
    # Filter in a single pass instead of calling list.remove() per match; the slice assignment keeps the
    # in-place semantics callers may rely on.
    ip_list[:] = [ip for ip in ip_list if not any(search(entry, ip) for entry in blacklist)]
    return ip_list
129 changes: 45 additions & 84 deletions src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import tempfile
from collections import namedtuple
from pathlib import Path

import pytest
from geoip2.errors import AddressNotFoundError

from objects.file import FileObject

from ..code.ip_and_uri_finder import AnalysisPlugin
from ..code.ip_and_uri_finder import AnalysisPlugin, _remove_blacklisted

MockResponse = namedtuple('MockResponse', ['location'])
MockLocation = namedtuple('MockLocation', ['latitude', 'longitude'])
Expand Down Expand Up @@ -51,97 +50,59 @@ def ip_and_uri_finder_plugin(analysis_plugin):
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginIpAndUriFinder:
def test_process_object_ips(self, ip_and_uri_finder_plugin):
with tempfile.NamedTemporaryFile() as tmp:
with open(tmp.name, 'w') as fp: # noqa: PTH123
fp.write(
'1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc'
'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::'
)
tmp_fo = FileObject(file_path=tmp.name)
processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo)
results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME]
assert results['uris'] == []
assert {
('1.2.3.4', '47.913, -122.3042'),
('1.1.1.123', '-37.7, 145.1833'),
} == set(results['ips_v4'])
assert len(
[
('1.2.3.4', '47.913, -122.3042'),
('1.1.1.123', '-37.7, 145.1833'),
]
) == len(results['ips_v4'])
assert {
('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'),
('2001:db8:0:0:8d3::', '3.1, 3.1'),
} == set(results['ips_v6'])
assert len(
[
('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'),
('2001:db8:0:0:8d3::', '3.1, 3.1'),
]
) == len(results['ips_v6'])
with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp:
fp.write(
'1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc'
'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::'
)
fp.seek(0)
results = ip_and_uri_finder_plugin.analyze(fp, {}, {})
assert results.uris == []
assert len(results.ips_v4) == 2
ip_v4_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v4}
assert ip_v4_addresses == {
'1.2.3.4': '47.913, -122.3042',
'1.1.1.123': '-37.7, 145.1833',
}
assert len(results.ips_v6) == 2
ip_v6_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v6}
assert ip_v6_addresses == {
'1234:1234:abcd:abcd:1234:1234:abcd:abcd': '2.1, 2.1',
'2001:db8:0:0:8d3::': '3.1, 3.1',
}

assert set(ip_and_uri_finder_plugin.summarize(results)) == {*ip_v4_addresses, *ip_v6_addresses}

def test_process_object_uris(self, ip_and_uri_finder_plugin):
with tempfile.NamedTemporaryFile() as tmp:
with open(tmp.name, 'w') as fp: # noqa: PTH123
fp.write(
'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt '
'telnet://192.0.2.16:80/'
)
tmp_fo = FileObject(file_path=tmp.name)
processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo)
results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME]
assert {
with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp:
fp.write(
'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt '
'telnet://192.0.2.16:80/'
)
fp.seek(0)
results = ip_and_uri_finder_plugin.analyze(fp, {}, {})
assert set(results.uris) == {
'http://www.google.de',
'https://www.test.de/test/',
'ftp://ftp.is.co.za/rfc/rfc1808.txt',
'telnet://192.0.2.16:80/',
} == set(results['uris'])
assert len(
[
'http://www.google.de',
'https://www.test.de/test/',
'ftp://ftp.is.co.za/rfc/rfc1808.txt',
'telnet://192.0.2.16:80/',
]
) == len(results['uris'])

def test_add_geo_uri_to_ip(self, ip_and_uri_finder_plugin):
test_data = {
'ips_v4': ['128.101.101.101', '255.255.255.255'],
'ips_v6': ['1234:1234:abcd:abcd:1234:1234:abcd:abcd'],
'uris': 'http://www.google.de',
}
results = ip_and_uri_finder_plugin.add_geo_uri_to_ip(test_data)
assert results['uris'] == 'http://www.google.de'
assert [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')] == results['ips_v4']
assert [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')] == results['ips_v6']
assert len(results.uris) == 4

assert set(ip_and_uri_finder_plugin.summarize(results)) == set(results.uris).union({'192.0.2.16'})

def test_find_geo_location(self, ip_and_uri_finder_plugin):
assert ip_and_uri_finder_plugin.find_geo_location('128.101.101.101') == '44.9759, -93.2166'
assert ip_and_uri_finder_plugin.find_geo_location('127.101.101.101') == '4.1, 4.1'

with pytest.raises(AddressNotFoundError):
ip_and_uri_finder_plugin.find_geo_location('1.1.2.345')
with pytest.raises(ValueError): # noqa: PT011
ip_and_uri_finder_plugin.find_geo_location('aaa')

def test_link_ips_with_geo_location(self, ip_and_uri_finder_plugin):
ip_addresses = ['128.101.101.101', '255.255.255.255']
expected_results = [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')]
assert ip_and_uri_finder_plugin.link_ips_with_geo_location(ip_addresses) == expected_results

def test_get_summary(self):
results = {
'uris': ['http://www.google.de'],
'ips_v4': [('128.101.101.101', '44.9759, -93.2166')],
'ips_v6': [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')],
}
expected_results = ['http://www.google.de', '128.101.101.101', '1234:1234:abcd:abcd:1234:1234:abcd:abcd']
assert AnalysisPlugin._get_summary(results), expected_results
location = ip_and_uri_finder_plugin.find_geo_location('128.101.101.101')
assert location.latitude == 44.9759
assert location.longitude == -93.2166
location = ip_and_uri_finder_plugin.find_geo_location('127.101.101.101')
assert location.latitude == 4.1
assert location.longitude == 4.1

assert ip_and_uri_finder_plugin.find_geo_location('1.1.2.345') is None
assert ip_and_uri_finder_plugin.find_geo_location('aaa') is None

def test_remove_blacklisted(self, ip_and_uri_finder_plugin):
input_list = ['1.1.1.1', 'blah', '0.0.0.0']
blacklist = [r'[0-9].{4}', r'x.y']
assert ip_and_uri_finder_plugin._remove_blacklisted(input_list, blacklist) == ['blah']
assert _remove_blacklisted(input_list, blacklist) == ['blah']
14 changes: 4 additions & 10 deletions src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,23 @@
<td>
{% if key == "ips_v4" %}IPv4
{% elif key == "ips_v6" %}IPv6
{% elif key == "ips" %}IP
{% else %}URI{% endif %}
</td>
<td class="p-0">
<ul class="list-group p-0 mb-0" style="width: 100%;">
{% for item in value %}
{% if key == "ips_v6" or key == "ips_v4" %}
<li class="list-group-item d-flex justify-content-between align-items-center rounded-0">
{{ item[0] }}
{% if item[1] %}
<a href="https://www.google.de/maps/place/{{ item[1] }}">
{{ item.address }}
{% if item.location %}
<a href="https://www.google.de/maps/place/{{ item.location.latitude }},{{ item.location.longitude }}">
<i class="fas fa-map-marker-alt"></i>
</a>
{% endif %}
</li>
{% else %}
<li class="list-group-item rounded-0">
{% if key != 'ips' %}
<a href="{{ item }}">{{ item }}</a>
{% else %}
{{ item }}
{% endif %}
<a href="{{ item }}">{{ item }}</a>
</li>
{% endif %}
{% endfor %}
Expand All @@ -40,4 +35,3 @@
{% endfor %}

{% endblock %}

Loading