ratelimit-io (bagowix/ratelimit-io): Flexible bidirectional rate-limiting library with a Redis backend
RatelimitIO

A Python library for rate limiting, designed to handle both incoming and outgoing requests efficiently. Supports both synchronous and asynchronous paradigms. Powered by Redis, this library provides decorators, context managers, and easy integration with APIs to manage request limits with precision.

Features

  • Incoming and Outgoing Support: Effectively handles limits for both inbound (e.g., API requests) and outbound (e.g., client requests to external APIs) traffic.
  • Synchronous and Asynchronous Support: Works seamlessly in both paradigms.
  • Redis Backend: Leverages Redis for fast and scalable rate limiting.
  • Flexible API:
    • Use as a decorator for methods or functions.
    • Use as a context manager for precise control over request flows.
    • Integrate directly into API clients, middlewares, or custom request handlers.
  • Customizable Rate Limits: Specify limits per key, time period, and requests.
  • Robust Lua Script: Ensures efficient and atomic rate limiting logic for high-concurrency use cases.
  • Automatic Error Handling: Easily manage 429 Too Many Requests errors in popular frameworks like Flask, Django, and FastAPI.
  • Support for Incoming Request Behavior: Use the is_incoming flag to distinguish between incoming requests (throwing errors immediately) and outgoing requests (waiting for available slots).
  • Ease of Use: Simple and intuitive integration into Python applications.
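The "robust Lua script" mentioned above is not reproduced here; as a rough, in-memory illustration of the fixed-window counter pattern that such scripts typically implement atomically (increment a per-key counter, set its expiry on the first hit of each window), here is a single-process Python sketch. It is conceptual only and shares no code with the library:

```python
import time


class FixedWindowCounter:
    """In-memory sketch of the INCR + EXPIRE fixed-window pattern.

    Redis-backed limiters run this logic atomically (e.g., in a Lua
    script) so concurrent clients cannot race; this toy version is
    single-process and for illustration only.
    """

    def __init__(self):
        self._counts = {}  # key -> (window_expiry_timestamp, count)

    def hit(self, key, max_requests, window_seconds):
        """Record one request; return True if it fits within the limit."""
        now = time.monotonic()
        expiry, count = self._counts.get(key, (0.0, 0))
        if now >= expiry:
            # Window expired (or first hit): start a fresh window.
            expiry, count = now + window_seconds, 0
        count += 1
        self._counts[key] = (expiry, count)
        return count <= max_requests


counter = FixedWindowCounter()
results = [counter.hit("demo", max_requests=3, window_seconds=60) for _ in range(5)]
print(results)  # first 3 hits allowed, the rest rejected
```

Keeping the check-and-increment step atomic is the reason a Lua script (rather than separate GET/SET round trips) matters under high concurrency.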

Installation

Install via pip:

pip install ratelimit-io

Quick Start

Using as a Synchronous Decorator

from ratelimit_io import RatelimitIO, LimitSpec
from redis import Redis

redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(
    backend=redis_client,
    default_limit=LimitSpec(requests=10, seconds=60),
    default_key="global_limit",
)


@limiter
def fetch_data():
    return "Request succeeded!"


# Use the decorated function
fetch_data()

Using as an Asynchronous Decorator

from ratelimit_io import RatelimitIO, LimitSpec
from redis.asyncio import Redis as AsyncRedis

async_redis_client = AsyncRedis(host="localhost", port=6379)
async_limiter = RatelimitIO(
    backend=async_redis_client,
    default_limit=LimitSpec(requests=10, seconds=60),
    default_key="global_limit",
)


@async_limiter
async def fetch_data():
    return "Request succeeded!"


# Use the decorated function
await fetch_data()

Incoming vs. Outgoing Request Handling (is_incoming)

  • The default_key or a dynamically generated key (e.g., based on unique_key or kwargs["ip"]) determines the rate limit bucket.
  • When is_incoming=True, the rate limiter will immediately raise a RatelimitIOError when limits are exceeded.
  • When is_incoming=False, the rate limiter waits until a slot becomes available before proceeding (suited to outgoing client requests).

# Incoming request example (throws an error on limit breach)
limiter = RatelimitIO(backend=redis_client, is_incoming=True)

@limiter(LimitSpec(requests=5, seconds=10))
def fetch_data():
    return "Request succeeded!"

# Outgoing request example (waits if limits are exceeded)
outgoing_limiter = RatelimitIO(backend=redis_client)

@outgoing_limiter(LimitSpec(requests=5, seconds=10))
def fetch_data_outgoing():
    return "Request succeeded!"
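
To make the raise-versus-wait difference concrete, here is a small self-contained sketch (not the library's code; `TooManyRequests` stands in for `RatelimitIOError`, and the state lives in a local dict rather than Redis):

```python
import time


class TooManyRequests(Exception):
    """Stand-in for RatelimitIOError in this sketch."""


def acquire(counter, limit, is_incoming, wait_step=0.01):
    """Toy acquire: incoming callers fail fast, outgoing callers block.

    `counter` holds 'used', 'reset_at', and 'window'; a real deployment
    keeps this state in Redis so every process shares one budget.
    """
    while True:
        now = time.monotonic()
        if now >= counter["reset_at"]:
            # Current window has elapsed: start a fresh one.
            counter["used"], counter["reset_at"] = 0, now + counter["window"]
        if counter["used"] < limit:
            counter["used"] += 1
            return
        if is_incoming:
            raise TooManyRequests("rate limit exceeded")  # fail fast (maps to HTTP 429)
        time.sleep(wait_step)  # outgoing: poll until the window rolls over


counter = {"used": 0, "reset_at": 0.0, "window": 0.05}
for _ in range(3):
    acquire(counter, limit=3, is_incoming=True)  # within budget

try:
    acquire(counter, limit=3, is_incoming=True)  # budget exhausted
    incoming_raised = False
except TooManyRequests:
    incoming_raised = True

acquire(counter, limit=3, is_incoming=False)  # blocks ~50 ms, then succeeds
print(incoming_raised)
```

Failing fast suits server endpoints (the client should see a 429), while blocking suits outbound clients that would rather slow down than drop work.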

Using as a Synchronous Context Manager

from ratelimit_io import RatelimitIO, LimitSpec
from redis import Redis

redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(backend=redis_client)

with limiter:
    limiter.wait("user:456", LimitSpec(requests=5, seconds=10))

Using as an Asynchronous Context Manager

from ratelimit_io import RatelimitIO, LimitSpec
from redis.asyncio import Redis as AsyncRedis

async_redis_client = AsyncRedis(host="localhost", port=6379)
async_limiter = RatelimitIO(backend=async_redis_client)

async with async_limiter:
    await async_limiter.a_wait("user:456", LimitSpec(requests=5, seconds=10))

Dynamic is_incoming Detection

Automatically determines request type (incoming or outgoing) based on the context. Incoming requests (default) raise RatelimitIOError if limits are exceeded, while outgoing requests wait for a slot.

Examples:

# Default behavior (is_incoming=True)
@limiter
def incoming_request():
    return "Handled incoming request!"

for _ in range(5):
    incoming_request()

# Raises RatelimitIOError after 5 requests
incoming_request()

# -------------------------------------------------------------------------------------------------------

# Outgoing request handling
limiter.is_incoming = False

for _ in range(5):
    limiter.wait("outgoing_request", LimitSpec(requests=5, seconds=1))

# Waits for a slot to become available
limiter.wait("outgoing_request", LimitSpec(requests=5, seconds=1))

Error Handling for 429 Responses

FastAPI Example

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from ratelimit_io import RatelimitIO, RatelimitIOError, LimitSpec
from redis.asyncio import Redis as AsyncRedis

app = FastAPI()
redis_client = AsyncRedis(host="localhost", port=6379)
limiter = RatelimitIO(
  backend=redis_client,
  default_limit=LimitSpec(requests=5, seconds=1),
  is_incoming=True
)


@app.middleware("http")
async def ratelimit_middleware(request: Request, call_next):
  try:
    response = await call_next(request)
    return response
  except RatelimitIOError as exc:
    return JSONResponse(
      status_code=exc.status_code,
      content={"detail": exc.detail}
    )


@app.get("/fetch")
@limiter
async def fetch_data():
  return {"message": "Request succeeded!"}

Django Middleware Example

from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from ratelimit_io import RatelimitIO, LimitSpec, RatelimitIOError
from redis import Redis
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

redis = Redis("localhost", port=6379)
limit = RatelimitIO(
  backend=redis,
  default_limit=LimitSpec(requests=5, seconds=1),
  is_incoming=True,
)


class RatelimitMiddleware(MiddlewareMixin):
  def process_exception(self, request, exception):
    if isinstance(exception, RatelimitIOError):
      return JsonResponse(
        {"detail": exception.detail},
        status=exception.status_code,
      )
    return None


class Foo(APIView):
  permission_classes = ()

  @limit
  def get(self, request, *args, **kwargs):
    return Response(data={"message": "ok"}, status=status.HTTP_200_OK)

Flask Example

from flask import Flask, jsonify
from ratelimit_io import RatelimitIO, RatelimitIOError, LimitSpec
from redis import Redis

app = Flask(__name__)
redis_client = Redis(host="localhost", port=6379)
limiter = RatelimitIO(backend=redis_client, is_incoming=True)

@app.errorhandler(RatelimitIOError)
def handle_ratelimit_error(error):
    return jsonify({"error": error.detail}), error.status_code

@app.route("/fetch")
@limiter
def fetch_data():
    return jsonify({"message": "Request succeeded!"})

Key Handling Priority

The rate limiter determines the key for requests in the following priority:

  1. provided_key (directly passed to wait, a_wait, or decorator).
  2. unique_key (e.g., from a decorator argument).
  3. default_key (set during RatelimitIO initialization).
  4. kwargs.get("ip") (from additional arguments passed to the decorated function).
  5. "unknown_key" (fallback if no other key is found).

This flexible approach ensures minimal configuration while maintaining granular control when needed.
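
The priority order above can be sketched as a small resolver. This is illustrative only (the library's internal implementation may differ); the parameter names simply mirror the list:

```python
def resolve_key(provided_key=None, unique_key=None, default_key=None, **kwargs):
    """Sketch of the key-resolution priority described above:
    provided_key > unique_key > default_key > kwargs["ip"] > "unknown_key".
    """
    for candidate in (provided_key, unique_key, default_key, kwargs.get("ip")):
        if candidate:
            return candidate
    return "unknown_key"  # fallback when no other key is found


print(resolve_key(unique_key="user:42", default_key="global_limit"))  # unique_key wins
print(resolve_key(ip="10.0.0.1"))  # falls back to kwargs["ip"]
print(resolve_key())  # no key at all
```

Because every request for the same resolved key shares one Redis bucket, choosing the key (per-user, per-IP, or global) is what defines the granularity of the limit.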

License

MIT

Contribution

Contributions are welcome! Follow the Contribution Guide for details.