Skip to content

Commit a798da5

Browse files
authoredMar 9, 2024
Re-implement rate limiter using postgres (frain-dev#1937)
* feat: reimplement rate limiter using postgres * feat: add bucket size (time duration) for rate limit * chore: update tests * chore: revert postgres pkg changes * chore: go mod tidy * feat: update rate when endpoint is updated * feat: set default rate limit value to 0 * chore: update function name, add benchmarks * chore: update test * chore: update changes from main, remove dependency on the subscription repo
1 parent acfaac9 commit a798da5

21 files changed

+306
-384
lines changed
 

‎cmd/agent/agent.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414
"github.com/frain-dev/convoy/database/postgres"
1515
"github.com/frain-dev/convoy/internal/pkg/cli"
1616
"github.com/frain-dev/convoy/internal/pkg/fflag"
17+
"github.com/frain-dev/convoy/internal/pkg/limiter"
1718
"github.com/frain-dev/convoy/internal/pkg/memorystore"
1819
"github.com/frain-dev/convoy/internal/pkg/pubsub"
1920
"github.com/frain-dev/convoy/internal/pkg/server"
20-
"github.com/frain-dev/convoy/limiter"
2121
"github.com/frain-dev/convoy/pkg/log"
2222
"github.com/frain-dev/convoy/worker"
2323
"github.com/frain-dev/convoy/worker/task"
@@ -202,16 +202,12 @@ func startWorkerComponent(ctx context.Context, a *cli.App) error {
202202
subRepo := postgres.NewSubscriptionRepo(a.DB, a.Cache)
203203
deviceRepo := postgres.NewDeviceRepo(a.DB, a.Cache)
204204

205-
rateLimiter, err := limiter.NewLimiter(cfg.Redis)
206-
if err != nil {
207-
a.Logger.Debug("Failed to initialise rate limiter")
208-
}
205+
rateLimiter := limiter.NewLimiter(a.DB)
209206

210207
consumer.RegisterHandlers(convoy.EventProcessor, task.ProcessEventDelivery(
211208
endpointRepo,
212209
eventDeliveryRepo,
213210
projectRepo,
214-
subRepo,
215211
a.Queue,
216212
rateLimiter))
217213

‎const.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ const (
88
)
99

1010
const (
11-
RATE_LIMIT = 5000
12-
RATE_LIMIT_DURATION = 60
13-
RATE_LIMIT_DURATION_IN_DURATION = time.Duration(RATE_LIMIT_DURATION) * time.Second
14-
HTTP_TIMEOUT = 10
15-
HTTP_TIMEOUT_IN_DURATION = time.Duration(HTTP_TIMEOUT) * time.Second
11+
RATE_LIMIT = 0 // should be deleted
12+
RATE_LIMIT_DURATION = 0 // should be deleted
13+
HTTP_TIMEOUT = 10
14+
HTTP_TIMEOUT_IN_DURATION = time.Duration(HTTP_TIMEOUT) * time.Second
1615
)

‎database/postgres/postgres.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@ import (
44
"database/sql"
55
"errors"
66
"fmt"
7+
"github.com/uptrace/opentelemetry-go-extra/otelsql"
8+
"github.com/uptrace/opentelemetry-go-extra/otelsqlx"
79
"io"
810
"time"
911

12+
"github.com/frain-dev/convoy/config"
1013
"github.com/frain-dev/convoy/database/hooks"
1114
"github.com/frain-dev/convoy/pkg/log"
12-
"github.com/uptrace/opentelemetry-go-extra/otelsql"
13-
"github.com/uptrace/opentelemetry-go-extra/otelsqlx"
14-
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
15-
16-
"github.com/frain-dev/convoy/config"
1715
"github.com/jmoiron/sqlx"
1816
_ "github.com/newrelic/go-agent/v3/integrations/nrpq"
17+
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
1918
)
2019

2120
const pkgName = "postgres"

‎datastore/models.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -398,13 +398,13 @@ type Endpoint struct {
398398
SupportEmail string `json:"support_email,omitempty" db:"support_email"`
399399
AppID string `json:"-" db:"app_id"` // Deprecated but necessary for backward compatibility
400400

401-
HttpTimeout uint64 `json:"http_timeout" db:"http_timeout"`
402-
RateLimit int `json:"rate_limit" db:"rate_limit"`
403-
Events int64 `json:"events,omitempty" db:"event_count"`
404-
Status EndpointStatus `json:"status" db:"status"`
401+
Status EndpointStatus `json:"status" db:"status"`
402+
HttpTimeout uint64 `json:"http_timeout" db:"http_timeout"`
403+
Events int64 `json:"events,omitempty" db:"event_count"`
404+
Authentication *EndpointAuthentication `json:"authentication" db:"authentication"`
405405

406-
RateLimitDuration uint64 `json:"rate_limit_duration" db:"rate_limit_duration"`
407-
Authentication *EndpointAuthentication `json:"authentication" db:"authentication"`
406+
RateLimit int `json:"rate_limit" db:"rate_limit"`
407+
RateLimitDuration uint64 `json:"rate_limit_duration" db:"rate_limit_duration"`
408408

409409
CreatedAt time.Time `json:"created_at,omitempty" db:"created_at,omitempty" swaggertype:"string"`
410410
UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at,omitempty" swaggertype:"string"`

‎generate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package convoy
22

33
//go:generate mockgen --source datastore/repository.go --destination mocks/repository.go -package mocks
44
//go:generate mockgen --source queue/queue.go --destination mocks/queue.go -package mocks
5-
//go:generate mockgen --source limiter/limiter.go --destination mocks/limiter.go -package mocks
5+
//go:generate mockgen --source internal/pkg/limiter/limiter.go --destination mocks/limiter.go -package mocks
66
//go:generate mockgen --source cache/cache.go --destination mocks/cache.go -package mocks
77
//go:generate mockgen --source internal/pkg/smtp/smtp.go --destination mocks/smtp.go -package mocks
88
//go:generate mockgen --source internal/pkg/socket/socket.go --destination mocks/socket.go -package mocks

‎go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ require (
2020
github.com/go-chi/chi/v5 v5.0.10
2121
github.com/go-chi/render v1.0.1
2222
github.com/go-redis/cache/v9 v9.0.0
23-
github.com/go-redis/redis_rate/v10 v10.0.1
2423
github.com/go-redsync/redsync/v4 v4.8.1
2524
github.com/golang-jwt/jwt v3.2.2+incompatible
2625
github.com/golang/mock v1.6.0

‎go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7W
215215
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
216216
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
217217
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
218-
github.com/go-redis/redis_rate/v10 v10.0.1 h1:calPxi7tVlxojKunJwQ72kwfozdy25RjA0bCj1h0MUo=
219-
github.com/go-redis/redis_rate/v10 v10.0.1/go.mod h1:EMiuO9+cjRkR7UvdvwMO7vbgqJkltQHtwbdIQvaBKIU=
220218
github.com/go-redsync/redsync/v4 v4.8.1 h1:rq2RvdTI0obznMdxKUWGdmmulo7lS9yCzb8fgDKOlbM=
221219
github.com/go-redsync/redsync/v4 v4.8.1/go.mod h1:LmUAsQuQxhzZAoGY7JS6+dNhNmZyonMZiiEDY9plotM=
222220
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=

‎internal/pkg/limiter/limiter.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package limiter
2+
3+
import (
4+
"context"
5+
"github.com/frain-dev/convoy/database"
6+
"github.com/frain-dev/convoy/internal/pkg/limiter/pg"
7+
)
8+
9+
type RateLimiter interface {
10+
// Allow rate limits outgoing events to endpoints based on a rate in a specified time duration by the endpoint id
11+
Allow(ctx context.Context, key string, rate int, duration int) error
12+
}
13+
14+
func NewLimiter(db database.Database) RateLimiter {
15+
ra := pg.NewRateLimiter(db)
16+
return ra
17+
}

‎internal/pkg/limiter/pg/client.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package pg
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/frain-dev/convoy/database"
7+
"github.com/frain-dev/convoy/pkg/log"
8+
)
9+
10+
type SlidingWindowRateLimiter struct {
11+
db database.Database
12+
}
13+
14+
func NewRateLimiter(db database.Database) *SlidingWindowRateLimiter {
15+
return &SlidingWindowRateLimiter{db: db}
16+
}
17+
18+
func (p *SlidingWindowRateLimiter) Allow(ctx context.Context, key string, rate int, bucketSize int) error {
19+
return p.takeToken(ctx, key, rate, bucketSize)
20+
}
21+
22+
// TakeToken is a sliding window rate limiter that tries to take a token from the bucket
23+
//
24+
// Creates the bucket if it doesn't exist and returns false if it is not successful.
25+
// Returns true otherwise
26+
func (p *SlidingWindowRateLimiter) takeToken(ctx context.Context, key string, rate int, windowSize int) error {
27+
// if one of rate and bucket size if zero, we skip processing
28+
if rate == 0 || windowSize == 0 {
29+
return nil
30+
}
31+
32+
tx, err := p.db.GetDB().BeginTxx(ctx, nil)
33+
if err != nil {
34+
return err
35+
}
36+
37+
var allowed bool
38+
err = tx.QueryRowContext(ctx, `select convoy.take_token($1, $2, $3)::bool;`, key, rate, windowSize).Scan(&allowed)
39+
if err != nil {
40+
return err
41+
}
42+
43+
err = tx.Commit()
44+
if err != nil {
45+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
46+
log.Infof("update failed: %v, unable to rollback: %v", err, rollbackErr)
47+
}
48+
return err
49+
}
50+
51+
if !allowed {
52+
return errors.New("rate limit error")
53+
}
54+
55+
return nil
56+
}
+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package pg
2+
3+
import (
4+
"context"
5+
"github.com/frain-dev/convoy/config"
6+
"github.com/frain-dev/convoy/database/postgres"
7+
"github.com/frain-dev/convoy/pkg/log"
8+
"github.com/stretchr/testify/require"
9+
"os"
10+
"testing"
11+
)
12+
13+
func getConfig() config.Configuration {
14+
_ = os.Setenv("CONVOY_DB_HOST", os.Getenv("TEST_REDIS_HOST"))
15+
_ = os.Setenv("CONVOY_REDIS_SCHEME", os.Getenv("TEST_REDIS_SCHEME"))
16+
_ = os.Setenv("CONVOY_REDIS_PORT", os.Getenv("TEST_REDIS_PORT"))
17+
18+
_ = os.Setenv("CONVOY_DB_HOST", os.Getenv("TEST_DB_HOST"))
19+
_ = os.Setenv("CONVOY_DB_SCHEME", os.Getenv("TEST_DB_SCHEME"))
20+
_ = os.Setenv("CONVOY_DB_USERNAME", os.Getenv("TEST_DB_USERNAME"))
21+
_ = os.Setenv("CONVOY_DB_PASSWORD", os.Getenv("TEST_DB_PASSWORD"))
22+
_ = os.Setenv("CONVOY_DB_DATABASE", os.Getenv("TEST_DB_DATABASE"))
23+
_ = os.Setenv("CONVOY_DB_PORT", os.Getenv("TEST_DB_PORT"))
24+
25+
err := config.LoadConfig("")
26+
if err != nil {
27+
log.Fatal(err)
28+
}
29+
30+
cfg, err := config.Get()
31+
if err != nil {
32+
log.Fatal(err)
33+
}
34+
35+
return cfg
36+
}
37+
38+
func BenchmarkTakeToken_TakeOneToken(b *testing.B) {
39+
db, err := postgres.NewDB(getConfig())
40+
require.NoError(b, err)
41+
42+
b.ResetTimer()
43+
b.ReportAllocs()
44+
45+
for i := 0; i < b.N; i++ {
46+
rateLimiter := NewRateLimiter(db)
47+
tokenErr := rateLimiter.takeToken(context.Background(), "test", 100_000_000, 1)
48+
require.NoError(b, tokenErr)
49+
}
50+
}
51+
52+
func BenchmarkTakeToken_TakeNone(b *testing.B) {
53+
db, err := postgres.NewDB(getConfig())
54+
require.NoError(b, err)
55+
56+
b.ResetTimer()
57+
b.ReportAllocs()
58+
59+
for i := 0; i < b.N; i++ {
60+
rateLimiter := NewRateLimiter(db)
61+
tokenErr := rateLimiter.takeToken(context.Background(), "test-2", 0, 0)
62+
require.NoError(b, tokenErr)
63+
}
64+
}

‎limiter/limiter.go

-22
This file was deleted.

‎limiter/noop/client.go

-36
This file was deleted.

‎limiter/redis/client.go

-76
This file was deleted.

0 commit comments

Comments
 (0)