Skip to content

Commit 1cac88b

Browse files
authored
feat: add cache to the org and api key repositories, add profiling route (frain-dev#1822)
* chore: remove context from tests * chore: remove context from queue.Write * chore: add cache to apikey and user repos * feat: added profiling to server * Implement TinyLFU Caching in Redis client * feat: add cache to source repo * Remove unused import and parameter in source_loader * Remove unnecessary goroutine from event creation * fix: undo removal of project id from source repo functions * feat: properly close rows after reading from the db * chore: fix export query
1 parent 8ad79f3 commit 1cac88b

File tree

76 files changed

+687
-505
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+687
-505
lines changed

api/dashboard/source.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package dashboard
22

33
import (
4+
"errors"
45
"fmt"
5-
"github.com/frain-dev/convoy"
66
"net/http"
77

88
"github.com/frain-dev/convoy/pkg/log"
@@ -80,7 +80,7 @@ func (a *DashboardHandler) GetSourceByID(w http.ResponseWriter, r *http.Request)
8080

8181
source, err := postgres.NewSourceRepo(a.A.DB, a.A.Cache).FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
8282
if err != nil {
83-
if err == datastore.ErrSourceNotFound {
83+
if errors.Is(err, datastore.ErrSourceNotFound) {
8484
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
8585
return
8686
}
@@ -134,7 +134,7 @@ func (a *DashboardHandler) UpdateSource(w http.ResponseWriter, r *http.Request)
134134

135135
source, err := postgres.NewSourceRepo(a.A.DB, a.A.Cache).FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
136136
if err != nil {
137-
if err == datastore.ErrSourceNotFound {
137+
if errors.Is(err, datastore.ErrSourceNotFound) {
138138
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
139139
return
140140
}
@@ -192,7 +192,7 @@ func (a *DashboardHandler) DeleteSource(w http.ResponseWriter, r *http.Request)
192192

193193
source, err := sourceRepo.FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
194194
if err != nil {
195-
if err == datastore.ErrSourceNotFound {
195+
if errors.Is(err, datastore.ErrSourceNotFound) {
196196
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
197197
return
198198
}
@@ -207,15 +207,6 @@ func (a *DashboardHandler) DeleteSource(w http.ResponseWriter, r *http.Request)
207207
return
208208
}
209209

210-
if source.Provider == datastore.TwitterSourceProvider {
211-
sourceCacheKey := convoy.SourceCacheKey.Get(source.MaskID).String()
212-
err = a.A.Cache.Delete(r.Context(), sourceCacheKey)
213-
if err != nil {
214-
_ = render.Render(w, r, util.NewErrorResponse("failed to delete source cache", http.StatusBadRequest))
215-
return
216-
}
217-
}
218-
219210
_ = render.Render(w, r, util.NewServerResponse("Source deleted successfully", nil, http.StatusOK))
220211
}
221212

api/ingest.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
188188
Delay: 0,
189189
}
190190

191-
err = a.A.Queue.Write(r.Context(), convoy.CreateEventProcessor, convoy.CreateEventQueue, job)
191+
err = a.A.Queue.Write(convoy.CreateEventProcessor, convoy.CreateEventQueue, job)
192192
if err != nil {
193193
a.A.Logger.WithError(err).Error("Error occurred sending new event to the queue")
194194
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))

api/public/event.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (a *PublicHandler) CreateEndpointEvent(w http.ResponseWriter, r *http.Reque
7878
Delay: 0,
7979
}
8080

81-
err = a.A.Queue.Write(r.Context(), convoy.CreateEventProcessor, convoy.CreateEventQueue, job)
81+
err = a.A.Queue.Write(convoy.CreateEventProcessor, convoy.CreateEventQueue, job)
8282
if err != nil {
8383
log.FromContext(r.Context()).Errorf("Error occurred sending new event to the queue %s", err)
8484
}
@@ -192,8 +192,8 @@ func (a *PublicHandler) CreateDynamicEvent(w http.ResponseWriter, r *http.Reques
192192
// @Summary Replay event
193193
// @Description This endpoint replays an event afresh assuming it is a new event.
194194
// @Tags Events
195-
// @Accept json
196-
// @Produce json
195+
// @Accept json
196+
// @Produce json
197197
// @Param projectID path string true "Project ID"
198198
// @Param eventID path string true "event id"
199199
// @Success 200 {object} util.ServerResponse{data=models.EventResponse}

api/public/source.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package public
22

33
import (
4+
"errors"
45
"fmt"
56
"net/http"
67

7-
"github.com/frain-dev/convoy"
8-
98
"github.com/frain-dev/convoy/pkg/log"
109

1110
"github.com/frain-dev/convoy/api/models"
@@ -100,7 +99,7 @@ func (a *PublicHandler) GetSourceByID(w http.ResponseWriter, r *http.Request) {
10099

101100
source, err := postgres.NewSourceRepo(a.A.DB, a.A.Cache).FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
102101
if err != nil {
103-
if err == datastore.ErrSourceNotFound {
102+
if errors.Is(err, datastore.ErrSourceNotFound) {
104103
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
105104
return
106105
}
@@ -162,7 +161,7 @@ func (a *PublicHandler) UpdateSource(w http.ResponseWriter, r *http.Request) {
162161

163162
source, err := postgres.NewSourceRepo(a.A.DB, a.A.Cache).FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
164163
if err != nil {
165-
if err == datastore.ErrSourceNotFound {
164+
if errors.Is(err, datastore.ErrSourceNotFound) {
166165
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
167166
return
168167
}
@@ -217,17 +216,17 @@ func (a *PublicHandler) UpdateSource(w http.ResponseWriter, r *http.Request) {
217216
// @Security ApiKeyAuth
218217
// @Router /v1/projects/{projectID}/sources/{sourceID} [delete]
219218
func (a *PublicHandler) DeleteSource(w http.ResponseWriter, r *http.Request) {
219+
sourceRepo := postgres.NewSourceRepo(a.A.DB, a.A.Cache)
220+
220221
project, err := a.retrieveProject(r)
221222
if err != nil {
222223
_ = render.Render(w, r, util.NewServiceErrResponse(err))
223224
return
224225
}
225226

226-
sourceRepo := postgres.NewSourceRepo(a.A.DB, a.A.Cache)
227-
228227
source, err := sourceRepo.FindSourceByID(r.Context(), project.UID, chi.URLParam(r, "sourceID"))
229228
if err != nil {
230-
if err == datastore.ErrSourceNotFound {
229+
if errors.Is(err, datastore.ErrSourceNotFound) {
231230
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound))
232231
return
233232
}
@@ -242,15 +241,6 @@ func (a *PublicHandler) DeleteSource(w http.ResponseWriter, r *http.Request) {
242241
return
243242
}
244243

245-
if source.Provider == datastore.TwitterSourceProvider {
246-
sourceCacheKey := convoy.SourceCacheKey.Get(source.MaskID).String()
247-
err = a.A.Cache.Delete(r.Context(), sourceCacheKey)
248-
if err != nil {
249-
_ = render.Render(w, r, util.NewErrorResponse("failed to delete source cache", http.StatusBadRequest))
250-
return
251-
}
252-
}
253-
254244
_ = render.Render(w, r, util.NewServerResponse("Source deleted successfully", nil, http.StatusOK))
255245
}
256246

api/server_suite_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"bytes"
88
"encoding/json"
99
"fmt"
10+
ncache "github.com/frain-dev/convoy/cache/noop"
1011
"io"
1112
"math/rand"
1213
"net/http"
@@ -111,10 +112,7 @@ func buildServer() *ApplicationHandler {
111112
logger = log.NewLogger(os.Stderr)
112113
logger.SetLevel(log.FatalLevel)
113114

114-
noopCache, err := cache.NewCache(getConfig().Redis)
115-
if err != nil {
116-
log.Fatal(fmt.Sprintf("failed to connect to redis: %v", err))
117-
}
115+
noopCache := ncache.NewNoopCache()
118116

119117
ah, _ := NewApplicationHandler(
120118
&types.APIOptions{

auth/realm/jwt/jwt_realm_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ func TestJwtRealm_Authenticate(t *testing.T) {
115115
},
116116
},
117117
dbFn: func(userRepo *mocks.MockUserRepository) {
118-
userRepo.EXPECT().FindUserByID(gomock.Any(), gomock.Any()).Return(nil, ErrInvalidToken)
119118
},
120119
want: nil,
121120
blacklist: true,

cache/redis/client.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,21 @@ import (
99
"github.com/go-redis/cache/v9"
1010
)
1111

12+
const cacheSize = 128000
13+
1214
type RedisCache struct {
1315
cache *cache.Cache
1416
}
1517

1618
func NewRedisCache(addresses []string) (*RedisCache, error) {
17-
rdb, err := rdb.NewClient(addresses)
19+
client, err := rdb.NewClient(addresses)
1820
if err != nil {
1921
return nil, err
2022
}
2123

2224
c := cache.New(&cache.Options{
23-
Redis: rdb.Client(),
25+
Redis: client.Client(),
26+
LocalCache: cache.NewTinyLFU(cacheSize, 1*time.Minute),
2427
})
2528

2629
r := &RedisCache{cache: c}

cmd/hooks/hooks.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"os"
89
"time"
910

@@ -359,6 +360,7 @@ func checkPendingMigrations(db database.Database) error {
359360
if err != nil {
360361
return err
361362
}
363+
defer closeWithError(rows)
362364

363365
for rows.Next() {
364366
var id ID
@@ -378,7 +380,7 @@ func checkPendingMigrations(db database.Database) error {
378380
return postgres.ErrPendingMigrationsFound
379381
}
380382

381-
return rows.Close()
383+
return nil
382384
}
383385

384386
func shouldCheckMigration(cmd *cobra.Command) bool {
@@ -454,3 +456,10 @@ func ensureDefaultUser(ctx context.Context, a *cli.App) error {
454456

455457
return nil
456458
}
459+
460+
func closeWithError(closer io.Closer) {
461+
err := closer.Close()
462+
if err != nil {
463+
fmt.Printf("%v, an error occurred while closing the client", err)
464+
}
465+
}

cmd/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func main() {
5555
var redisUsername string
5656
var redisPassword string
5757
var redisDatabase string
58+
var enableProfiling bool
5859

5960
var configFile string
6061

@@ -80,6 +81,7 @@ func main() {
8081
c.Flags().IntVar(&redisPort, "redis-port", 0, "Redis Port")
8182

8283
c.Flags().StringVar(&fflag, "feature-flag", "", "Enable feature flags (experimental)")
84+
c.Flags().BoolVar(&enableProfiling, "enable-profiling", false, "Enable profiling")
8385

8486
c.PersistentPreRunE(hooks.PreRun(app, db))
8587
c.PersistentPostRunE(hooks.PostRun(app, db))

cmd/server/server.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"errors"
5+
_ "net/http/pprof"
56
"time"
67

78
"github.com/frain-dev/convoy/internal/pkg/fflag"
@@ -131,7 +132,7 @@ func StartConvoyServer(a *cli.App) error {
131132
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
132133
}
133134

134-
fFlag := fflag.NewFFlag()
135+
flag := fflag.NewFFlag()
135136
if err != nil {
136137
a.Logger.WithError(err).Fatal("failed to create fflag controller")
137138
}
@@ -153,7 +154,7 @@ func StartConvoyServer(a *cli.App) error {
153154

154155
handler, err := api.NewApplicationHandler(
155156
&types.APIOptions{
156-
FFlag: fFlag,
157+
FFlag: flag,
157158
DB: a.DB,
158159
Queue: a.Queue,
159160
Logger: lo,
@@ -174,7 +175,7 @@ func StartConvoyServer(a *cli.App) error {
174175
// initialize scheduler
175176
s := worker.NewScheduler(a.Queue, lo)
176177

177-
// register daily analytic task
178+
// register tasks
178179
s.RegisterTask("58 23 * * *", convoy.ScheduleQueue, convoy.DeleteArchivedTasksProcessor)
179180
s.RegisterTask("30 * * * *", convoy.ScheduleQueue, convoy.MonitorTwitterSources)
180181
s.RegisterTask("0 0 * * *", convoy.ScheduleQueue, convoy.RetentionPolicies)

config/config.go

+2-11
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ var DefaultConfiguration = Configuration{
7575
},
7676
},
7777
ConsumerPoolSize: 100,
78+
EnableProfiling: false,
7879
}
7980

8081
type DatabaseConfiguration struct {
@@ -219,16 +220,6 @@ type NewRelicConfiguration struct {
219220
DistributedTracerEnabled bool `json:"distributed_tracer_enabled" envconfig:"CONVOY_NEWRELIC_DISTRIBUTED_TRACER_ENABLED"`
220221
}
221222

222-
type SearchConfiguration struct {
223-
Type SearchProvider `json:"type" envconfig:"CONVOY_SEARCH_TYPE"`
224-
Typesense TypesenseConfiguration `json:"typesense"`
225-
}
226-
227-
type TypesenseConfiguration struct {
228-
Host string `json:"host" envconfig:"CONVOY_TYPESENSE_HOST"`
229-
ApiKey string `json:"api_key" envconfig:"CONVOY_TYPESENSE_API_KEY"`
230-
}
231-
232223
type AnalyticsConfiguration struct {
233224
IsEnabled bool `json:"enabled" envconfig:"CONVOY_ANALYTICS_ENABLED"`
234225
}
@@ -320,11 +311,11 @@ type Configuration struct {
320311
Tracer TracerConfiguration `json:"tracer"`
321312
Host string `json:"host" envconfig:"CONVOY_HOST"`
322313
CustomDomainSuffix string `json:"custom_domain_suffix" envconfig:"CONVOY_CUSTOM_DOMAIN_SUFFIX"`
323-
Search SearchConfiguration `json:"search"`
324314
FeatureFlag FlagLevel `json:"feature_flag" envconfig:"CONVOY_FEATURE_FLAG"`
325315
Analytics AnalyticsConfiguration `json:"analytics"`
326316
StoragePolicy StoragePolicyConfiguration `json:"storage_policy"`
327317
ConsumerPoolSize int `json:"consumer_pool_size" envconfig:"CONVOY_CONSUMER_POOL_SIZE"`
318+
EnableProfiling bool `json:"enable_profiling" envconfig:"CONVOY_ENABLE_PROFILING"`
328319
}
329320

330321
// Get fetches the application configuration. LoadConfig must have been called

config/config_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ func TestLoadConfig(t *testing.T) {
118118
Host: "localhost",
119119
Port: 8379,
120120
},
121-
Search: DefaultConfiguration.Search,
122121
Server: ServerConfiguration{
123122
HTTP: HTTPServerConfiguration{
124123
Port: 80,
@@ -177,7 +176,6 @@ func TestLoadConfig(t *testing.T) {
177176
Port: 6379,
178177
Addresses: "localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005,localhost:7006",
179178
},
180-
Search: DefaultConfiguration.Search,
181179
Server: ServerConfiguration{
182180
HTTP: HTTPServerConfiguration{
183181
Port: 80,
@@ -235,7 +233,6 @@ func TestLoadConfig(t *testing.T) {
235233
Host: "localhost",
236234
Port: 8379,
237235
},
238-
Search: DefaultConfiguration.Search,
239236
Server: ServerConfiguration{
240237
HTTP: HTTPServerConfiguration{
241238
Port: 80,

database/listener/project_listener.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package listener
22

33
import (
4-
"context"
54
"encoding/json"
65
"github.com/frain-dev/convoy"
76
"github.com/frain-dev/convoy/datastore"
@@ -59,7 +58,7 @@ func (e *ProjectListener) run(eventType string, data interface{}, changelog inte
5958
Delay: 1 * time.Second,
6059
}
6160

62-
err = e.queue.Write(context.Background(), convoy.TokenizeSearchForProject, convoy.ScheduleQueue, job)
61+
err = e.queue.Write(convoy.TokenizeSearchForProject, convoy.ScheduleQueue, job)
6362
if err != nil {
6463
log.WithError(err).Error("an error occurred writing the job to the queue")
6564
return

0 commit comments

Comments
 (0)