Skip to content

Commit d98fedb

Browse files
authored
feat: separate data plane from control plane (frain-dev#1917)
* feat: basic separation is ready * fix: removed unused chan in agent * fix: fixed failing tests * fix: fixed killing agent cmd * fix: fixed cmd package * fix: fixed failing tests in api package * fix: add routes back to control plane * fix: remove references to sourcepool from agent * fix: added back direction openapi reference * fix: bump openapi version
1 parent 216f6a9 commit d98fedb

21 files changed

+488
-129
lines changed

api/api.go

+170-9
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func NewApplicationHandler(a *types.APIOptions) (*ApplicationHandler, error) {
9999
return &ApplicationHandler{A: a, rm: rm}, nil
100100
}
101101

102-
func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
102+
func (a *ApplicationHandler) buildRouter() *chi.Mux {
103103
router := chi.NewMux()
104104

105105
router.Use(chiMiddleware.RequestID)
@@ -109,14 +109,22 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
109109
router.Use(middleware.LogHttpRequest(a.A))
110110
router.Use(chiMiddleware.Maybe(middleware.SetupCORS, shouldApplyCORS))
111111

112+
return router
113+
}
114+
115+
func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux {
116+
117+
router := a.buildRouter()
118+
119+
handler := &handlers.Handler{A: a.A, RM: a.rm}
120+
121+
// TODO(subomi): left this here temporarily till the data plane is stable.
112122
// Ingestion API.
113123
router.Route("/ingest", func(ingestRouter chi.Router) {
114124
ingestRouter.Get("/{maskID}", a.HandleCrcCheck)
115125
ingestRouter.Post("/{maskID}", a.IngestEvent)
116126
})
117127

118-
handler := &handlers.Handler{A: a.A, RM: a.rm}
119-
120128
// Public API.
121129
router.Route("/api", func(v1Router chi.Router) {
122130
v1Router.Route("/v1", func(r chi.Router) {
@@ -146,6 +154,7 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
146154
})
147155
})
148156

157+
// TODO(subomi): left this here temporarily till the data plane is stable.
149158
projectSubRouter.Route("/events", func(eventRouter chi.Router) {
150159
// TODO(all): should the InstrumentPath change?
151160
eventRouter.With(middleware.InstrumentPath("/events")).Post("/", handler.CreateEndpointEvent)
@@ -304,6 +313,7 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
304313
})
305314
})
306315

316+
// TODO(subomi): left this here temporarily till the data plane is stable.
307317
projectSubRouter.Route("/events", func(eventRouter chi.Router) {
308318
eventRouter.Post("/", handler.CreateEndpointEvent)
309319
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
@@ -401,6 +411,7 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
401411
endpointRouter.With(handler.CanManageEndpoint()).Put("/{endpointID}/expire_secret", handler.ExpireSecret)
402412
})
403413

414+
// TODO(subomi): left this here temporarily till the data plane is stable.
404415
portalLinkRouter.Route("/events", func(eventRouter chi.Router) {
405416
eventRouter.Post("/", handler.CreateEndpointEvent)
406417
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
@@ -413,6 +424,23 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
413424
})
414425
})
415426

427+
portalLinkRouter.Route("/eventdeliveries", func(eventDeliveryRouter chi.Router) {
428+
eventDeliveryRouter.With(middleware.Pagination).Get("/", handler.GetEventDeliveriesPaged)
429+
eventDeliveryRouter.Post("/forceresend", handler.ForceResendEventDeliveries)
430+
eventDeliveryRouter.Post("/batchretry", handler.BatchRetryEventDelivery)
431+
eventDeliveryRouter.Get("/countbatchretryevents", handler.CountAffectedEventDeliveries)
432+
433+
eventDeliveryRouter.Route("/{eventDeliveryID}", func(eventDeliverySubRouter chi.Router) {
434+
eventDeliverySubRouter.Get("/", handler.GetEventDelivery)
435+
eventDeliverySubRouter.Put("/resend", handler.ResendEventDelivery)
436+
437+
eventDeliverySubRouter.Route("/deliveryattempts", func(deliveryRouter chi.Router) {
438+
deliveryRouter.Get("/", handler.GetDeliveryAttempts)
439+
deliveryRouter.Get("/{deliveryAttemptID}", handler.GetDeliveryAttempt)
440+
})
441+
})
442+
})
443+
416444
portalLinkRouter.Route("/subscriptions", func(subscriptionRouter chi.Router) {
417445
subscriptionRouter.Post("/", handler.CreateSubscription)
418446
subscriptionRouter.Post("/test_filter", handler.TestSubscriptionFilter)
@@ -422,6 +450,145 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
422450
subscriptionRouter.Put("/{subscriptionID}", handler.UpdateSubscription)
423451
})
424452

