Skip to content

Commit

Permalink
♻️ refactor: Revised throttling parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
DevaOnBreaches committed Jan 28, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 92adf98 commit e7c09ac
Showing 2 changed files with 122 additions and 39 deletions.
101 changes: 73 additions & 28 deletions cloudflare.py
Original file line number Diff line number Diff line change
@@ -49,66 +49,86 @@ def update_cf_trans(ip_address):

def block_hour(ip_address):
"""
Block an IP address for one hour using the Cloudflare API.
This function sends a POST request to the Cloudflare API to block the given IP address.
It uses predefined authentication details (AUTH_EMAIL and AUTH_KEY) and sets the mode to
'challenge', targeting the specified IP address. The function logs the action by
calling `update_cf_trans` with the response content if the request is successful.
Block an IP address for one hour using the Cloudflare API, unless the ISP is "Cloudflare."
Parameters:
ip_address (str): The IP address to be blocked.
Returns:
None: The function doesn't return anything but performs an API request to block an IP
address and logs the transaction.
None: The function performs an API request to block an IP address if necessary.
"""
isp_info = get_isp_from_ip(ip_address)
if isp_info and "Cloudflare" in isp_info:

return

url = "https://api.cloudflare.com/client/v4/user/firewall/access_rules/rules"
headers = {
"X-Auth-Email": AUTH_EMAIL,
"X-Auth-Key": AUTH_KEY,
"Content-Type": "application/json",
}
payload = (
'{"mode":"challenge","configuration":{"target":"ip","value":"'
+ ip_address
+ '"},"notes":"Hour block enforced"}'
)
response = requests.post(url, headers=headers, data=payload, timeout=20)
if response.status_code in [200, 201]:
update_cf_trans(response.content)
payload = {
"mode": "challenge",
"configuration": {"target": "ip", "value": ip_address},
"notes": "Hour block enforced",
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=20)

if response.status_code in [200, 201]:
update_cf_trans(response.content)
else:
print(
f"Failed to block IP {ip_address}. Status code: {response.status_code}"
)
except Exception as e:
print(f"Error while blocking IP {ip_address}: {e}")


def block_day(ip_address):
"""
Block an IP address for one day using the Cloudflare API.
Block an IP address for one day using the Cloudflare API, unless the ISP is "Cloudflare."
This function sends a POST request to the Cloudflare API to block the given IP address.
It uses predefined authentication details (AUTH_EMAIL and AUTH_KEY) and sets the mode
to 'challenge', targeting the specified IP address. The function logs the action by
to 'block', targeting the specified IP address. The function logs the action by
calling `update_cf_trans` with the response content if the request is successful.
Parameters:
ip_address (str): The IP address to be blocked.
Returns:
None: The function doesn't return anything but performs an API request to block an
IP address and logs the transaction.
None: The function performs an API request to block an IP address if necessary.
"""

isp_info = get_isp_from_ip(ip_address)
if isp_info and "Cloudflare" in isp_info:
print(f"IP {ip_address} belongs to Cloudflare. Skipping block.")
return

url = "https://api.cloudflare.com/client/v4/user/firewall/access_rules/rules"
headers = {
"X-Auth-Email": AUTH_EMAIL,
"X-Auth-Key": AUTH_KEY,
"Content-Type": "application/json",
}
payload = (
'{"mode":"block","configuration":{"target":"ip","value":"'
+ ip_address
+ '"},"notes":"Day block enforced"}'
)
response = requests.post(url, headers=headers, data=payload, timeout=20)
if response.status_code in [200, 201]:
update_cf_trans(response.content)
payload = {
"mode": "block",
"configuration": {"target": "ip", "value": ip_address},
"notes": "Day block enforced",
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=20)

if response.status_code in [200, 201]:
update_cf_trans(response.content)
else:
print(
f"Failed to block IP {ip_address}. Status code: {response.status_code}"
)
except Exception as e:
print(f"Error while blocking IP {ip_address}: {e}")


def unblock():
@@ -154,3 +174,28 @@ def unblock():
entity.update({"release_timestamp": datetime.datetime.utcnow().isoformat()})
datastore_client.put(entity)
return True


def get_isp_from_ip(ip_address):
"""
Fetch the ISP for a given IP address using the ipinfo.io API or a similar service.
Parameters:
ip_address (str): The IP address to check.
Returns:
str: The name of the ISP for the given IP address.
"""
try:
url = f"https://ipinfo.io/{ip_address}/org"
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.text.strip()
else:
print(
f"Failed to fetch ISP for IP {ip_address}. Status code: {response.status_code}"
)
return None
except Exception as e:
print(f"Error fetching ISP for IP {ip_address}: {e}")
return None
60 changes: 49 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@
import socket
import threading
import time

