Skip to content

Commit c0c9656

Browse files
jirevweFausto Alonso
and
Fausto Alonso
authored
Performance tweaks (frain-dev#1765) (frain-dev#1783)
* Performance tweaks (frain-dev#1765) * Adding cache for the source and project configuration in the ingestion handler * feat: Introduced caching to the repository layer for better performance * Refactor `Find` methods in `endpoint` and `subscription` to use cache. * Introduce fallback for cache and reduce direct cache usage outside repos * Refactor event processing and creation logic * chore: update tests with cache. remove searcher package * Add error handling for cache set operations in project.go and subscription.go * chore: re-add stub file * Removed unused msgpack import in source_loader.go * Update docker Compose configurations and tweak application code * Refactor queue operations to use context * Update Write() function signatures in tests. * Remove OwnerID from process event creation --------- Co-authored-by: Fausto Alonso <falonso@poligonocapital.com.br>
1 parent 994cbfb commit c0c9656

File tree

148 files changed

+2308
-3216
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

148 files changed

+2308
-3216
lines changed

analytics/analytics.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/base64"
66
"errors"
77
"fmt"
8+
"github.com/frain-dev/convoy/cache"
89
"math"
910
"time"
1011

@@ -92,13 +93,13 @@ func newAnalytics(Repo *Repo, cfg config.Configuration) (*Analytics, error) {
9293
return a, nil
9394
}
9495

95-
func TrackDailyAnalytics(db database.Database, cfg config.Configuration, rd *rdb.Redis) func(context.Context, *asynq.Task) error {
96+
func TrackDailyAnalytics(db database.Database, cache cache.Cache, cfg config.Configuration, rd *rdb.Redis) func(context.Context, *asynq.Task) error {
9697
repo := &Repo{
9798
ConfigRepo: postgres.NewConfigRepo(db),
98-
EventRepo: postgres.NewEventRepo(db),
99-
projectRepo: postgres.NewProjectRepo(db),
100-
OrgRepo: postgres.NewOrgRepo(db),
101-
UserRepo: postgres.NewUserRepo(db),
99+
EventRepo: postgres.NewEventRepo(db, cache),
100+
projectRepo: postgres.NewProjectRepo(db, cache),
101+
OrgRepo: postgres.NewOrgRepo(db, cache),
102+
UserRepo: postgres.NewUserRepo(db, cache),
102103
}
103104

104105
// Create a pool with go-redis

api/api.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (a *ApplicationHandler) RegisterPolicy() error {
111111
err = a.A.Authz.RegisterPolicy(func() authz.Policy {
112112
po := &policies.OrganisationPolicy{
113113
BasePolicy: authz.NewBasePolicy(),
114-
OrganisationMemberRepo: postgres.NewOrgMemberRepo(a.A.DB),
114+
OrganisationMemberRepo: postgres.NewOrgMemberRepo(a.A.DB, a.A.Cache),
115115
}
116116

117117
po.SetRule("manage", authz.RuleFunc(po.Manage))
@@ -126,8 +126,8 @@ func (a *ApplicationHandler) RegisterPolicy() error {
126126
err = a.A.Authz.RegisterPolicy(func() authz.Policy {
127127
po := &policies.ProjectPolicy{
128128
BasePolicy: authz.NewBasePolicy(),
129-
OrganisationRepo: postgres.NewOrgRepo(a.A.DB),
130-
OrganisationMemberRepo: postgres.NewOrgMemberRepo(a.A.DB),
129+
OrganisationRepo: postgres.NewOrgRepo(a.A.DB, a.A.Cache),
130+
OrganisationMemberRepo: postgres.NewOrgMemberRepo(a.A.DB, a.A.Cache),
131131
}
132132

133133
po.SetRule("manage", authz.RuleFunc(po.Manage))

api/dashboard/auth.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ func (a *DashboardHandler) LoginUser(w http.ResponseWriter, r *http.Request) {
2929
return
3030
}
3131

32-
config, err := config.Get()
32+
configuration, err := config.Get()
3333
if err != nil {
3434
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
3535
return
3636
}
3737

3838
lu := services.LoginUserService{
39-
UserRepo: postgres.NewUserRepo(a.A.DB),
39+
UserRepo: postgres.NewUserRepo(a.A.DB, a.A.Cache),
4040
Cache: a.A.Cache,
41-
JWT: jwt.NewJwt(&config.Auth.Jwt, a.A.Cache),
41+
JWT: jwt.NewJwt(&configuration.Auth.Jwt, a.A.Cache),
4242
Data: &newUser,
4343
}
4444

@@ -68,15 +68,15 @@ func (a *DashboardHandler) RefreshToken(w http.ResponseWriter, r *http.Request)
6868
return
6969
}
7070

71-
config, err := config.Get()
71+
configuration, err := config.Get()
7272
if err != nil {
7373
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
7474
return
7575
}
7676

7777
rf := services.RefreshTokenService{
78-
UserRepo: postgres.NewUserRepo(a.A.DB),
79-
JWT: jwt.NewJwt(&config.Auth.Jwt, a.A.Cache),
78+
UserRepo: postgres.NewUserRepo(a.A.DB, a.A.Cache),
79+
JWT: jwt.NewJwt(&configuration.Auth.Jwt, a.A.Cache),
8080
Data: &refreshToken,
8181
}
8282

@@ -96,15 +96,15 @@ func (a *DashboardHandler) LogoutUser(w http.ResponseWriter, r *http.Request) {
9696
return
9797
}
9898

99-
config, err := config.Get()
99+
configuration, err := config.Get()
100100
if err != nil {
101101
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
102102
return
103103
}
104104

105105
lg := services.LogoutUserService{
106-
UserRepo: postgres.NewUserRepo(a.A.DB),
107-
JWT: jwt.NewJwt(&config.Auth.Jwt, a.A.Cache),
106+
UserRepo: postgres.NewUserRepo(a.A.DB, a.A.Cache),
107+
JWT: jwt.NewJwt(&configuration.Auth.Jwt, a.A.Cache),
108108
Token: auth.Token,
109109
}
110110

api/dashboard/dashboard.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (a *DashboardHandler) GetDashboardSummary(w http.ResponseWriter, r *http.Re
9797
return
9898
}
9999

100-
apps, err := postgres.NewEndpointRepo(a.A.DB).CountProjectEndpoints(r.Context(), project.UID)
100+
apps, err := postgres.NewEndpointRepo(a.A.DB, a.A.Cache).CountProjectEndpoints(r.Context(), project.UID)
101101
if err != nil {
102102
log.WithError(err).Error("failed to count project endpoints")
103103
_ = render.Render(w, r, util.NewErrorResponse("an error occurred while searching apps", http.StatusInternalServerError))
@@ -130,7 +130,7 @@ func (a *DashboardHandler) GetDashboardSummary(w http.ResponseWriter, r *http.Re
130130
func (a *DashboardHandler) computeDashboardMessages(ctx context.Context, projectID string, searchParams datastore.SearchParams, period datastore.Period) (uint64, []datastore.EventInterval, error) {
131131
var messagesSent uint64
132132

133-
eventDeliveryRepo := postgres.NewEventDeliveryRepo(a.A.DB)
133+
eventDeliveryRepo := postgres.NewEventDeliveryRepo(a.A.DB, a.A.Cache)
134134
messages, err := eventDeliveryRepo.LoadEventDeliveriesIntervals(ctx, projectID, searchParams, period)
135135
if err != nil {
136136
log.FromContext(ctx).WithError(err).Error("failed to load message intervals - ")
@@ -151,7 +151,7 @@ func (a *DashboardHandler) retrieveOrganisation(r *http.Request) (*datastore.Org
151151
orgID = r.URL.Query().Get("orgID")
152152
}
153153

154-
orgRepo := postgres.NewOrgRepo(a.A.DB)
154+
orgRepo := postgres.NewOrgRepo(a.A.DB, a.A.Cache)
155155
return orgRepo.FetchOrganisationByID(r.Context(), orgID)
156156
}
157157

@@ -176,19 +176,25 @@ func (a *DashboardHandler) retrieveMembership(r *http.Request) (*datastore.Organ
176176
return &datastore.OrganisationMember{}, err
177177
}
178178

179-
orgMemberRepo := postgres.NewOrgMemberRepo(a.A.DB)
179+
orgMemberRepo := postgres.NewOrgMemberRepo(a.A.DB, a.A.Cache)
180180
return orgMemberRepo.FetchOrganisationMemberByUserID(r.Context(), user.UID, org.UID)
181181
}
182182

183183
func (a *DashboardHandler) retrieveProject(r *http.Request) (*datastore.Project, error) {
184184
projectID := chi.URLParam(r, "projectID")
185185

186186
if util.IsStringEmpty(projectID) {
187-
return &datastore.Project{}, errors.New("Project ID not present in request")
187+
return nil, errors.New("project id not present in request")
188188
}
189189

190-
projectRepo := postgres.NewProjectRepo(a.A.DB)
191-
return projectRepo.FetchProjectByID(r.Context(), projectID)
190+
var project *datastore.Project
191+
projectRepo := postgres.NewProjectRepo(a.A.DB, a.A.Cache)
192+
project, err := projectRepo.FetchProjectByID(r.Context(), projectID)
193+
if err != nil {
194+
return nil, err
195+
}
196+
197+
return project, nil
192198
}
193199

194200
func (a *DashboardHandler) retrieveHost() (string, error) {

api/dashboard/endpoint.go

+12-19
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package dashboard
22

33
import (
4-
"github.com/frain-dev/convoy"
54
"github.com/frain-dev/convoy/pkg/log"
65
"net/http"
76

@@ -48,9 +47,9 @@ func (a *DashboardHandler) CreateEndpoint(w http.ResponseWriter, r *http.Request
4847

4948
ce := services.CreateEndpointService{
5049
Cache: a.A.Cache,
51-
EndpointRepo: postgres.NewEndpointRepo(a.A.DB),
52-
ProjectRepo: postgres.NewProjectRepo(a.A.DB),
53-
PortalLinkRepo: postgres.NewPortalLinkRepo(a.A.DB),
50+
EndpointRepo: postgres.NewEndpointRepo(a.A.DB, a.A.Cache),
51+
ProjectRepo: postgres.NewProjectRepo(a.A.DB, a.A.Cache),
52+
PortalLinkRepo: postgres.NewPortalLinkRepo(a.A.DB, a.A.Cache),
5453
E: e,
5554
ProjectID: project.UID,
5655
}
@@ -85,7 +84,7 @@ func (a *DashboardHandler) GetEndpoints(w http.ResponseWriter, r *http.Request)
8584
}
8685

8786
data := q.Transform(r)
88-
endpoints, paginationData, err := postgres.NewEndpointRepo(a.A.DB).LoadEndpointsPaged(r.Context(), project.UID, data.Filter, data.Pageable)
87+
endpoints, paginationData, err := postgres.NewEndpointRepo(a.A.DB, a.A.Cache).LoadEndpointsPaged(r.Context(), project.UID, data.Filter, data.Pageable)
8988
if err != nil {
9089
a.A.Logger.WithError(err).Error("failed to load endpoints")
9190
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
@@ -133,8 +132,8 @@ func (a *DashboardHandler) UpdateEndpoint(w http.ResponseWriter, r *http.Request
133132

134133
ce := services.UpdateEndpointService{
135134
Cache: a.A.Cache,
136-
EndpointRepo: postgres.NewEndpointRepo(a.A.DB),
137-
ProjectRepo: postgres.NewProjectRepo(a.A.DB),
135+
EndpointRepo: postgres.NewEndpointRepo(a.A.DB, a.A.Cache),
136+
ProjectRepo: postgres.NewProjectRepo(a.A.DB, a.A.Cache),
138137
E: e,
139138
Endpoint: endpoint,
140139
Project: project,
@@ -168,19 +167,13 @@ func (a *DashboardHandler) DeleteEndpoint(w http.ResponseWriter, r *http.Request
168167
return
169168
}
170169

171-
err = postgres.NewEndpointRepo(a.A.DB).DeleteEndpoint(r.Context(), endpoint, project.UID)
170+
err = postgres.NewEndpointRepo(a.A.DB, a.A.Cache).DeleteEndpoint(r.Context(), endpoint, project.UID)
172171
if err != nil {
173172
log.WithError(err).Error("failed to delete endpoint")
174173
_ = render.Render(w, r, util.NewErrorResponse("failed to delete endpoint", http.StatusBadRequest))
175174
return
176175
}
177176

178-
endpointCacheKey := convoy.EndpointsCacheKey.Get(endpoint.UID).String()
179-
err = a.A.Cache.Delete(r.Context(), endpointCacheKey)
180-
if err != nil {
181-
a.A.Logger.WithError(err).Error("failed to delete endpoint cache")
182-
}
183-
184177
_ = render.Render(w, r, util.NewServerResponse("Endpoint deleted successfully", nil, http.StatusOK))
185178
}
186179

@@ -212,8 +205,8 @@ func (a *DashboardHandler) ExpireSecret(w http.ResponseWriter, r *http.Request)
212205
xs := services.ExpireSecretService{
213206
Queuer: a.A.Queue,
214207
Cache: a.A.Cache,
215-
EndpointRepo: postgres.NewEndpointRepo(a.A.DB),
216-
ProjectRepo: postgres.NewProjectRepo(a.A.DB),
208+
EndpointRepo: postgres.NewEndpointRepo(a.A.DB, a.A.Cache),
209+
ProjectRepo: postgres.NewProjectRepo(a.A.DB, a.A.Cache),
217210
S: e,
218211
Endpoint: endpoint,
219212
Project: project,
@@ -243,7 +236,7 @@ func (a *DashboardHandler) ToggleEndpointStatus(w http.ResponseWriter, r *http.R
243236
}
244237

245238
te := services.ToggleEndpointStatusService{
246-
EndpointRepo: postgres.NewEndpointRepo(a.A.DB),
239+
EndpointRepo: postgres.NewEndpointRepo(a.A.DB, a.A.Cache),
247240
ProjectID: project.UID,
248241
EndpointId: chi.URLParam(r, "endpointID"),
249242
}
@@ -271,7 +264,7 @@ func (a *DashboardHandler) PauseEndpoint(w http.ResponseWriter, r *http.Request)
271264
}
272265

273266
ps := services.PauseEndpointService{
274-
EndpointRepo: postgres.NewEndpointRepo(a.A.DB),
267+
EndpointRepo: postgres.NewEndpointRepo(a.A.DB, a.A.Cache),
275268
ProjectID: project.UID,
276269
EndpointId: chi.URLParam(r, "endpointID"),
277270
}
@@ -292,7 +285,7 @@ func (a *DashboardHandler) retrieveEndpoint(r *http.Request) (*datastore.Endpoin
292285
return &datastore.Endpoint{}, err
293286
}
294287

295-
endpointRepo := postgres.NewEndpointRepo(a.A.DB)
288+
endpointRepo := postgres.NewEndpointRepo(a.A.DB, a.A.Cache)
296289
endpointID := chi.URLParam(r, "endpointID")
297290
return endpointRepo.FindEndpointByID(r.Context(), endpointID, project.UID)
298291
}

0 commit comments

Comments
 (0)