453+
})
454+
455+
router.Handle("/queue/monitoring/*", a.A.Queue.(*redisqueue.RedisQueue).Monitor())
456+
router.Handle("/metrics", promhttp.HandlerFor(metrics.Reg(), promhttp.HandlerOpts{}))
457+
router.HandleFunc("/*", reactRootHandler)
458+
459+
metrics.RegisterQueueMetrics(a.A.Queue)
460+
prometheus.MustRegister(metrics.RequestDuration())
461+
a.Router = router
462+
463+
return router
464+
}
465+
466+
func (a *ApplicationHandler) BuildDataPlaneRoutes() *chi.Mux {
467+
router := a.buildRouter()
468+
469+
// Ingestion API.
470+
router.Route("/ingest", func(ingestRouter chi.Router) {
471+
ingestRouter.Get("/{maskID}", a.HandleCrcCheck)
472+
ingestRouter.Post("/{maskID}", a.IngestEvent)
473+
})
474+
475+
handler := &handlers.Handler{A: a.A, RM: a.rm}
476+
477+
// Public API.
478+
router.Route("/api", func(v1Router chi.Router) {
479+
v1Router.Route("/v1", func(r chi.Router) {
480+
r.Use(chiMiddleware.AllowContentType("application/json"))
481+
r.Use(middleware.JsonResponse)
482+
r.Use(middleware.RequireAuth())
483+
484+
r.Route("/projects", func(projectRouter chi.Router) {
485+
projectRouter.Route("/{projectID}", func(projectSubRouter chi.Router) {
486+
projectSubRouter.Route("/events", func(eventRouter chi.Router) {
487+
// TODO(all): should the InstrumentPath change?
488+
eventRouter.With(middleware.InstrumentPath("/events")).Post("/", handler.CreateEndpointEvent)
489+
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
490+
eventRouter.Post("/broadcast", handler.CreateBroadcastEvent)
491+
eventRouter.Post("/dynamic", handler.CreateDynamicEvent)
492+
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
493+
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)
494+
495+
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
496+
eventSubRouter.Get("/", handler.GetEndpointEvent)
497+
eventSubRouter.Put("/replay", handler.ReplayEndpointEvent)
498+
})
499+
})
500+
501+
projectSubRouter.Route("/eventdeliveries", func(eventDeliveryRouter chi.Router) {
502+
eventDeliveryRouter.With(middleware.Pagination).Get("/", handler.GetEventDeliveriesPaged)
503+
eventDeliveryRouter.Post("/forceresend", handler.ForceResendEventDeliveries)
504+
eventDeliveryRouter.Post("/batchretry", handler.BatchRetryEventDelivery)
505+
506+
eventDeliveryRouter.Route("/{eventDeliveryID}", func(eventDeliverySubRouter chi.Router) {
507+
eventDeliverySubRouter.Get("/", handler.GetEventDelivery)
508+
eventDeliverySubRouter.Put("/resend", handler.ResendEventDelivery)
509+
510+
eventDeliverySubRouter.Route("/deliveryattempts", func(deliveryRouter chi.Router) {
511+
deliveryRouter.Get("/", handler.GetDeliveryAttempts)
512+
deliveryRouter.Get("/{deliveryAttemptID}", handler.GetDeliveryAttempt)
513+
})
514+
})
515+
})
516+
})
517+
})
518+
})
519+
})
520+
521+
// Dashboard API.
522+
router.Route("/ui", func(uiRouter chi.Router) {
523+
uiRouter.Use(middleware.JsonResponse)
524+
uiRouter.Use(chiMiddleware.Maybe(middleware.RequireAuth(), shouldAuthRoute))
525+
526+
// TODO(subomi): added these back for the tests to pass.
527+
// What should we do in the future?
528+
uiRouter.Route("/auth", func(authRouter chi.Router) {
529+
authRouter.Post("/login", handler.LoginUser)
530+
authRouter.Post("/register", handler.RegisterUser)
531+
authRouter.Post("/token/refresh", handler.RefreshToken)
532+
authRouter.Post("/logout", handler.LogoutUser)
533+
})
534+
535+
uiRouter.Route("/organisations", func(orgRouter chi.Router) {
536+
orgRouter.Route("/{orgID}", func(orgSubRouter chi.Router) {
537+
orgSubRouter.Route("/projects", func(projectRouter chi.Router) {
538+
projectRouter.Route("/{projectID}", func(projectSubRouter chi.Router) {
539+
projectSubRouter.Route("/events", func(eventRouter chi.Router) {
540+
eventRouter.Post("/", handler.CreateEndpointEvent)
541+
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
542+
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
543+
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)
544+
eventRouter.Get("/countbatchreplayevents", handler.CountAffectedEvents)
545+
546+
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
547+
eventSubRouter.Get("/", handler.GetEndpointEvent)
548+
eventSubRouter.Put("/replay", handler.ReplayEndpointEvent)
549+
})
550+
})
551+
552+
projectSubRouter.Route("/eventdeliveries", func(eventDeliveryRouter chi.Router) {
553+
eventDeliveryRouter.With(middleware.Pagination).Get("/", handler.GetEventDeliveriesPaged)
554+
eventDeliveryRouter.Post("/forceresend", handler.ForceResendEventDeliveries)
555+
eventDeliveryRouter.Post("/batchretry", handler.BatchRetryEventDelivery)
556+
eventDeliveryRouter.Get("/countbatchretryevents", handler.CountAffectedEventDeliveries)
557+
558+
eventDeliveryRouter.Route("/{eventDeliveryID}", func(eventDeliverySubRouter chi.Router) {
559+
eventDeliverySubRouter.Get("/", handler.GetEventDelivery)
560+
eventDeliverySubRouter.Put("/resend", handler.ResendEventDelivery)
561+
562+
eventDeliverySubRouter.Route("/deliveryattempts", func(deliveryRouter chi.Router) {
563+
deliveryRouter.Get("/", handler.GetDeliveryAttempts)
564+
deliveryRouter.Get("/{deliveryAttemptID}", handler.GetDeliveryAttempt)
565+
})
566+
})
567+
})
568+
})
569+
})
570+
})
571+
})
572+
})
573+
574+
// Portal Link API.
575+
router.Route("/portal-api", func(portalLinkRouter chi.Router) {
576+
portalLinkRouter.Use(middleware.JsonResponse)
577+
portalLinkRouter.Use(middleware.SetupCORS)
578+
portalLinkRouter.Use(middleware.RequireAuth())
579+
580+
portalLinkRouter.Route("/events", func(eventRouter chi.Router) {
581+
eventRouter.Post("/", handler.CreateEndpointEvent)
582+
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
583+
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)
584+
eventRouter.Get("/countbatchreplayevents", handler.CountAffectedEvents)
585+
586+
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
587+
eventSubRouter.Get("/", handler.GetEndpointEvent)
588+
eventSubRouter.Put("/replay", handler.ReplayEndpointEvent)
589+
})
590+
})
591+
425592
portalLinkRouter.Route("/eventdeliveries", func(eventDeliveryRouter chi.Router) {
426593
eventDeliveryRouter.With(middleware.Pagination).Get("/", handler.GetEventDeliveriesPaged)
427594
eventDeliveryRouter.Post("/forceresend", handler.ForceResendEventDeliveries)
@@ -440,12 +607,6 @@ func (a *ApplicationHandler) BuildRoutes() *chi.Mux {
440607
})
441608
})
442609

