Skip to content

Commit 2dec36a

Browse files
authored
Add broadcast event api (frain-dev#1913)
* feat: add CreateBroadcastEventProcessor * feat: - add writeEventDeliveriesToQueue - add ProcessBroadcastEventCreation * feat: - remove create_event.go & create_event_test.go - add CreateBroadcastEventService * feat: - register task.ProcessBroadcastEventCreation * feat: - add FindSubscriptionByEventType - add Test_FindSubscriptionByEventType * feat: - add CreateBraodcastEvent api handler * fix: remove q declaration * fix: change args build in FindSubscriptionByEventType * fix: change dynamic to broadcast in err msg * fix: call closeFn in defer * fix: go mod * fix: remove toolchain * fix: fetch all subscriptions * fix: remove FindSubscriptionByEventType * fix: fix CreateBroadcastEvent typo * fix: go generate * fix: fix TestProcessBroadcastEventCreation * fix: return err * fix: remove json unmarshalling * fix: fix msgpack encoding * fix: revert go version to 1.20 * fix: revert go version to 1.20
1 parent 0aa2a0b commit 2dec36a

24 files changed

+976
-786
lines changed

api/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
150150
// TODO(all): should the InstrumentPath change?
151151
eventRouter.With(middleware.InstrumentPath("/events")).Post("/", handler.CreateEndpointEvent)
152152
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
153+
eventRouter.Post("/broadcast", handler.CreateBroadcastEvent)
153154
eventRouter.Post("/dynamic", handler.CreateDynamicEvent)
154155
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
155156
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)

api/handlers/event.go

+48
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,54 @@ func (h *Handler) CreateEndpointEvent(w http.ResponseWriter, r *http.Request) {
101101
_ = render.Render(w, r, util.NewServerResponse("Event queued successfully", 200, http.StatusCreated))
102102
}
103103

104+
// CreateBroadcastEvent
105+
//
106+
// @Summary Create an event
107+
// @Description This endpoint creates an endpoint event
108+
// @Tags Events
109+
// @Accept json
110+
// @Produce json
111+
// @Param projectID path string true "Project ID"
112+
// @Param event body models.BroadcastEvent true "Broadcast Event Details"
113+
// @Success 200 {object} util.ServerResponse{data=models.EventResponse}
114+
// @Failure 400,401,404 {object} util.ServerResponse{data=Stub}
115+
// @Security ApiKeyAuth
116+
// @Router /v1/projects/{projectID}/events [post]
117+
func (h *Handler) CreateBroadcastEvent(w http.ResponseWriter, r *http.Request) {
118+
var newMessage models.BroadcastEvent
119+
err := util.ReadJSON(r, &newMessage)
120+
if err != nil {
121+
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
122+
return
123+
}
124+
125+
err = newMessage.Validate()
126+
if err != nil {
127+
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
128+
return
129+
}
130+
131+
project, err := h.retrieveProject(r)
132+
if err != nil {
133+
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
134+
return
135+
}
136+
137+
cbe := services.CreateBroadcastEventService{
138+
Queue: h.A.Queue,
139+
BroadcastEvent: &newMessage,
140+
Project: project,
141+
}
142+
143+
err = cbe.Run(r.Context())
144+
if err != nil {
145+
_ = render.Render(w, r, util.NewServiceErrResponse(err))
146+
return
147+
}
148+
149+
_ = render.Render(w, r, util.NewServerResponse("Broadcast event created successfully", nil, http.StatusCreated))
150+
}
151+
104152
// CreateEndpointFanoutEvent
105153
//
106154
// @Summary Fan out an event

api/models/event.go

+15
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ func (ds *DynamicEventStub) Validate() error {
9494
return util.Validate(ds)
9595
}
9696

