Skip to content

Commit 01d680e

Browse files
jirevwehorlahOluwadaminiola
authored
Subscriptions Filtering (frain-dev#1096)
* feat: add endpoint to test request filter * feat: get all subscriptions that match the filter * chore: update test * chore: add repo tests * chore: add repo tests * create component for editing for snippet (frain-dev#1115) * Subscriptions filtering UI support (frain-dev#1116) * feat: UI support for subscriptions filtering * update: update monaco component value * update: update modal component to have id * update: ui updates * update: additional fixes * update: additional fixes * chore: update filte when subscription is updated Co-authored-by: Raymond Tukpe <rtukpe@gmail.com> * chore: use app logger * Update monaco (frain-dev#1124) * feat: UI support for subscriptions filtering * update: update monaco component value * update: update modal component to have id * update: ui updates * update: additional fixes * update config * remove console log Co-authored-by: oluwadaminiola <pelumioni25@gmail.com> * update: save filter only when test passes (frain-dev#1133) * update: save filter only when test passes * update: ui updates * fix: fix merge conflicts * update: ui updates * update: ui updates * feat: add support for %or and $add operators. Add migrations * chore: move flatten from internal/pkg to pkg. Use $in in migrations * chore: remove unused commented out function Co-authored-by: Emmanuel Aina <emmanuel.ainaj@gmail.com> Co-authored-by: Pelumi Muyiwa-Oni <Pelumioni25@gmail.com>
1 parent e08f583 commit 01d680e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+33357
-123
lines changed

cmd/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func buildServerCliConfiguration(cmd *cobra.Command) (*config.Configuration, err
433433
c.SMTP.Username = smtpUsername
434434
}
435435

436-
// CONVOY_SMTP_PASSWORDvar configFile string
436+
// CONVOY_SMTP_PASSWORD
437437
smtpPassword, err := cmd.Flags().GetString("smtp-password")
438438
if err != nil {
439439
return nil, err

datastore/db.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const (
2929
SourceCollection = "sources"
3030
UserCollection = "users"
3131
SubscriptionCollection = "subscriptions"
32+
FilterCollection = "filters"
33+
DataMigrationsCollection = "data_migrations"
3234
EventDeliveryCollection = "eventdeliveries"
3335
APIKeyCollection = "apiKeys"
3436
DeviceCollection = "devices"
@@ -537,8 +539,10 @@ func (d *MongoStore) retrieveCollection(ctx context.Context) (string, error) {
537539
return UserCollection, nil
538540
case "devices":
539541
return DeviceCollection, nil
542+
case "filters":
543+
return FilterCollection, nil
540544
case "data_migrations", nil:
541-
return "data_migrations", nil
545+
return DataMigrationsCollection, nil
542546
default:
543547
return "", ErrInvalidCollection
544548
}

datastore/models.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ type AppMetadata struct {
446446
SupportEmail string `json:"support_email" bson:"support_email"`
447447
}
448448

449-
// EventType is used to identify an specific event.
449+
// EventType is used to identify a specific event.
450450
// This could be "user.new"
451451
// This will be used for data indexing
452452
// Makes it easy to filter by a list of events
@@ -495,7 +495,7 @@ func (h HttpHeader) SetHeadersInRequest(r *http.Request) {
495495
}
496496

497497
const (
498-
// ScheduledEventStatus : when a Event has been scheduled for delivery
498+
// ScheduledEventStatus : when an Event has been scheduled for delivery
499499
ScheduledEventStatus EventDeliveryStatus = "Scheduled"
500500
ProcessingEventStatus EventDeliveryStatus = "Processing"
501501
DiscardedEventStatus EventDeliveryStatus = "Discarded"
@@ -527,8 +527,10 @@ type Metadata struct {
527527
// Data to be sent to endpoint.
528528
Data json.RawMessage `json:"data" bson:"data"`
529529
Strategy StrategyProvider `json:"strategy" bson:"strategy"`
530-
530+
// NextSendTime denotes the next time an Event will be published in
531+
// case it failed the first time
531532
NextSendTime primitive.DateTime `json:"next_send_time" bson:"next_send_time"`
533+
532534
// NumTrials: number of times we have tried to deliver this Event to
533535
// an application
534536
NumTrials uint64 `json:"num_trials" bson:"num_trials"`
@@ -580,7 +582,7 @@ type DeliveryAttempt struct {
580582
DeletedAt primitive.DateTime `json:"deleted_at,omitempty" bson:"deleted_at,omitempty" swaggertype:"string"`
581583
}
582584

583-
// Event defines a payload to be sent to an application
585+
// EventDelivery defines a payload to be sent to an application
584586
type EventDelivery struct {
585587
ID primitive.ObjectID `json:"-" bson:"_id"`
586588
UID string `json:"uid" bson:"uid"`
@@ -710,7 +712,8 @@ type AlertConfiguration struct {
710712
}
711713

712714
type FilterConfiguration struct {
713-
EventTypes []string `json:"event_types" bson:"event_types,omitempty"`
715+
EventTypes []string `json:"event_types" bson:"event_types,omitempty"`
716+
Filter map[string]interface{} `json:"filter" bson:"filter"`
714717
}
715718

716719
type ProviderConfig struct {
@@ -894,3 +897,10 @@ type (
894897
AppMap map[string]*Application
895898
EndpointMap map[string]*Endpoint
896899
)
900+
901+
type SubscriptionFilter struct {
902+
ID primitive.ObjectID `json:"-" bson:"_id"`
903+
UID string `json:"uid" bson:"uid"`
904+
Filter map[string]interface{} `json:"filter" bson:"filter"`
905+
DocumentStatus DocumentStatus `json:"-" bson:"document_status"`
906+
}

datastore/mongo/mongo.go

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func (c *Client) Database() *mongo.Database {
7676

7777
func (c *Client) ensureMongoIndices() {
7878
c.ensureIndex(datastore.GroupCollection, "uid", true, nil)
79+
c.ensureIndex(datastore.FilterCollection, "uid", true, nil)
7980

8081
c.ensureIndex(datastore.OrganisationCollection, "uid", true, nil)
8182

datastore/mongo/subscription.go

+105
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"time"
88

99
"github.com/frain-dev/convoy/datastore"
10+
"github.com/frain-dev/convoy/pkg/flatten"
1011
"github.com/frain-dev/convoy/util"
12+
"github.com/google/uuid"
1113
"go.mongodb.org/mongo-driver/bson"
1214
"go.mongodb.org/mongo-driver/bson/primitive"
1315
"go.mongodb.org/mongo-driver/mongo"
@@ -54,6 +56,7 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, groupId strin
5456
"endpoint_id": subscription.EndpointID,
5557

5658
"filter_config.event_types": subscription.FilterConfig.EventTypes,
59+
"filter_config.filter": subscription.FilterConfig.Filter,
5760
"alert_config": subscription.AlertConfig,
5861
"retry_config": subscription.RetryConfig,
5962
"disable_endpoint": subscription.DisableEndpoint,
@@ -344,10 +347,112 @@ func (s *subscriptionRepo) UpdateSubscriptionStatus(ctx context.Context, groupId
344347
return err
345348
}
346349

350+
func (s *subscriptionRepo) TestSubscriptionFilter(ctx context.Context, payload map[string]interface{}, filter map[string]interface{}) (bool, error) {
351+
ctx = context.WithValue(ctx, datastore.CollectionCtx, datastore.FilterCollection)
352+
isValid := false
353+
354+
err := s.store.WithTransaction(ctx, func(sessCtx mongo.SessionContext) error {
355+
f := datastore.SubscriptionFilter{
356+
ID: primitive.NewObjectID(),
357+
UID: uuid.NewString(),
358+
Filter: payload,
359+
DocumentStatus: datastore.ActiveDocumentStatus,
360+
}
361+
362+
// insert the desired request payload
363+
err := s.store.Save(sessCtx, f, nil)
364+
if err != nil {
365+
return err
366+
}
367+
368+
// compare the filter with the test request payload
369+
var q map[string]interface{}
370+
if len(filter) == 0 {
371+
filter = nil
372+
}
373+
374+
if filter != nil {
375+
q, err = flattenFilter(filter)
376+
if err != nil {
377+
return err
378+
}
379+
}
380+
381+
var filters []datastore.SubscriptionFilter
382+
err = s.store.FindAll(sessCtx, q, nil, nil, &filters)
383+
if err != nil {
384+
return err
385+
}
386+
387+
isValid = len(filters) > 0
388+
389+
err = s.store.DeleteByID(sessCtx, f.UID, true)
390+
if err != nil {
391+
return err
392+
}
393+
394+
return nil
395+
})
396+
397+
return isValid, err
398+
}
399+
347400
func (s *subscriptionRepo) setCollectionInContext(ctx context.Context) context.Context {
348401
return context.WithValue(ctx, datastore.CollectionCtx, datastore.SubscriptionCollection)
349402
}
350403

404+
func flattenFilter(f map[string]interface{}) (map[string]interface{}, error) {
405+
isAndOr := false
406+
var operator string
407+
408+
for k := range f {
409+
if k == "$or" {
410+
if len(f) > 1 {
411+
return nil, flatten.ErrTopLevelElementOr
412+
}
413+
operator = k
414+
isAndOr = true
415+
break
416+
}
417+
418+
if k == "$and" {
419+
if len(f) > 1 {
420+
return nil, flatten.ErrTopLevelElementAnd
421+
}
422+
isAndOr = true
423+
break
424+
}
425+
}
426+
427+
if isAndOr {
428+
if a, ok := f[operator].([]interface{}); ok {
429+
if !ok {
430+
return nil, flatten.ErrOrAndMustBeArray
431+
}
432+
433+
for i := range a {
434+
t, err := flatten.FlattenWithPrefix("filter", a[i].(map[string]interface{}))
435+
if err != nil {
436+
return nil, err
437+
}
438+
439+
a[i] = t
440+
}
441+
442+
f[operator] = a
443+
return f, nil
444+
}
445+
}
446+
447+
query := map[string]interface{}{"filter": f}
448+
q, err := flatten.Flatten(query)
449+
if err != nil {
450+
return nil, err
451+
}
452+
453+
return q, nil
454+
}
455+
351456
// getSkip returns calculated skip value for the query
352457
func getSkip(page, limit int) int {
353458
skip := (page - 1) * limit

0 commit comments

Comments
 (0)