A Python library for rate limiting, designed to handle both incoming and outgoing requests efficiently. Supports both synchronous and asynchronous paradigms. Powered by Redis, this library provides decorators, context managers, and easy integration with APIs to manage request limits with precision.
- Incoming and Outgoing Support: Effectively handles limits for both inbound (e.g., API requests) and outbound (e.g., client requests to external APIs) traffic.
- Synchronous and Asynchronous Support: Works seamlessly in both paradigms.
- Redis Backend: Leverages Redis for fast and scalable rate limiting.
- Flexible API:
- Use as a decorator for methods or functions.
- Use as a context manager for precise control over request flows.
- Integrate directly into API clients, middlewares, or custom request handlers.
- Customizable Rate Limits: Specify limits per key, time period, and requests.
- Robust Lua Script: Ensures efficient and atomic rate limiting logic for high-concurrency use cases.
- Automatic Error Handling: Easily manage 429 Too Many Requests errors in popular frameworks like Flask, Django, and FastAPI.
- Support for Incoming Request Behavior: Use the
is_incoming
flag to distinguish between incoming requests (throwing errors immediately) and outgoing requests (waiting for available slots). - Ease of Use: Simple and intuitive integration into Python applications.
Install via pip:
pip install ratelimit-io
from ratelimit_io import RatelimitIO, LimitSpec
from redis import Redis
redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(
backend=redis_client,
default_limit=LimitSpec(requests=10, seconds=60),
default_key="global_limit",
)
@limiter
def fetch_data():
return "Request succeeded!"
# Use the decorated function
fetch_data()
from ratelimit_io import RatelimitIO, LimitSpec
from redis.asyncio import Redis as AsyncRedis
async_redis_client = AsyncRedis(host="localhost", port=6379)
async_limiter = RatelimitIO(
backend=async_redis_client,
default_limit=LimitSpec(requests=10, seconds=60),
default_key="global_limit",
)
@async_limiter
async def fetch_data():
return "Request succeeded!"
# Use the decorated function
await fetch_data()
- The
default_key
or a dynamically generated key (e.g., based onunique_key
orkwargs["ip"]
) determines the rate limit bucket. - When
is_incoming=True
, the rate limiter will immediately raise aRatelimitIOError
when limits are exceeded. - When
is_incoming=False
(default), the rate limiter will wait until a slot becomes available.
# Incoming request example (throws an error on limit breach)
limiter = RatelimitIO(backend=redis_client, is_incoming=True)
@limiter(LimitSpec(requests=5, seconds=10))
def fetch_data():
return "Request succeeded!"
# Outgoing request example (waits if limits are exceeded)
outgoing_limiter = RatelimitIO(backend=redis_client)
@outgoing_limiter(LimitSpec(requests=5, seconds=10))
def fetch_data_outgoing():
return "Request succeeded!"
from ratelimit_io import RatelimitIO, LimitSpec
from redis import Redis
redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(backend=redis_client)
with limiter:
limiter.wait("user:456", LimitSpec(requests=5, seconds=10))
from ratelimit_io import RatelimitIO, LimitSpec
from redis.asyncio import Redis as AsyncRedis
async_redis_client = AsyncRedis(host="localhost", port=6379)
async_limiter = RatelimitIO(backend=async_redis_client)
async with async_limiter:
await async_limiter.a_wait("user:456", LimitSpec(requests=5, seconds=10))
Automatically determines request type (incoming or outgoing) based on the context. Incoming requests (default) raise RatelimitIOError
if limits are exceeded, while outgoing requests wait for a slot.
Examples:
# Default behavior (is_incoming=True)
@limiter
def incoming_request():
return "Handled incoming request!"
for _ in range(5):
incoming_request()
# Raises RatelimitIOError after 5 requests
incoming_request()
# -------------------------------------------------------------------------------------------------------
# Outgoing request handling
limiter.is_incoming = False
for _ in range(5):
limiter.wait("outgoing_request", LimitSpec(requests=5, seconds=1))
# Waits for a slot to become available
limiter.wait("outgoing_request", LimitSpec(requests=5, seconds=1))
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from ratelimit_io import RatelimitIO, RatelimitIOError, LimitSpec
from redis.asyncio import Redis as AsyncRedis
app = FastAPI()
redis_client = AsyncRedis(host="localhost", port=6379)
limiter = RatelimitIO(
backend=redis_client,
default_limit=LimitSpec(requests=5, seconds=1),
is_incoming=True
)
@app.middleware("http")
async def ratelimit_middleware(request: Request, call_next):
try:
response = await call_next(request)
return response
except RatelimitIOError as exc:
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)
@app.get("/fetch")
@limit
async def fetch_data():
return {"message": "Request succeeded!"}
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from ratelimit_io import RatelimitIO, LimitSpec, RatelimitIOError
from redis import Redis
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
redis = Redis("localhost", port=6379)
limit = RatelimitIO(
backend=redis,
default_limit=LimitSpec(requests=5, seconds=1),
is_incoming=True,
)
class RatelimitMiddleware(MiddlewareMixin):
def process_exception(self, request, exception):
if isinstance(exception, RatelimitIOError):
return JsonResponse(
{"detail": exception.detail},
status=exception.status_code,
)
return None
class Foo(APIView):
permission_classes = ()
@limit
def get(self, request, *args, **kwargs):
return Response(data={"message": "ok"}, status=status.HTTP_200_OK)
from flask import Flask, jsonify
from ratelimit_io import RatelimitIO, RatelimitIOError, LimitSpec
from redis import Redis
app = Flask(__name__)
redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(backend=redis_client, is_incoming=True)
@app.errorhandler(RatelimitIOError)
def handle_ratelimit_error(error):
return jsonify({"error": error.detail}), error.status_code
@app.route("/fetch")
@limiter
def fetch_data():
return jsonify({"message": "Request succeeded!"})
The rate limiter determines the key for requests in the following priority:
provided_key
(directly passed towait
,a_wait
, or decorator).unique_key
(e.g., from a decorator argument).default_key
(set duringRatelimitIO
initialization).kwargs.get("ip")
(from additional arguments passed to the decorated function)."unknown_key"
(fallback if no other key is found).
This flexible approach ensures minimal configuration while maintaining granular control when needed.
Contributions are welcome! Follow the Contribution Guide for details.