Skip to content

Commit fcd8ea8

Browse files
authored
Add event_type to event_delivery table and use it to filter (frain-dev#1862)
1 parent cb94456 commit fcd8ea8

File tree

12 files changed

+56
-38
lines changed

12 files changed

+56
-38
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# vendor/
1616
.idea
1717

18-
/api/dashboard/ui/build/*
18+
/api/ui/build/*
1919
!/api/ui/build/go_test_stub.txt
2020
/data/mongo/
2121
.DS_Store

api/api.go

-9
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,7 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
183183
})
184184

185185
r.HandleFunc("/*", handler.RedirectToProjects)
186-
187186
})
188-
189187
})
190188

191189
// Dashboard API.
@@ -222,12 +220,10 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
222220
})
223221

224222
uiRouter.Route("/organisations", func(orgRouter chi.Router) {
225-
226223
orgRouter.Post("/", handler.CreateOrganisation)
227224
orgRouter.With(middleware.Pagination).Get("/", handler.GetOrganisationsPaged)
228225

229226
orgRouter.Route("/{orgID}", func(orgSubRouter chi.Router) {
230-
231227
orgSubRouter.Get("/", handler.GetOrganisation)
232228
orgSubRouter.Put("/", handler.UpdateOrganisation)
233229
orgSubRouter.Delete("/", handler.DeleteOrganisation)
@@ -253,7 +249,6 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
253249
projectRouter.Post("/", handler.CreateProject)
254250

255251
projectRouter.Route("/{projectID}", func(projectSubRouter chi.Router) {
256-
257252
projectSubRouter.Get("/", handler.GetProject)
258253
projectSubRouter.Put("/", handler.UpdateProject)
259254
projectSubRouter.Delete("/", handler.DeleteProject)
@@ -274,12 +269,10 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
274269
e.Put("/toggle_status", handler.ToggleEndpointStatus)
275270
e.Put("/expire_secret", handler.ExpireSecret)
276271
e.Put("/pause", handler.PauseEndpoint)
277-
278272
})
279273
})
280274

281275
projectSubRouter.Route("/events", func(eventRouter chi.Router) {
282-
283276
eventRouter.Post("/", handler.CreateEndpointEvent)
284277
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
285278
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
@@ -320,7 +313,6 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
320313
})
321314

322315
projectSubRouter.Route("/sources", func(sourceRouter chi.Router) {
323-
324316
sourceRouter.Post("/", handler.CreateSource)
325317
sourceRouter.Get("/{sourceID}", handler.GetSource)
326318
sourceRouter.With(middleware.Pagination).Get("/", handler.LoadSourcesPaged)
@@ -351,7 +343,6 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
351343
})
352344
})
353345
})
354-
355346
})
356347

357348
uiRouter.Route("/configuration", func(configRouter chi.Router) {

database/postgres/event_delivery.go

+19-21
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ var (
3737

3838
const (
3939
createEventDelivery = `
40-
INSERT INTO convoy.event_deliveries (id,project_id,event_id,endpoint_id,device_id,subscription_id,headers,attempts,status,metadata,cli_metadata,description,url_query_params,idempotency_key,created_at,updated_at)
41-
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16);
40+
INSERT INTO convoy.event_deliveries (id,project_id,event_id,endpoint_id,device_id,subscription_id,headers,attempts,status,metadata,cli_metadata,description,url_query_params,idempotency_key,event_type,created_at,updated_at)
41+
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17);
4242
`
4343

4444
baseFetchEventDelivery = `
@@ -48,6 +48,7 @@ const (
4848
COALESCE(ed.url_query_params, '') AS url_query_params,
4949
COALESCE(ed.idempotency_key, '') AS idempotency_key,
5050
ed.description,ed.created_at,ed.updated_at,
51+
COALESCE(ed.event_type,'') AS "event_type",
5152
COALESCE(ed.device_id,'') AS "device_id",
5253
COALESCE(ed.endpoint_id,'') AS "endpoint_id",
5354
COALESCE(ep.id, '') AS "endpoint_metadata.id",
@@ -84,9 +85,7 @@ const (
8485
LIMIT :limit
8586
)
8687
87-
SELECT * FROM event_deliveries
88-
WHERE ("event_metadata.event_type" = :event_type OR :event_type = '')
89-
ORDER BY id %s
88+
SELECT * FROM event_deliveries ORDER BY id %s
9089
`
9190

9291
baseEventDeliveryPagedBackward = `
@@ -99,32 +98,27 @@ const (
9998
LIMIT :limit
10099
)
101100
102-
SELECT * FROM event_deliveries
103-
WHERE ("event_metadata.event_type" = :event_type OR :event_type = '')
104-
ORDER BY id %s
101+
SELECT * FROM event_deliveries ORDER BY id %s
105102
`
106103

107104
fetchEventDeliveryByID = baseFetchEventDelivery + ` AND ed.id = $1 AND ed.project_id = $2`
108105

109106
baseEventDeliveryFilter = ` AND (ed.project_id = :project_id OR :project_id = '')
110107
AND (ed.event_id = :event_id OR :event_id = '')
108+
AND (ed.event_type = :event_type OR :event_type = '')
111109
AND ed.created_at >= :start_date
112110
AND ed.created_at <= :end_date
113111
AND ed.deleted_at IS NULL`
114112

115113
countPrevEventDeliveries = `
116-
WITH event_deliveries AS (
117-
SELECT ed.id AS "id", ev.event_type AS "event_type"
118-
FROM convoy.event_deliveries ed
119-
LEFT JOIN convoy.events ev ON ed.event_id = ev.id
120-
WHERE ed.deleted_at IS NULL
121-
%s
122-
AND ed.id > :cursor
123-
GROUP BY ed.id, ev.id
124-
ORDER BY ed.id %s
125-
)
126-
127-
SELECT COUNT(DISTINCT("id")) AS count FROM event_deliveries WHERE ("event_type" = :event_type OR :event_type = '')
114+
SELECT COUNT(DISTINCT(ed.id))
115+
FROM convoy.event_deliveries ed
116+
LEFT JOIN convoy.events ev ON ed.event_id = ev.id
117+
WHERE ed.deleted_at IS NULL
118+
%s
119+
AND ed.id > :cursor
120+
GROUP BY ed.id, ev.id
121+
ORDER BY ed.id %s
128122
`
129123

130124
loadEventDeliveriesIntervals = `
@@ -153,6 +147,7 @@ const (
153147
COALESCE(ed.idempotency_key, '') AS idempotency_key,
154148
COALESCE(url_query_params, '') AS url_query_params,
155149
description,created_at,updated_at,
150+
COALESCE(event_type,'') AS "event_type",
156151
COALESCE(device_id,'') AS "device_id",
157152
COALESCE(endpoint_id,'') AS "endpoint_id"
158153
FROM convoy.event_deliveries ed
@@ -165,6 +160,7 @@ const (
165160
COALESCE(idempotency_key, '') AS idempotency_key,
166161
COALESCE(url_query_params, '') AS url_query_params,
167162
description,created_at,updated_at,
163+
COALESCE(event_type,'') AS "event_type",
168164
COALESCE(device_id,'') AS "device_id"
169165
FROM convoy.event_deliveries
170166
WHERE status=$1 AND project_id = $2 AND device_id = $3
@@ -217,7 +213,7 @@ func (e *eventDeliveryRepo) CreateEventDelivery(ctx context.Context, delivery *d
217213
ctx, createEventDelivery, delivery.UID, delivery.ProjectID,
218214
delivery.EventID, endpointID, deviceID,
219215
delivery.SubscriptionID, delivery.Headers, delivery.DeliveryAttempts, delivery.Status,
220-
delivery.Metadata, delivery.CLIMetadata, delivery.Description, delivery.URLQueryParams, delivery.IdempotencyKey,
216+
delivery.Metadata, delivery.CLIMetadata, delivery.Description, delivery.URLQueryParams, delivery.IdempotencyKey, delivery.EventType,
221217
delivery.CreatedAt, delivery.UpdatedAt,
222218
)
223219
if err != nil {
@@ -580,6 +576,7 @@ func (e *eventDeliveryRepo) LoadEventDeliveriesPaged(ctx context.Context, projec
580576
Headers: ev.Headers,
581577
URLQueryParams: ev.URLQueryParams,
582578
Latency: ev.Latency,
579+
EventType: ev.EventType,
583580
Endpoint: &datastore.Endpoint{
584581
UID: ev.Endpoint.UID.ValueOrZero(),
585582
ProjectID: ev.Endpoint.ProjectID.ValueOrZero(),
@@ -824,6 +821,7 @@ type EventDeliveryPaginated struct {
824821
URLQueryParams string `json:"url_query_params" db:"url_query_params"`
825822
IdempotencyKey string `json:"idempotency_key" db:"idempotency_key"`
826823
Latency string `json:"latency" db:"latency"`
824+
EventType datastore.EventType `json:"event_type,omitempty" db:"event_type"`
827825

828826
Endpoint *EndpointMetadata `json:"endpoint_metadata,omitempty" db:"endpoint_metadata"`
829827
Event *EventMetadata `json:"event_metadata,omitempty" db:"event_metadata"`

database/postgres/event_delivery_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func generateEventDelivery(project *datastore.Project, endpoint *datastore.Endpo
6161
EndpointID: endpoint.UID,
6262
DeviceID: device.UID,
6363
SubscriptionID: sub.UID,
64+
EventType: event.EventType,
6465
Headers: httpheader.HTTPHeader{"X-sig": []string{"3787 fmmfbf"}},
6566
DeliveryAttempts: []datastore.DeliveryAttempt{
6667
{UID: ulid.Make().String()},

datastore/models.go

+1
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ type EventDelivery struct {
892892
URLQueryParams string `json:"url_query_params" db:"url_query_params"`
893893
IdempotencyKey string `json:"idempotency_key" db:"idempotency_key"`
894894
Latency string `json:"latency" db:"latency"`
895+
EventType EventType `json:"event_type,omitempty" db:"event_type"`
895896

896897
Endpoint *Endpoint `json:"endpoint_metadata,omitempty" db:"endpoint_metadata"`
897898
Event *Event `json:"event_metadata,omitempty" db:"event_metadata"`

docs/docs.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
// Code generated by swaggo/swag at 2024-01-03 08:49:59.960617 -0800 PST m=+2.346577667. DO NOT EDIT
1+
// Package docs GENERATED BY SWAG; DO NOT EDIT
2+
// This file was generated by swaggo/swag at
3+
// 2024-01-08 10:38:08.928262 +0000 GMT m=+2.574129084
24
package docs
35

46
import "github.com/swaggo/swag"
@@ -6656,6 +6658,9 @@ const docTemplate = `{
66566658
"event_metadata": {
66576659
"$ref": "#/definitions/datastore.Event"
66586660
},
6661+
"event_type": {
6662+
"type": "string"
6663+
},
66596664
"headers": {
66606665
"$ref": "#/definitions/httpheader.HTTPHeader"
66616666
},

docs/swagger.json

+3
Original file line numberDiff line numberDiff line change
@@ -6653,6 +6653,9 @@
66536653
"event_metadata": {
66546654
"$ref": "#/definitions/datastore.Event"
66556655
},
6656+
"event_type": {
6657+
"type": "string"
6658+
},
66566659
"headers": {
66576660
"$ref": "#/definitions/httpheader.HTTPHeader"
66586661
},

docs/swagger.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,8 @@ definitions:
10381038
type: string
10391039
event_metadata:
10401040
$ref: '#/definitions/datastore.Event'
1041+
event_type:
1042+
type: string
10411043
headers:
10421044
$ref: '#/definitions/httpheader.HTTPHeader'
10431045
idempotency_key:

docs/v3/openapi3.json

+3
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,9 @@
15011501
"event_metadata": {
15021502
"$ref": "#/components/schemas/datastore.Event"
15031503
},
1504+
"event_type": {
1505+
"type": "string"
1506+
},
15041507
"headers": {
15051508
"$ref": "#/components/schemas/httpheader.HTTPHeader"
15061509
},

docs/v3/openapi3.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,8 @@ components:
10391039
type: string
10401040
event_metadata:
10411041
$ref: '#/components/schemas/datastore.Event'
1042+
event_type:
1043+
type: string
10421044
headers:
10431045
$ref: '#/components/schemas/httpheader.HTTPHeader'
10441046
idempotency_key:

sql/1704372039.sql

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- +migrate Up
2+
ALTER TABLE convoy.event_deliveries ADD COLUMN IF NOT EXISTS event_type TEXT;
3+
CREATE INDEX IF NOT EXISTS event_deliveries_event_type_1 ON convoy.event_deliveries(event_type);
4+
5+
-- +migrate Down
6+
ALTER TABLE convoy.event_deliveries DROP COLUMN IF EXISTS event_type;
7+
DROP INDEX IF EXISTS convoy.event_deliveries_event_type_1;

worker/task/process_event_creation.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ type CreateEvent struct {
4040
func ProcessEventCreation(
4141
endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository,
4242
eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer,
43-
subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository) func(context.Context, *asynq.Task) error {
43+
subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository,
44+
) func(context.Context, *asynq.Task) error {
4445
return func(ctx context.Context, t *asynq.Task) error {
4546
var createEvent CreateEvent
4647
var event *datastore.Event
@@ -164,6 +165,7 @@ func ProcessEventCreation(
164165
eventDelivery := &datastore.EventDelivery{
165166
UID: ulid.Make().String(),
166167
SubscriptionID: s.UID,
168+
EventType: event.EventType,
167169
Metadata: metadata,
168170
ProjectID: project.UID,
169171
EventID: event.UID,
@@ -227,7 +229,8 @@ func ProcessEventCreation(
227229
}
228230

229231
func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepository,
230-
subRepo datastore.SubscriptionRepository, project *datastore.Project, event *datastore.Event, shouldCreateSubscription bool) ([]datastore.Subscription, error) {
232+
subRepo datastore.SubscriptionRepository, project *datastore.Project, event *datastore.Event, shouldCreateSubscription bool,
233+
) ([]datastore.Subscription, error) {
231234
var subscriptions []datastore.Subscription
232235
var err error
233236

@@ -324,8 +327,8 @@ func matchSubscriptions(eventType string, subscriptions []datastore.Subscription
324327
}
325328

326329
func getEventDeliveryStatus(ctx context.Context, subscription *datastore.Subscription, endpoint *datastore.Endpoint,
327-
deviceRepo datastore.DeviceRepository) datastore.EventDeliveryStatus {
328-
330+
deviceRepo datastore.DeviceRepository,
331+
) datastore.EventDeliveryStatus {
329332
switch subscription.Type {
330333
case datastore.SubscriptionTypeAPI:
331334
if endpoint.Status != datastore.ActiveEndpointStatus {
@@ -367,7 +370,8 @@ func generateSubscription(project *datastore.Project, endpoint *datastore.Endpoi
367370
}
368371

369372
func buildEvent(ctx context.Context, eventRepo datastore.EventRepository, endpointRepo datastore.EndpointRepository,
370-
eventParams *CreateEventTaskParams, project *datastore.Project) (*datastore.Event, error) {
373+
eventParams *CreateEventTaskParams, project *datastore.Project,
374+
) (*datastore.Event, error) {
371375
var isDuplicate bool
372376
if !util.IsStringEmpty(eventParams.IdempotencyKey) {
373377
events, err := eventRepo.FindEventsByIdempotencyKey(ctx, project.UID, eventParams.IdempotencyKey)
@@ -426,7 +430,8 @@ func buildEvent(ctx context.Context, eventRepo datastore.EventRepository, endpoi
426430
}
427431

428432
func findEndpoints(ctx context.Context, endpointRepo datastore.EndpointRepository, newMessage *CreateEventTaskParams,
429-
project *datastore.Project) ([]datastore.Endpoint, error) {
433+
project *datastore.Project,
434+
) ([]datastore.Endpoint, error) {
430435
var endpoints []datastore.Endpoint
431436

432437
if !util.IsStringEmpty(newMessage.EndpointID) {

0 commit comments

Comments
 (0)