Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Patch] - Requests sync to support instance scaling #312

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import helmet from 'helmet'
import cors from 'cors'
import apiRouter from './routes'
import { EigenExplorerApiError, handleAndReturnErrorResponse } from './schema/errors'
import { startUserRequestsSync } from './utils/userRequestsSync'

const PORT = process.env.PORT ? Number.parseInt(process.env.PORT) : 3002

Expand Down Expand Up @@ -51,6 +50,4 @@ app.use((err: Error, req: Request, res: Response) => {
// Start the server
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`)

startUserRequestsSync()
})
37 changes: 24 additions & 13 deletions packages/api/src/utils/authMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dotenv/config'
import type { NextFunction, Request, Response } from 'express'
import { authStore, requestsStore } from './authCache'
import rateLimit from 'express-rate-limit'
import { triggerUserRequestsSync } from './requestsUpdateManager'

// --- Types ---

Expand All @@ -24,8 +25,7 @@ interface Plan {
const PLANS: Record<number, Plan> = {
0: {
name: 'Unauthenticated',
requestsPerMin: 30, // Remove in v2
requestsPerMonth: 1_000 // Remove in v2
requestsPerMin: 10_000 // Remove in v2
},
1: {
name: 'Free',
Expand All @@ -37,6 +37,10 @@ const PLANS: Record<number, Plan> = {
requestsPerMin: 1_000,
requestsPerMonth: 10_000
},
998: {
name: 'Unknown',
requestsPerMin: 100_000
},
999: {
name: 'Admin'
}
Expand All @@ -49,10 +53,11 @@ const PLANS: Record<number, Plan> = {
* Designed for speed over strictness, always giving user benefit of the doubt
*
* -1 -> Account restricted (monthly limit hit)
* 0 -> No API token (req will be blocked in v2)
* 0 -> Unauthenticated (req will be blocked in v2)
* 1 -> Hobby plan or server/db error
* 2 -> Basic plan
* 998 -> Fallback to db to in case auth store is updating (temp state, gets re-assigned to another value)
* 997 -> Fallback to db to in case auth store is updating (temp state, gets re-assigned to another value)
* 998 -> Unauthenticated and unknown IP. Since this can be multiple diff users, we set a higher rate limit (remove in v2)
* 999 -> Admin access
*
* @param req
Expand All @@ -64,10 +69,17 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct
const apiToken = req.header('X-API-Token')
let accessLevel: number

// Find access level
// Find access level & set rate limiting key
if (!apiToken) {
accessLevel = 0
if (!req.ip) {
accessLevel = 998
req.key = 'unknown'
} else {
accessLevel = 0
req.key = req.ip
}
} else {
req.key = apiToken
const updatedAt: number | undefined = authStore.get('updatedAt')

if (!updatedAt && !authStore.get('isRefreshing')) refreshAuthStore()
Expand All @@ -76,14 +88,14 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct
if (process.env.EE_AUTH_TOKEN === apiToken) {
accessLevel = 999
} else if (accountRestricted === 0) {
accessLevel = authStore.get(`apiToken:${apiToken}:accessLevel`) ?? 998
accessLevel = authStore.get(`apiToken:${apiToken}:accessLevel`) ?? 997
} else {
accessLevel = -1
}
}

// Handle limiting basis access level
if (accessLevel === 998) {
if (accessLevel === 997) {
const response = await fetch(`${process.env.SUPABASE_FETCH_ACCESS_LEVEL_URL}/${apiToken}`, {
method: 'GET',
headers: {
Expand All @@ -96,8 +108,7 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct
}

// --- LIMITING TO BE ACTIVATED IN V2 ---
if (accessLevel === 0) accessLevel = 1
if (accessLevel === -1) accessLevel = 1
if (accessLevel === -1) accessLevel = 0

/*
if (accessLevel === 0) {
Expand Down Expand Up @@ -138,8 +149,7 @@ for (const [level, plan] of Object.entries(PLANS)) {
max: plan.requestsPerMin,
standardHeaders: true,
legacyHeaders: false,
keyGenerator: (req: Request): string =>
accessLevel === 0 ? req.ip ?? 'unknown' : req.header('X-API-Token') || '',
keyGenerator: (req: Request): string => req.key,
message: `You've reached the limit of ${plan.requestsPerMin} requests per minute. ${
accessLevel === 0
? 'Sign up for a plan on https://developer.eigenexplorer.com for increased limits.'
Expand All @@ -157,7 +167,7 @@ for (const [level, plan] of Object.entries(PLANS)) {
* @returns
*/
export const rateLimiter = (req: Request, res: Response, next: NextFunction) => {
const accessLevel = req.accessLevel || 0
const accessLevel = req.accessLevel

// No rate limiting for admin
if (accessLevel === 999) {
Expand All @@ -179,6 +189,7 @@ export const rateLimiter = (req: Request, res: Response, next: NextFunction) =>
const key = `apiToken:${apiToken}:newRequests`
const currentCalls: number = requestsStore.get(key) || 0
requestsStore.set(key, currentCalls + 1)
triggerUserRequestsSync(apiToken)
}
}
} catch {}
Expand Down
3 changes: 2 additions & 1 deletion packages/api/src/utils/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import * as express from 'express'
declare global {
namespace Express {
interface Request {
accessLevel?: number
accessLevel: number
key: string
}
}
}
125 changes: 125 additions & 0 deletions packages/api/src/utils/requestsUpdateManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { requestsStore } from './authCache'

interface UpdatePayload {
key: string
data: {
apiToken: string
requests: number
}
timestamp: string
}

interface QueueState {
current: Map<string, UpdatePayload>
next: Map<string, UpdatePayload>
}

/**
* Manages DB updates for API Token request count
*
*/
class RequestsUpdateManager {
private updateInterval = 60_000 // 1 minute
private updateTimeout: NodeJS.Timeout | null = null
private queue: QueueState = {
current: new Map(),
next: new Map()
}

constructor(private readonly supabaseUrl: string, private readonly supabaseKey: string) {}

async queueUpdate(apiToken: string): Promise<void> {
const requestKey = `apiToken:${apiToken}:newRequests`
const newRequests = Number(requestsStore.get(requestKey)) || 0

if (newRequests === 0) return

const payload: UpdatePayload = {
key: apiToken,
data: {
apiToken,
requests: newRequests
},
timestamp: new Date().toISOString()
}

if (!this.updateTimeout) {
this.queue.current.set(apiToken, payload)
this.scheduleUpdate()
} else {
if (this.queue.current.size === 0) {
this.queue.next.set(apiToken, payload)
} else {
this.queue.current.set(apiToken, payload)
}
}
}

private scheduleUpdate(): void {
if (this.updateTimeout) {
return
}

this.updateTimeout = setTimeout(() => {
this.performUpdate()
}, this.updateInterval)
}

private async performUpdate(): Promise<void> {
try {
if (this.queue.current.size > 0) {
const updatePayload = Array.from(this.queue.current.values())
await this.httpClient(this.supabaseUrl, updatePayload)

// Clear processed requests from cache
for (const payload of updatePayload) {
const requestKey = `apiToken:${payload.data.apiToken}:newRequests`
requestsStore.del(requestKey)
}

console.log(`[Data] User requests sync: size: ${updatePayload.length}`)
}
} catch (error) {
console.error('[Data] Update failed:', error)
} finally {
this.updateTimeout = null
this.queue.current = this.queue.next
this.queue.next = new Map()

if (this.queue.current.size > 0) {
this.scheduleUpdate()
}
}
}

private async httpClient(url: string, data: UpdatePayload[]): Promise<void> {
const response = await fetch(url, {
method: 'POST',
headers: {
Authorization: `Bearer ${this.supabaseKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(data.map((payload) => payload.data))
})

if (!response.ok) {
throw new Error('Failed to post updates')
}
}
}

const updateManager = new RequestsUpdateManager(
// biome-ignore lint/style/noNonNullAssertion: <explanation>
process.env.SUPABASE_POST_REQUESTS_URL!,
// biome-ignore lint/style/noNonNullAssertion: <explanation>
process.env.SUPABASE_SERVICE_ROLE_KEY!
)

/**
* Call this function after a request is received & API Token is identified
*
* @returns
*/
export function triggerUserRequestsSync(apiToken: string) {
return updateManager.queueUpdate(apiToken)
}
93 changes: 0 additions & 93 deletions packages/api/src/utils/userRequestsSync.ts

This file was deleted.

Loading