97+
type BroadcastEvent struct {
98+
EventType string `json:"event_type" valid:"required~please provide an event type"`
99+
ProjectID string `json:"project_id" swaggerignore:"true"`
100+
101+
// Data is an arbitrary JSON value that gets sent as the body of the
102+
// webhook to the endpoints
103+
Data json.RawMessage `json:"data" valid:"required~please provide your data"`
104+
CustomHeaders map[string]string `json:"custom_headers"`
105+
IdempotencyKey string `json:"idempotency_key"`
106+
}
107+
108+
func (bs *BroadcastEvent) Validate() error {
109+
return util.Validate(bs)
110+
}
111+
97112
type FanoutEvent struct {
98113
OwnerID string `json:"owner_id" valid:"required~please provide an owner id"`
99114
EventType string `json:"event_type" valid:"required~please provide an event type"`

api/public_integration_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,36 @@ func (s *PublicEventIntegrationTestSuite) Test_CreateDynamicEvent() {
700700
require.Equal(s.T(), expectedStatusCode, w.Code)
701701
}
702702

703+
func (s *PublicEventIntegrationTestSuite) Test_CreateBroadcastEvent() {
704+
endpointID := ulid.Make().String()
705+
expectedStatusCode := http.StatusCreated
706+
707+
// Just Before.
708+
endpoint, _ := testdb.SeedEndpoint(s.ConvoyApp.A.DB, s.DefaultProject, endpointID, "", "", false, datastore.ActiveEndpointStatus)
709+
710+
_, _ = testdb.SeedSubscription(s.ConvoyApp.A.DB, s.DefaultProject, ulid.Make().String(), datastore.OutgoingProject, &datastore.Source{}, endpoint, &datastore.RetryConfiguration{}, &datastore.AlertConfiguration{}, &datastore.FilterConfiguration{
711+
EventTypes: []string{"some.event"},
712+
Filter: datastore.FilterSchema{Headers: datastore.M{}, Body: datastore.M{}},
713+
})
714+
715+
bodyStr := `{
716+
"event_type":"*",
717+
"data": {"name":"daniel"},
718+
"idempotency_key": "idem-key-1"
719+
}
720+
}`
721+
body := serialize(bodyStr, endpointID)
722+
723+
url := fmt.Sprintf("/api/v1/projects/%s/events/broadcast", s.DefaultProject.UID)
724+
req := createRequest(http.MethodPost, url, s.APIKey, body)
725+
w := httptest.NewRecorder()
726+
// Act.
727+
s.Router.ServeHTTP(w, req)
728+
729+
// Assert.
730+
require.Equal(s.T(), expectedStatusCode, w.Code)
731+
}
732+
703733
func (s *PublicEventIntegrationTestSuite) Test_CreateFanoutEvent_MultipleEndpoints() {
704734
endpointID := ulid.Make().String()
705735
expectedStatusCode := http.StatusCreated

cmd/worker/worker.go

+9
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ func AddWorkerCommand(a *cli.App) *cobra.Command {
117117
subRepo,
118118
deviceRepo))
119119

120+
consumer.RegisterHandlers(convoy.CreateBroadcastEventProcessor, task.ProcessBroadcastEventCreation(
121+
endpointRepo,
122+
eventRepo,
123+
projectRepo,
124+
eventDeliveryRepo,
125+
a.Queue,
126+
subRepo,
127+
deviceRepo))
128+
120129
consumer.RegisterHandlers(convoy.CreateDynamicEventProcessor, task.ProcessDynamicEventCreation(
121130
endpointRepo,
122131
eventRepo,

database/postgres/subscription.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
89
"github.com/dop251/goja"
910
"github.com/frain-dev/convoy"
1011
"github.com/frain-dev/convoy/cache"
@@ -454,7 +455,6 @@ func (s *subscriptionRepo) FindSubscriptionByID(ctx context.Context, projectID s
454455

455456
return subscription, nil
456457
})
457-
458458
if err != nil {
459459
return nil, err
460460
}
@@ -475,7 +475,6 @@ func (s *subscriptionRepo) FindSubscriptionsBySourceID(ctx context.Context, proj
475475

476476
return scanSubscriptions(rows)
477477
})
478-
479478
if err != nil {
480479
return nil, err
481480
}
@@ -496,7 +495,6 @@ func (s *subscriptionRepo) FindSubscriptionsByEndpointID(ctx context.Context, pr
496495

497496
return scanSubscriptions(rows)
498497
})
499-
500498
if err != nil {
501499
return nil, err
502500
}
@@ -520,7 +518,6 @@ func (s *subscriptionRepo) FindSubscriptionByDeviceID(ctx context.Context, proje
520518

521519
return subscription, nil
522520
})
523-
524521
if err != nil {
525522
return nil, err
526523
}
@@ -537,7 +534,6 @@ func (s *subscriptionRepo) FindCLISubscriptions(ctx context.Context, projectID s
537534

538535
return scanSubscriptions(rows)
539536
})
540-
541537
if err != nil {
542538
return nil, err
543539
}

docs/docs.go

+125-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Package docs Code generated by swaggo/swag at 2024-02-07 21:23:57.74884 +0000 GMT m=+2.717224376. DO NOT EDIT
1+
// Package docs Code generated by swaggo/swag at 2024-02-13 10:04:04.243863 +0000 GMT m=+5.458650126. DO NOT EDIT
22
package docs
33