443-
router.Handle("/queue/monitoring/*", a.A.Queue.(*redisqueue.RedisQueue).Monitor())
444-
router.Handle("/metrics", promhttp.HandlerFor(metrics.Reg(), promhttp.HandlerOpts{}))
445-
router.HandleFunc("/*", reactRootHandler)
446-
447-
metrics.RegisterQueueMetrics(a.A.Queue)
448-
prometheus.MustRegister(metrics.RequestDuration())
449610
a.Router = router
450611

451612
return router

api/dashboard_integration_test.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type AuthIntegrationTestSuite struct {
4646
func (u *AuthIntegrationTestSuite) SetupSuite() {
4747
u.DB = getDB()
4848
u.ConvoyApp = buildServer()
49-
u.Router = u.ConvoyApp.BuildRoutes()
49+
u.Router = u.ConvoyApp.BuildControlPlaneRoutes()
5050
}
5151

5252
func (u *AuthIntegrationTestSuite) SetupTest() {
@@ -323,7 +323,7 @@ type DashboardIntegrationTestSuite struct {
323323
func (s *DashboardIntegrationTestSuite) SetupSuite() {
324324
s.DB = getDB()
325325
s.ConvoyApp = buildServer()
326-
s.Router = s.ConvoyApp.BuildRoutes()
326+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
327327
}
328328

329329
func (s *DashboardIntegrationTestSuite) SetupTest() {
@@ -604,7 +604,7 @@ type EndpointIntegrationTestSuite struct {
604604
func (s *EndpointIntegrationTestSuite) SetupSuite() {
605605
s.DB = getDB()
606606
s.ConvoyApp = buildServer()
607-
s.Router = s.ConvoyApp.BuildRoutes()
607+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
608608
}
609609

610610
func (s *EndpointIntegrationTestSuite) SetupTest() {
@@ -1048,7 +1048,7 @@ type EventIntegrationTestSuite struct {
10481048
func (s *EventIntegrationTestSuite) SetupSuite() {
10491049
s.DB = getDB()
10501050
s.ConvoyApp = buildServer()
1051-
s.Router = s.ConvoyApp.BuildRoutes()
1051+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
10521052
}
10531053

10541054
func (s *EventIntegrationTestSuite) SetupTest() {
@@ -1600,7 +1600,7 @@ type OrganisationIntegrationTestSuite struct {
16001600
func (s *OrganisationIntegrationTestSuite) SetupSuite() {
16011601
s.DB = getDB()
16021602
s.ConvoyApp = buildServer()
1603-
s.Router = s.ConvoyApp.BuildRoutes()
1603+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
16041604
}
16051605

16061606
func (s *OrganisationIntegrationTestSuite) SetupTest() {
@@ -1905,7 +1905,7 @@ type OrganisationInviteIntegrationTestSuite struct {
19051905
func (s *OrganisationInviteIntegrationTestSuite) SetupSuite() {
19061906
s.DB = getDB()
19071907
s.ConvoyApp = buildServer()
1908-
s.Router = s.ConvoyApp.BuildRoutes()
1908+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
19091909
}
19101910

19111911
func (s *OrganisationInviteIntegrationTestSuite) SetupTest() {
@@ -2333,7 +2333,7 @@ type OrganisationMemberIntegrationTestSuite struct {
23332333
func (s *OrganisationMemberIntegrationTestSuite) SetupSuite() {
23342334
s.DB = getDB()
23352335
s.ConvoyApp = buildServer()
2336-
s.Router = s.ConvoyApp.BuildRoutes()
2336+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
23372337
}
23382338

23392339
func (s *OrganisationMemberIntegrationTestSuite) SetupTest() {
@@ -2574,7 +2574,7 @@ type PortalLinkIntegrationTestSuite struct {
25742574
func (s *PortalLinkIntegrationTestSuite) SetupSuite() {
25752575
s.DB = getDB()
25762576
s.ConvoyApp = buildServer()
2577-
s.Router = s.ConvoyApp.BuildRoutes()
2577+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
25782578
}
25792579

25802580
func (s *PortalLinkIntegrationTestSuite) SetupTest() {
@@ -2851,7 +2851,7 @@ type ProjectIntegrationTestSuite struct {
28512851
func (s *ProjectIntegrationTestSuite) SetupSuite() {
28522852
s.DB = getDB()
28532853
s.ConvoyApp = buildServer()
2854-
s.Router = s.ConvoyApp.BuildRoutes()
2854+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
28552855
}
28562856

28572857
func (s *ProjectIntegrationTestSuite) SetupTest() {
@@ -3162,7 +3162,7 @@ type SourceIntegrationTestSuite struct {
31623162
func (s *SourceIntegrationTestSuite) SetupSuite() {
31633163
s.DB = getDB()
31643164
s.ConvoyApp = buildServer()
3165-
s.Router = s.ConvoyApp.BuildRoutes()
3165+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
31663166
}
31673167

31683168
func (s *SourceIntegrationTestSuite) SetupTest() {
@@ -3496,7 +3496,7 @@ type SubscriptionIntegrationTestSuite struct {
34963496
func (s *SubscriptionIntegrationTestSuite) SetupSuite() {
34973497
s.DB = getDB()
34983498
s.ConvoyApp = buildServer()
3499-
s.Router = s.ConvoyApp.BuildRoutes()
3499+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
35003500
}
35013501

35023502
func (s *SubscriptionIntegrationTestSuite) SetupTest() {
@@ -3986,7 +3986,7 @@ type UserIntegrationTestSuite struct {
39863986
func (u *UserIntegrationTestSuite) SetupSuite() {
39873987
u.DB = getDB()
39883988
u.ConvoyApp = buildServer()
3989-
u.Router = u.ConvoyApp.BuildRoutes()
3989+
u.Router = u.ConvoyApp.BuildControlPlaneRoutes()
39903990
}
39913991

39923992
func (u *UserIntegrationTestSuite) SetupTest() {
@@ -4475,7 +4475,7 @@ type MetaEventIntegrationTestSuite struct {
44754475
func (s *MetaEventIntegrationTestSuite) SetupSuite() {
44764476
s.DB = getDB()
44774477
s.ConvoyApp = buildServer()
4478-
s.Router = s.ConvoyApp.BuildRoutes()
4478+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
44794479
}
44804480

44814481
func (s *MetaEventIntegrationTestSuite) SetupTest() {

api/handlers/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package handlers
33
// This is the main doc file, swag cli needs it to be named main.go
44

55
// @title Convoy API Reference
6-
// @version 0.9.0
6+
// @version 24.1.4
77
// @description Convoy is a fast and secure webhooks proxy. This document contains datastore.s API specification.
88
// @termsOfService https://getconvoy.io/terms
99

api/ingest_integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type IngestIntegrationTestSuite struct {
3333
func (i *IngestIntegrationTestSuite) SetupSuite() {
3434
i.DB = getDB()
3535
i.ConvoyApp = buildServer()
36-
i.Router = i.ConvoyApp.BuildRoutes()
36+
i.Router = i.ConvoyApp.BuildControlPlaneRoutes()
3737
}
3838

3939
func (i *IngestIntegrationTestSuite) SetupTest() {

api/portal_api_integration_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type PortalEndpointIntegrationTestSuite struct {
3838
func (s *PortalEndpointIntegrationTestSuite) SetupSuite() {
3939
s.DB = getDB()
4040
s.ConvoyApp = buildServer()
41-
s.Router = s.ConvoyApp.BuildRoutes()
41+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
4242
}
4343

4444
func (s *PortalEndpointIntegrationTestSuite) SetupTest() {
@@ -186,7 +186,7 @@ type PortalEventIntegrationTestSuite struct {
186186
func (s *PortalEventIntegrationTestSuite) SetupSuite() {
187187
s.DB = getDB()
188188
s.ConvoyApp = buildServer()
189-
s.Router = s.ConvoyApp.BuildRoutes()
189+
s.Router = s.ConvoyApp.BuildControlPlaneRoutes()
190190
}
191191

192192
func (s *PortalEventIntegrationTestSuite) SetupTest() {

0 commit comments

Comments
 (0)