diff --git a/README.md b/README.md index 58731aa..629ba0a 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,27 @@ function job:start() end ``` +## Rate (token bucket rateimiter) + +Implements classic Token Bucket algorithm limited with `burst` (integer non-negative value) and `rps` (floating value). + +If you need to limit your requests to resource with `rps`, you might want to create Token Bucket with `burst=1` and `rps=rps`. + +To limit your requests you better use common method `rate:wait()` it awaits only single token but not limited with timeout. + +When you need to wait rate-limiter at most `timeout` seconds then you can specify that as first argument: `rate:wait(timeout)`. + +Note, that rate-limiter returns immediately if token can't be awaited within provided `timeout` (noyield response). + +```lua +local rate = sync.rate.new("rlimit", 10/60, 1) -- 10 requests / 60 seconds, with burst=1 +assert(rate:wait()) -- infinitely wait for ratelimit +``` + +Read more about Token Bucket + +Inspired by golang's time/rate + ## More plans and ideas to implement There are several ideas may be implemented. PR's or proposals are welcome diff --git a/rockspecs/sync-scm-1.rockspec b/rockspecs/sync-scm-1.rockspec index f871194..2ef2d95 100644 --- a/rockspecs/sync-scm-1.rockspec +++ b/rockspecs/sync-scm-1.rockspec @@ -21,6 +21,7 @@ build = { ['sync.lock'] = 'sync/lock.lua'; ['sync.latch'] = 'sync/latch.lua'; ['sync.pool'] = 'sync/pool.lua'; + ['sync.rate'] = 'sync/rate.lua'; } } diff --git a/sync.lua b/sync.lua index ba85257..1ededb7 100644 --- a/sync.lua +++ b/sync.lua @@ -1,5 +1,5 @@ local sync = { - _VERSION = '0.10.1', + _VERSION = '0.11.0', } sync.cond = require 'sync.cond' @@ -8,5 +8,6 @@ sync.cv = sync.wg -- backward compatibility with old interface sync.lock = require 'sync.lock' sync.latch = require 'sync.latch' sync.pool = require 'sync.pool' +sync.rate = require 'sync.rate' return sync diff --git a/sync/rate.lua b/sync/rate.lua new file mode 100644 index 0000000..b1400b3 --- /dev/null +++ b/sync/rate.lua @@ -0,0 +1,268 @@ +local fiber = require "fiber" + +---@class sync.rate +---@field name string name of the limit +---@field rps number limit is events per second +---@field burst number burst is maximum number of tokens in limiter +---@field tokens number current tokens in the limiter +---@field last_ts number last timestamp when limiter was updated with tokens +---@field last_event number last timestamp of rate-limited event +local rate = {} + +rate.__index = rate +rate.__tostring = function (self) + return ("rate<%s [%.1f/%s:%0.1f/s]>"):format( + self.name or 'anon', + self.tokens or 0, self.burst or 0, self.rps or 0 + ) +end +setmetatable(rate, { __call = function (_, name, ...) return _.new(name, ...) end }) + +---@class sync.rate.reservation +---@field lim sync.rate +---@field tokens number +---@field timeToAct number +local reservation_mt = {} +reservation_mt.__index = reservation_mt + +---Cancels reservation. This means that requestor will not perform action under this reservation +---@param timestamp number? timestamp (default=now()) +function reservation_mt:cancel(timestamp) + timestamp = tonumber(timestamp) or fiber.time() + + local lim = self.lim + + -- limiter is infinite + if lim.rps == math.huge then return end + + -- no tokens to return + if self.tokens == 0 then return end + + -- time of action already passed (nothing can be returned) + if self.timeToAct < timestamp then return end + + local restore = self.tokens - (lim.last_event - self.timeToAct) * lim.rps + if restore <= 0 then return end + + local tokens + timestamp, tokens = lim:_advance(timestamp) + + tokens = math.min(lim.burst, tokens + restore) + lim.last_ts = timestamp + lim.tokens = tokens + + --- think about this: + if self.timeToAct == lim.last_event then + local prev_event = self.timeToAct + lim:_durationFromTokens(-self.tokens) + if prev_event >= timestamp then + lim.last_event = prev_event + end + end +end + + +---Creates new ratelimit +---@param name string? name of the ratelimit +---@param rps number float limit per second +---@param burst integer? allowed burst (default=0) +---@return sync.rate +function rate.new(name, rps, burst) + if name == rate then error("Usage: rate.new(name, [rps, burst]) or rate(name, [rps, burst]) (not rate:new(...))", 2) end + rps = tonumber(rps) or 0 + burst = math.floor(tonumber(burst) or 0) + + if rps < 0 then error("Usage: rate.new(name, [rps, burst]) rps must be non negative", 2) end + if burst < 0 then error("Usage: rate.new(name, [rps, burst]) burst must be non negative", 2) end + + return setmetatable({ + name = name; + rps = rps; + burst = burst or 0; + tokens = burst or 0; + last_ts = 0; + last_event = 0; + }, rate) +end + +---Calucalates number of tokens which will be available at time `t` +---@local +---@param timestamp number +function rate:_advance(timestamp) + timestamp = assert(tonumber(timestamp)) + + local elapsed = math.max(0, timestamp - self.last_ts) + + local delta + if self.rps <= 0 then + delta = 0 + else + delta = self.rps * elapsed + end + + return timestamp, math.min(self.burst, self.tokens + delta) +end + +---Returns duration in fractinal seconds from token +--- +---Can return `math.huge` if limit is non-positive +---@param tokens number +---@return number duration +function rate:_durationFromTokens(tokens) + if self.rps <= 0 then + return math.huge + end + + return tokens / self.rps +end + +---Reserves and advances limiter for requested tokens +---@local +---@param time number +---@param n number +---@param wait number +---@return boolean|sync.rate.reservation reservation, any? error_or_time_to_act +function rate:_reserve(time, n, wait) + if self.rps == math.huge then + return true + end + if self.rps == 0 then + if self.burst >= n then + self.burst = self.burst - n + return true + end + return false, "not enough burst" + end + + if self.burst < n then + return false, "not enough burst" + end + + local tokens + time, tokens = self:_advance(time) + + tokens = tokens - n + + local waitDuration = 0 + if tokens < 0 then + -- not enough tokens + waitDuration = self:_durationFromTokens(-tokens) + end + + if waitDuration > wait then + return false, "would exceed given timeout" + end + + local timeToAct = time+waitDuration + -- update state + self.last_ts = time + self.tokens = tokens + self.last_event = timeToAct + + return true, timeToAct +end + +---Awaits rate until `n` events allowed within given timeout (default timeout=infinity) +--- +---Can return instant `false` when required tokens can't be awaited in given `timeout` +--- +---**Usage:** +--- +--- -- wait for single event infinitely +--- assert(rate:wait()) +--- +--- -- await instant token (noyield) +--- if rate:wait(0) then +--- -- ratelimit granted +--- end +--- +--- -- await 1 token within 100ms +--- assert(rate:wait(0.1)) +--- +--- -- await 2 tokens within 100ms +--- assert(rate:wait(0.1, 2)) +---@async +---@param timeout number? timeout to wait +---@param n number? +---@return boolean success, string? error_message # true in case event was awaited, false otherwise +function rate:wait(timeout, n) + if getmetatable(self) ~= rate then + error("Usage: rate:wait() (not rate.wait())", 2) + end + + timeout = tonumber(timeout) or math.huge + n = tonumber(n) or 1 + + if n > self.burst and self.rps ~= math.huge then + return false, ("rate:wait(timeout=%s, n=%s) exceeds limiters burst=%s"):format(timeout, n, self.burst) + end + + local now = fiber.time() + local waitLim = math.min(timeout, math.huge) + + local ok, ret = self:_reserve(now, n, waitLim) + if not ok then + local err = ret + return false, ("rate:wait(timeout=%s, n=%s) %s"):format(timeout, n, err) + end + + local timeToAct = ret + local delay = math.max(0, timeToAct - now) + if delay > 0 then + fiber.sleep(delay) + end + + return true +end + +---Reports whether `n` events might be happen at time `timestamp`. +--- +---Does not reserves tokens in limiter +---@param timestamp number? timestamp in seconds (default=now()) +---@param n number? number of events required (default=1) +---@return boolean allowed, string? error_message +function rate:allow(timestamp, n) + if getmetatable(self) ~= rate then + error("Usage: rate:allow() (not rate.allow())", 2) + end + + timestamp = tonumber(timestamp) or fiber.time() + n = tonumber(n) or 1 + + local ok, ret = self:_reserve(timestamp, n, 0) + if not ok then + local err = ret + return false, err + end + return true +end + +---Reserves `n` tokens at time `timestamp` +---@param timestamp number? timestamp in seconds (default=now()) +---@param n? number number of events to be reserved (default=1) +---@return sync.rate.reservation|false, string? error_message +function rate:reserve(timestamp, n) + if getmetatable(self) ~= rate then + error("Usage: rate:reserve() (not rate.reserve())", 2) + end + + timestamp = tonumber(timestamp) or fiber.time() + n = tonumber(n) or 1 + + local ok, ret = self:_reserve(timestamp, n, math.huge) + if not ok then + local err = ret + return false, err + end + + local timeToAct = ret + + local r = setmetatable({ + lim = self, + tokens = n, + timeToAct = timeToAct, + }, reservation_mt) + + return r +end + +return rate diff --git a/test/06-limit.test.lua b/test/06-limit.test.lua new file mode 100644 index 0000000..4fb3c16 --- /dev/null +++ b/test/06-limit.test.lua @@ -0,0 +1,149 @@ +#!/usr/bin/env tarantool + +local fiber = require 'fiber' +local test = require('test.ex').test('rate') + +local sync = require('sync') +test:ok(sync.rate.new('rate', 1, 1), 'anon new by method') +test:ok(sync.rate(1, 1), 'anon new by call') + +test:raises(function() + sync.rate:new() +end, 'wrong constructor call') + +test:raises(function() + local rate = sync.rate() + rate.wait() +end, 'Usage: rate:wait%(%) %(not rate%.wait%(%)%)', 'static call wait on object') +test:raises(function() + local rate = sync.rate() + rate.allow() +end, 'Usage: rate:allow%(%) %(not rate%.allow%(%)%)', 'static call allow on object') +test:raises(function() + local rate = sync.rate() + rate.reserve() +end, 'Usage: rate:reserve%(%) %(not rate%.reserve%(%)%)', 'static call reserve on object') + +local function almost_gt(a, b, eps) + if a > b then + return true + elseif a > b - eps then + return true + else + return false + end +end + +test:deadline(function() + local rate = sync.rate("rate", 3, 1) -- 3 rps / 1 at a time + + -- rate is full + test:noyield(function() + test:ok(rate:wait(), "first wait always return true") + end) + + local s = fiber.time() + test:ok(rate:wait(), "second token awaited") + + local elapsed = fiber.time()-s + test:ok(elapsed >= 1/rate.rps, "second wait is above rps") + + s = fiber.time() + for i = 1, 6 do + test:ok(rate:wait(), "token " .. i .. " awaited") + end + local e = fiber.time() - s + + test:ok(almost_gt(e, 2, 0.01), "6 requests must take more than 2 seconds (rps=3)") + test:ok(almost_gt(2.1, e, 0.01), "6 requests must take less than 2.1 seconds (rps=3)") +end, 3, "rate:wait(inf, 1)") + +test:deadline(function() + local rate = sync.rate("rate", 3, 1) + + test:noyield(function() + test:ok(rate:wait(0.1), "token instantly awaited") + end) + + test:noyield(function () + local ok, err = rate:wait(0.1) + test:is(ok, false, "second token can't be available in 100ms (with rps=3)") + test:is(err, "rate:wait(timeout=0.1, n=1) would exceed given timeout", "error message is correct") + end) + + local s = fiber.time() + test:ok(rate:wait(0.35), "token can be awaited in 350ms (rps=3)") + local e = fiber.time()-s + test:ok(almost_gt(e, 0.33, 0.01), "token was awaited in aproximately 330ms±10ms") +end, 3, "rate:wait(0.1, 1)") + +test:deadline(function() + local rate = sync.rate("rate", 3, 1) + + test:noyield(function() + local ok, err = rate:wait(nil, 3) -- request more than burst + test:is(ok, false, "can't be taken more than burst tokens") + test:is(err, "rate:wait(timeout=inf, n=3) exceeds limiters burst=1", "correct error message") + end) + + test:noyield(function() + local ok, err = rate:allow(nil, 3) -- request more than burst + test:is(ok, false, "can't be taken more than burst tokens") + test:is(err, "not enough burst", "correct error message") + end) + + test:noyield(function() + local ok, err = rate:reserve(nil, 3) -- request more than burst + test:is(ok, false, "can't be taken more than burst tokens") + test:is(err, "not enough burst", "correct error message") + end) + +end, 3, "rate with n > burst") + +test:deadline(function() + local rate = sync.rate.new("rate", 3, 1) + print(rate) + test:ok(rate:allow(), "instant allow is okey") -- token was withdrawn + + test:noyield(function() + local reserv = rate:reserve() + assert(reserv) + test:ok(reserv, "reservation is returned") + + test:ok(almost_gt(reserv.timeToAct, fiber.time()+1/rate.rps, 0.01), "timeToAct ≥ now+1/rps") + test:ok(almost_gt(fiber.time()+2/rate.rps, reserv.timeToAct, 0.01), "timeToAct ≤ now+2/rps") + reserv:cancel() -- cancell reservation + + reserv = rate:reserve() + assert(reserv) + test:ok(reserv, "reservation is returned") + test:ok(almost_gt(reserv.timeToAct, fiber.time()+1/rate.rps, 0.01), "timeToAct ≥ now+1/rps") + test:ok(almost_gt(fiber.time()+2/rate.rps, reserv.timeToAct, 0.01), "timeToAct ≤ now+2/rps") + end) +end, 3, "instant allow and instant reservation") + +test:deadline(function() + local rate = sync.rate.new("rate", 0, 3) + test:noyield(function() + test:ok(rate:allow(), "token 1 - ok") + test:ok(rate:allow(), "token 2 - ok") + test:ok(rate:allow(), "token 3 - ok") + + local nok, err = rate:allow() + test:ok(not nok, "token 4 - nok (rate was drained)") + test:is(err, "not enough burst", "drained rate - not enough burst") + end) +end, 3, "rps=0") + +test:deadline(function() + local rate = sync.rate.new("rate", 1/0) + + test:noyield(function() + for _ = 1, 5 do + test:ok(rate:allow(), "infinite rate almost allows") + end + end) + +end, 3, "rps=inf") + +test:done_testing()