# import hashlib
from collections import defaultdict
from datetime import timedelta
from operator import itemgetter
@@ -41,6 +43,7 @@
from itsdangerous import SignatureExpired, URLSafeTimedSerializer
from user_agents import parse
from validate_email import validate_email
from threading import Lock

# Local Application/Library Specific Imports
from cloudflare import block_day, block_hour, unblock
@@ -63,7 +66,6 @@
XMLAPI_KEY = os.environ["XMLAPI_KEY"]
FERNET_KEY = os.environ.get("ENCRYPTION_KEY")
CIPHER_SUITE = Fernet(FERNET_KEY)
# openai.api_key = os.environ.get("OPENAI_API_KEY")

# Initialize the Flask app
XON = Flask(__name__)
@@ -130,6 +132,29 @@ def add_cache_control(response):
return response


"""
@XON.before_request
def globe_ip():
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr).split(',')[0]
try:
response = requests.post('https://globe.xposedornot.com/inflow', json={'ip': client_ip})
if response.status_code == 200:
print(f"IP {client_ip} successfully posted.")
else:
print(f"Failed to post IP {client_ip}. Status code: {response.status_code}")
except Exception as e:
print(f"Error posting IP {client_ip}: {e}")
def is_valid_ipv4(ip):
try:
ipaddress.IPv4Address(ip)
return True
except ipaddress.AddressValueError:
return False
"""

# Initialize and configure rate limiting
LIMITER = Limiter(
app=XON,
@@ -148,6 +173,7 @@ def not_found(error):
@XON.errorhandler(429)
def ratelimit_handler(error):
"""Returns response for 429"""
# Extract rate limit details
rate_limit_info = re.search(r"(\d+)\sper\s(\d+\s\w+)", str(error.description))
if rate_limit_info:
count, period = rate_limit_info.groups()
@@ -161,12 +187,20 @@ def ratelimit_handler(error):
else:
retry_after = "unknown"

if "hour" in error.description:
# pass
block_hour(ip_address=request.headers.get("X-Forwarded-For"))
elif "day" in error.description:
block_day(ip_address=request.headers.get("X-Forwarded-For"))
forwarded_for = request.headers.get("X-Forwarded-For", "")
if forwarded_for:
ip_addresses = [ip.strip() for ip in forwarded_for.split(",")]
first_ip = ip_addresses[0] if ip_addresses else None
else:
first_ip = None

if "hour" in error.description and first_ip:
block_hour(ip_address=first_ip)
pass
elif "day" in error.description and first_ip:
block_day(ip_address=first_ip)

# Prepare and return the response
response = make_response(
jsonify(
error=f"Ratelimit exceeded {error.description}",
@@ -1821,7 +1855,8 @@ def get_detailed_metrics():


@XON.route("/v2/breach-analytics", methods=["GET"])
@LIMITER.limit("500 per day;100 per hour;2 per second")
@LIMITER.limit("5 per hour; 100 per day")
# @LIMITER.limit("100 per day;3 per hour;2 per second")
def search_data_breaches_v2():
"""Returns AI summary and details of data breaches for a given email"""
MAX_EMAIL_LENGTH = 254
@@ -1935,7 +1970,8 @@ def search_data_breaches_v2():


@XON.route("/v1/breach-analytics", methods=["GET"])
@LIMITER.limit("500 per day;100 per hour;2 per second")
@LIMITER.limit("10 per hour; 100 per day")
# @LIMITER.limit("500 per day;100 per hour;2 per second")
def search_data_breaches():
"""Returns summary and details of data breaches for a given email"""
verification_token = request.args.get("token", default=None)
@@ -2136,7 +2172,8 @@ def get_breach_analytics(user_email):


@XON.route("/v1/check-email/<email>", methods=["GET"])
@LIMITER.limit("50000 per day;10000 per hour;100 per second")
# @LIMITER.limit("100 per day;50 per hour;2 per second")
@LIMITER.limit("2 per second; 50 per hour; 100 per day")
def search_email(email):
"""Returns exposed breaches for a given email"""
try:
@@ -4389,7 +4426,8 @@ def get_webhook_config():


@XON.route("/v1/breaches", methods=["GET"])
@LIMITER.limit("100 per day;50 per hour;2 per second")
# @LIMITER.limit("100 per day;50 per hour;2 per second")
@LIMITER.limit("2 per second; 50 per hour; 1000 per day")
def get_xposed_breaches():
"""
Fetches and returns the details of data breaches for a specified domain,
@@ -4598,5 +4636,5 @@ def rss_feed():


if __name__ == "__main__":
XON.run(host="0.0.0.0", port=1806)
print("Connected and ready to serve the world !")
XON.run(host="0.0.0.0", port=1806)

0 comments on commit e7c09ac

Please sign in to comment.