44
import "github.com/swaggo/swag"
@@ -2072,12 +2072,12 @@ const docTemplate = `{
20722072
"required": true
20732073
},
20742074
{
2075-
"description": "Event Details",
2075+
"description": "Broadcast Event Details",
20762076
"name": "event",
20772077
"in": "body",
20782078
"required": true,
20792079
"schema": {
2080-
"$ref": "#/definitions/models.CreateEvent"
2080+
"$ref": "#/definitions/models.BroadcastEvent"
20812081
}
20822082
}
20832083
],
@@ -5277,6 +5277,46 @@ const docTemplate = `{
52775277
}
52785278
}
52795279
},
5280+
"datastore.AmqpCredentials": {
5281+
"type": "object",
5282+
"properties": {
5283+
"password": {
5284+
"type": "string"
5285+
},
5286+
"user": {
5287+
"type": "string"
5288+
}
5289+
}
5290+
},
5291+
"datastore.AmqpPubSubConfig": {
5292+
"type": "object",
5293+
"properties": {
5294+
"auth": {
5295+
"$ref": "#/definitions/datastore.AmqpCredentials"
5296+
},
5297+
"bindedExchange": {
5298+
"type": "string"
5299+
},
5300+
"deadLetterExchange": {
5301+
"type": "string"
5302+
},
5303+
"host": {
5304+
"type": "string"
5305+
},
5306+
"port": {
5307+
"type": "string"
5308+
},
5309+
"queue": {
5310+
"type": "string"
5311+
},
5312+
"routingKey": {
5313+
"type": "string"
5314+
},
5315+
"schema": {
5316+
"type": "string"
5317+
}
5318+
}
5319+
},
52805320
"datastore.ApiKey": {
52815321
"type": "object",
52825322
"properties": {
@@ -5812,6 +5852,9 @@ const docTemplate = `{
58125852
"datastore.PubSubConfig": {
58135853
"type": "object",
58145854
"properties": {
5855+
"amqp": {
5856+
"$ref": "#/definitions/datastore.AmqpPubSubConfig"
5857+
},
58155858
"google": {
58165859
"$ref": "#/definitions/datastore.GooglePubSubConfig"
58175860
},
@@ -5834,12 +5877,14 @@ const docTemplate = `{
58345877
"enum": [
58355878
"sqs",
58365879
"google",
5837-
"kafka"
5880+
"kafka",
5881+
"amqp"
58385882
],
58395883
"x-enum-varnames": [
58405884
"SqsPubSub",
58415885
"GooglePubSub",
5842-
"KafkaPubSub"
5886+
"KafkaPubSub",
5887+
"AmqpPubSub"
58435888
]
58445889
},
58455890
"datastore.RateLimitConfiguration": {
@@ -6084,6 +6129,54 @@ const docTemplate = `{
60846129
}
60856130
}
60866131
},
6132+
"models.AmqpAuth": {
6133+
"type": "object",
6134+
"properties": {
6135+
"password": {
6136+
"type": "string"
6137+
},
6138+
"user": {
6139+
"type": "string"
6140+
}
6141+
}
6142+
},
6143+
"models.AmqpExchange": {
6144+
"type": "object",
6145+
"properties": {
6146+
"exchange": {
6147+
"type": "string"
6148+
},
6149+
"routingKey": {
6150+
"type": "string"
6151+
}
6152+
}
6153+
},
6154+
"models.AmqpPubSubconfig": {
6155+
"type": "object",
6156+
"properties": {
6157+
"auth": {
6158+
"$ref": "#/definitions/models.AmqpAuth"
6159+
},
6160+
"bindExchange": {
6161+
"$ref": "#/definitions/models.AmqpExchange"
6162+
},
6163+
"deadLetterExchange": {
6164+
"type": "string"
6165+
},
6166+
"host": {
6167+
"type": "string"
6168+
},
6169+
"port": {
6170+
"type": "string"
6171+
},
6172+
"queue": {
6173+
"type": "string"
6174+
},
6175+
"schema": {
6176+
"type": "string"
6177+
}
6178+
}
6179+
},
60876180
"models.ApiKey": {
60886181
"type": "object",
60896182
"properties": {
@@ -6106,6 +6199,30 @@ const docTemplate = `{
61066199
}
61076200
}
61086201
},
6202+
"models.BroadcastEvent": {
6203+
"type": "object",
6204+
"properties": {
6205+
"custom_headers": {
6206+
"type": "object",
6207+
"additionalProperties": {
6208+
"type": "string"
6209+
}
6210+
},
6211+
"data": {
6212+
"description": "Data is an arbitrary JSON value that gets sent as the body of the\nwebhook to the endpoints",
6213+
"type": "array",
6214+
"items": {
6215+
"type": "integer"
6216+
}
6217+
},
6218+
"event_type": {
6219+
"type": "string"
6220+
},
6221+
"idempotency_key": {
6222+
"type": "string"
6223+
}
6224+
}
6225+
},
61096226
"models.CreateEndpoint": {
61106227
"type": "object",
61116228
"properties": {
@@ -6822,6 +6939,9 @@ const docTemplate = `{
68226939
"models.PubSubConfig": {
68236940
"type": "object",
68246941
"properties": {
6942+
"amqp": {
6943+
"$ref": "#/definitions/models.AmqpPubSubconfig"
6944+
},
68256945
"google": {
68266946
"$ref": "#/definitions/models.GooglePubSubConfig"
68276947
},

0 commit comments

Comments
 (0)