diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 9c9a08d0..547fb009 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -8,7 +8,6 @@ import helmet from 'helmet' import cors from 'cors' import apiRouter from './routes' import { EigenExplorerApiError, handleAndReturnErrorResponse } from './schema/errors' -import { startUserRequestsSync } from './utils/userRequestsSync' const PORT = process.env.PORT ? Number.parseInt(process.env.PORT) : 3002 @@ -51,6 +50,4 @@ app.use((err: Error, req: Request, res: Response) => { // Start the server app.listen(PORT, () => { console.log(`Server is running on port ${PORT}`) - - startUserRequestsSync() }) diff --git a/packages/api/src/utils/authMiddleware.ts b/packages/api/src/utils/authMiddleware.ts index cd4a7b4a..98d012df 100644 --- a/packages/api/src/utils/authMiddleware.ts +++ b/packages/api/src/utils/authMiddleware.ts @@ -3,6 +3,7 @@ import 'dotenv/config' import type { NextFunction, Request, Response } from 'express' import { authStore, requestsStore } from './authCache' import rateLimit from 'express-rate-limit' +import { triggerUserRequestsSync } from './requestsUpdateManager' // --- Types --- @@ -24,8 +25,7 @@ interface Plan { const PLANS: Record = { 0: { name: 'Unauthenticated', - requestsPerMin: 30, // Remove in v2 - requestsPerMonth: 1_000 // Remove in v2 + requestsPerMin: 10_000 // Remove in v2 }, 1: { name: 'Free', @@ -37,6 +37,10 @@ const PLANS: Record = { requestsPerMin: 1_000, requestsPerMonth: 10_000 }, + 998: { + name: 'Unknown', + requestsPerMin: 100_000 + }, 999: { name: 'Admin' } @@ -49,10 +53,11 @@ const PLANS: Record = { * Designed for speed over strictness, always giving user benefit of the doubt * * -1 -> Account restricted (monthly limit hit) - * 0 -> No API token (req will be blocked in v2) + * 0 -> Unauthenticated (req will be blocked in v2) * 1 -> Hobby plan or server/db error * 2 -> Basic plan - * 998 -> Fallback to db to in case auth store is updating (temp state, gets re-assigned to another value) + * 997 -> Fallback to db to in case auth store is updating (temp state, gets re-assigned to another value) + * 998 -> Unauthenticated and unknown IP. Since this can be multiple diff users, we set a higher rate limit (remove in v2) * 999 -> Admin access * * @param req @@ -64,10 +69,17 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct const apiToken = req.header('X-API-Token') let accessLevel: number - // Find access level + // Find access level & set rate limiting key if (!apiToken) { - accessLevel = 0 + if (!req.ip) { + accessLevel = 998 + req.key = 'unknown' + } else { + accessLevel = 0 + req.key = req.ip + } } else { + req.key = apiToken const updatedAt: number | undefined = authStore.get('updatedAt') if (!updatedAt && !authStore.get('isRefreshing')) refreshAuthStore() @@ -76,14 +88,14 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct if (process.env.EE_AUTH_TOKEN === apiToken) { accessLevel = 999 } else if (accountRestricted === 0) { - accessLevel = authStore.get(`apiToken:${apiToken}:accessLevel`) ?? 998 + accessLevel = authStore.get(`apiToken:${apiToken}:accessLevel`) ?? 997 } else { accessLevel = -1 } } // Handle limiting basis access level - if (accessLevel === 998) { + if (accessLevel === 997) { const response = await fetch(`${process.env.SUPABASE_FETCH_ACCESS_LEVEL_URL}/${apiToken}`, { method: 'GET', headers: { @@ -96,8 +108,7 @@ export const authenticator = async (req: Request, res: Response, next: NextFunct } // --- LIMITING TO BE ACTIVATED IN V2 --- - if (accessLevel === 0) accessLevel = 1 - if (accessLevel === -1) accessLevel = 1 + if (accessLevel === -1) accessLevel = 0 /* if (accessLevel === 0) { @@ -138,8 +149,7 @@ for (const [level, plan] of Object.entries(PLANS)) { max: plan.requestsPerMin, standardHeaders: true, legacyHeaders: false, - keyGenerator: (req: Request): string => - accessLevel === 0 ? req.ip ?? 'unknown' : req.header('X-API-Token') || '', + keyGenerator: (req: Request): string => req.key, message: `You've reached the limit of ${plan.requestsPerMin} requests per minute. ${ accessLevel === 0 ? 'Sign up for a plan on https://developer.eigenexplorer.com for increased limits.' @@ -157,7 +167,7 @@ for (const [level, plan] of Object.entries(PLANS)) { * @returns */ export const rateLimiter = (req: Request, res: Response, next: NextFunction) => { - const accessLevel = req.accessLevel || 0 + const accessLevel = req.accessLevel // No rate limiting for admin if (accessLevel === 999) { @@ -179,6 +189,7 @@ export const rateLimiter = (req: Request, res: Response, next: NextFunction) => const key = `apiToken:${apiToken}:newRequests` const currentCalls: number = requestsStore.get(key) || 0 requestsStore.set(key, currentCalls + 1) + triggerUserRequestsSync(apiToken) } } } catch {} diff --git a/packages/api/src/utils/request.ts b/packages/api/src/utils/request.ts index aed37e71..3d2421dc 100644 --- a/packages/api/src/utils/request.ts +++ b/packages/api/src/utils/request.ts @@ -3,7 +3,8 @@ import * as express from 'express' declare global { namespace Express { interface Request { - accessLevel?: number + accessLevel: number + key: string } } } diff --git a/packages/api/src/utils/requestsUpdateManager.ts b/packages/api/src/utils/requestsUpdateManager.ts new file mode 100644 index 00000000..a8b53dc4 --- /dev/null +++ b/packages/api/src/utils/requestsUpdateManager.ts @@ -0,0 +1,125 @@ +import { requestsStore } from './authCache' + +interface UpdatePayload { + key: string + data: { + apiToken: string + requests: number + } + timestamp: string +} + +interface QueueState { + current: Map + next: Map +} + +/** + * Manages DB updates for API Token request count + * + */ +class RequestsUpdateManager { + private updateInterval = 60_000 // 1 minute + private updateTimeout: NodeJS.Timeout | null = null + private queue: QueueState = { + current: new Map(), + next: new Map() + } + + constructor(private readonly supabaseUrl: string, private readonly supabaseKey: string) {} + + async queueUpdate(apiToken: string): Promise { + const requestKey = `apiToken:${apiToken}:newRequests` + const newRequests = Number(requestsStore.get(requestKey)) || 0 + + if (newRequests === 0) return + + const payload: UpdatePayload = { + key: apiToken, + data: { + apiToken, + requests: newRequests + }, + timestamp: new Date().toISOString() + } + + if (!this.updateTimeout) { + this.queue.current.set(apiToken, payload) + this.scheduleUpdate() + } else { + if (this.queue.current.size === 0) { + this.queue.next.set(apiToken, payload) + } else { + this.queue.current.set(apiToken, payload) + } + } + } + + private scheduleUpdate(): void { + if (this.updateTimeout) { + return + } + + this.updateTimeout = setTimeout(() => { + this.performUpdate() + }, this.updateInterval) + } + + private async performUpdate(): Promise { + try { + if (this.queue.current.size > 0) { + const updatePayload = Array.from(this.queue.current.values()) + await this.httpClient(this.supabaseUrl, updatePayload) + + // Clear processed requests from cache + for (const payload of updatePayload) { + const requestKey = `apiToken:${payload.data.apiToken}:newRequests` + requestsStore.del(requestKey) + } + + console.log(`[Data] User requests sync: size: ${updatePayload.length}`) + } + } catch (error) { + console.error('[Data] Update failed:', error) + } finally { + this.updateTimeout = null + this.queue.current = this.queue.next + this.queue.next = new Map() + + if (this.queue.current.size > 0) { + this.scheduleUpdate() + } + } + } + + private async httpClient(url: string, data: UpdatePayload[]): Promise { + const response = await fetch(url, { + method: 'POST', + headers: { + Authorization: `Bearer ${this.supabaseKey}`, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(data.map((payload) => payload.data)) + }) + + if (!response.ok) { + throw new Error('Failed to post updates') + } + } +} + +const updateManager = new RequestsUpdateManager( + // biome-ignore lint/style/noNonNullAssertion: + process.env.SUPABASE_POST_REQUESTS_URL!, + // biome-ignore lint/style/noNonNullAssertion: + process.env.SUPABASE_SERVICE_ROLE_KEY! +) + +/** + * Call this function after a request is received & API Token is identified + * + * @returns + */ +export function triggerUserRequestsSync(apiToken: string) { + return updateManager.queueUpdate(apiToken) +} diff --git a/packages/api/src/utils/userRequestsSync.ts b/packages/api/src/utils/userRequestsSync.ts deleted file mode 100644 index 9e659794..00000000 --- a/packages/api/src/utils/userRequestsSync.ts +++ /dev/null @@ -1,93 +0,0 @@ -import type { User } from './authMiddleware' -import { requestsStore } from './authCache' -import cron from 'node-cron' - -/** - * Send updates to DB with number of requests in the past hour per user - * Cron job runs at the start of every hour - * - */ -export function startUserRequestsSync() { - cron.schedule('0 * * * *', async () => { - console.time('[Data] User requests sync') - - let isUpdateSuccess = true - let skip = 0 - const take = 10_000 - - while (true) { - try { - const getResponse = await fetch( - `${process.env.SUPABASE_FETCH_ALL_USERS_URL}?skip=${skip}&take=${take}`, - { - method: 'GET', - headers: { - Authorization: `Bearer ${process.env.SUPABASE_SERVICE_ROLE_KEY}`, - 'Content-Type': 'application/json' - } - } - ) - - if (!getResponse.ok) { - throw new Error() - } - - const users = (await getResponse.json()).data as User[] - - if (users.length === 0) break - - const updateList: { id: string; requests: number }[] = [] - for (const user of users) { - const apiTokens = user.apiTokens ?? [] - let totalNewRequests = 0 - - for (const apiToken of apiTokens) { - const key = `apiToken:${apiToken}:newRequests` - const newRequests = Number(requestsStore.get(key)) || 0 - if (newRequests > 0) totalNewRequests += newRequests - requestsStore.del(key) - } - - if (totalNewRequests > 0) { - updateList.push({ - id: user.id, - requests: user.requests + totalNewRequests - }) - } - } - - if (updateList.length > 0) { - const postResponse = await fetch(`${process.env.SUPABASE_POST_REQUESTS_URL}`, { - method: 'POST', - headers: { - Authorization: `Bearer ${process.env.SUPABASE_SERVICE_ROLE_KEY}`, - 'Content-Type': 'application/json' - }, - body: JSON.stringify(updateList) - }) - - if (!postResponse.ok) { - throw new Error() - } - - for (const user of users) { - const apiTokens = user.apiTokens ?? [] - - for (const apiToken of apiTokens) { - requestsStore.del(`apiToken:${apiToken}:newRequests`) - } - } - - console.log(`[Data] User requests sync: size: ${updateList.length}`) - } - } catch { - isUpdateSuccess = false - } - - skip += take - } - - if (isUpdateSuccess) requestsStore.flushAll() // Delete remaining (stale) keys once full update is successful - console.timeEnd('[Data] User requests sync') - }) -}