From d9440017b442880a61addb84d56e96e80c0a2316 Mon Sep 17 00:00:00 2001 From: Michael McCreedy Date: Mon, 10 Feb 2025 12:10:45 +0100 Subject: [PATCH 1/3] Add metrics which can be used to track our Notification Queue --- backend/Makefile | 3 + backend/pkg/api/api_test.go | 2 + backend/pkg/commons/metrics/metrics.go | 16 ++ backend/pkg/consapi/client_node_test.go | 2 + backend/pkg/notification/sending.go | 189 ++++++++++++++++++++++ backend/pkg/notification/sending_test.go | 195 +++++++++++++++++++++++ 6 files changed, 407 insertions(+) create mode 100644 backend/pkg/notification/sending_test.go diff --git a/backend/Makefile b/backend/Makefile index a52da69002..6eb5464807 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -18,6 +18,9 @@ clean: test: go test ./... +test-integration: + go test -v -tags integration + lint: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.60.1 golangci-lint run --timeout 5m diff --git a/backend/pkg/api/api_test.go b/backend/pkg/api/api_test.go index 8e952a1ce5..9a40fef449 100644 --- a/backend/pkg/api/api_test.go +++ b/backend/pkg/api/api_test.go @@ -1,3 +1,5 @@ +//go:build integration + package api_test import ( diff --git a/backend/pkg/commons/metrics/metrics.go b/backend/pkg/commons/metrics/metrics.go index 2470124bba..a787e0aeb5 100644 --- a/backend/pkg/commons/metrics/metrics.go +++ b/backend/pkg/commons/metrics/metrics.go @@ -78,6 +78,22 @@ var ( Name: "counter", Help: "Generic counter of events with name in labels", }, []string{"name"}) + NotificationsQueue_Event_Size = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "notifications_queue_event_pending_size", + Help: "Number of pending notifications in the queue by event type", + }, []string{"event_type", "status"}) + NotificationsQueue_Channel_Size = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "notifications_queue_channel_pending_size", + Help: "Number of pending notifications in the queue by channel", + }, []string{"channel", "status"}) + NotificationsQueue_Pending_Time = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "notifications_queue_pending_duration_milliseconds", + Help: "Duration of still pending notifications in the queue", + }, []string{"channel", "event_type"}) + NotificationsQueue_Sent_Time = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "notifications_queue_sent_duration_milliseconds", + Help: "Amount of time notification took to be successfully sent", + }, []string{"channel", "event_type"}) ) func init() { diff --git a/backend/pkg/consapi/client_node_test.go b/backend/pkg/consapi/client_node_test.go index 40c22ca9d5..544a0766c9 100644 --- a/backend/pkg/consapi/client_node_test.go +++ b/backend/pkg/consapi/client_node_test.go @@ -1,3 +1,5 @@ +//go:build integration + package consapi_test import ( diff --git a/backend/pkg/notification/sending.go b/backend/pkg/notification/sending.go index 2c6b57e9eb..e6c6888d5b 100644 --- a/backend/pkg/notification/sending.go +++ b/backend/pkg/notification/sending.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "strings" @@ -62,6 +63,9 @@ func notificationSender() { log.Error(err, "error dispatching notifications", 0) } + // Record metrics related to Notification Queue like size of queue and duration of pending notifications + collectNotificationQueueMetrics() + err = garbageCollectNotificationQueue() if err != nil { log.Error(err, "error garbage collecting notification queue", 0) @@ -528,3 +532,188 @@ func SendTestWebhookNotification(ctx context.Context, userId types.UserId, webho } return nil } + +type Notification struct { + Id *uint64 `db:"id"` + Created *time.Time `db:"created"` + Sent *time.Time `db:"sent"` + Channel string `db:"channel"` + Content string `db:"content"` +} + +type NotificationStatus string + +const ( + Sent NotificationStatus = "sent" + Pending NotificationStatus = "pending" + + // Metrics label for EventType where the event could not be mapped + UnknownEvent string = "unknown_event" +) + +/** + * Get all notifications which were marked as sent. + */ +func GetSentNotifications() ([]Notification, error) { + notificationRecords := []Notification{} + + err := db.ReaderDb.Select(¬ificationRecords, + `SELECT id, created, sent, channel, content + FROM notification_queue + WHERE sent IS NOT NULL`) + if err != nil { + return nil, fmt.Errorf("error querying sent notifications: %w", err) + } + + return notificationRecords, nil +} + +/** + * Get all notifications which have not yet been sent. + */ +func GetPendingNotifications() ([]Notification, error) { + notificationRecords := []Notification{} + + err := db.ReaderDb.Select(¬ificationRecords, + `SELECT id, created, sent, channel, content + FROM notification_queue + WHERE sent IS NULL`) + if err != nil { + return nil, fmt.Errorf("error querying pending notifications: %w", err) + } + + return notificationRecords, nil +} + +/** + * Collects metrics for the Notification system, specifically about its queue. + * Provides metrics related to how many notifications are pending and sent, and of what event type they are. + * Also provides metrics related to how long the notifications have been in the queue + */ +func collectNotificationQueueMetrics() { + sentNotifications, err := GetSentNotifications() + if err != nil { + log.Error(err, "Error retrieving sent notifications. Will skip sending metrics", 0) + return // Don't return an error, we don't want to disrupt actual notification sending simply because we couldn't record metrics + } + pendingNotifications, err := GetPendingNotifications() + if err != nil { + log.Error(err, "Error retrieving pending notifications. Will skip sending metrics", 0) + return // Don't return an error, we don't want to disrupt actual notification sending simply because we couldn't record metrics + } + + now := time.Now() // Checking the time once so that it is consistent across all metrics for this collection attempt + + // Record for each sent notification how long it took to send. Honestly, can probably remove this later since this metric can be sent once when the notification itself is delivered. + for _, notification := range sentNotifications { + eventType := GetEventLabelForNotification(notification) + + // Record the amount of time records that were sent (and that still exist in the queue) took to sent + metrics.NotificationsQueue_Sent_Time.WithLabelValues(notification.Channel, eventType).Observe(GetTimeDiffMilliseconds(*notification.Sent, *notification.Created)) + } + + // Record for each pending notification how long it has been in the queue + for _, notification := range pendingNotifications { + eventType := GetEventLabelForNotification(notification) + + // Record the amount of time these records have been waiting to been sent + metrics.NotificationsQueue_Pending_Time.WithLabelValues(notification.Channel, eventType).Observe(GetTimeDiffMilliseconds(*notification.Created, now)) + } + + // Count number of pending notifications in the queue by event type + eventTypeCount := CountByEventType(pendingNotifications) + for eventType, numNotifications := range eventTypeCount { + metrics.NotificationsQueue_Event_Size.WithLabelValues(eventType, string(Pending)).Set(float64(numNotifications)) + } + + // Count number of sent notifications in the queue by event type + eventTypeCount = CountByEventType(sentNotifications) + for eventType, numNotifications := range eventTypeCount { + metrics.NotificationsQueue_Event_Size.WithLabelValues(eventType, string(Sent)).Set(float64(numNotifications)) + } + + // Count number of pending notifications in the queue by channel + channelCount := CountByChannel(pendingNotifications) + for channelType, numNotifications := range channelCount { + metrics.NotificationsQueue_Channel_Size.WithLabelValues(channelType, string(Pending)).Set(float64(numNotifications)) + } + + // Count number of sent notifications in the queue by channel + channelCount = CountByChannel(sentNotifications) + for channelType, numNotifications := range channelCount { + metrics.NotificationsQueue_Channel_Size.WithLabelValues(channelType, string(Sent)).Set(float64(numNotifications)) + } +} + +/** + * Simple wrapper that enables submitting metrics for notifications with unknown event names. + */ +func GetEventLabelForNotification(notification Notification) string { + eventName, err := ExtractEventNameFromNotification(notification) + if err != nil { + return UnknownEvent + } + + return string(*eventName) +} + +/** + * Because we don't record the event type when recording notifications, we have to do some work to extract them from the + * notification message that is eventually sent to the user. + */ +func ExtractEventNameFromNotification(notification Notification) (*types.EventName, error) { + for eventName, eventDescription := range types.EventLabel { + if strings.Contains(notification.Content, eventDescription) { + return &eventName, nil + } + } + + return nil, fmt.Errorf("no EventName found for notification %d matching any event descriptions", notification.Id) +} + +/** + * Given a collection of notifications, count the number of notifications with each distinct event type. + */ +func CountByEventType(notifications []Notification) map[string]int { + eventTypeCountMap := make(map[string]int, len(types.EventLabel)+1) // +1 to account for the "unknown" event type + // Initialize the map with all EventLabel types, with the Count set to 0 + // Must be pre-initialized, because we still want to submit metrics for 0-count EventTypes, so they must exist in this map. + for eventType := range types.EventLabel { + eventTypeCountMap[string(eventType)] = 0 + } + eventTypeCountMap[UnknownEvent] = 0 // include unknown, which indicates an EventType which we couldn't parse from the Notification's content field + + // Now iterate over the list of events, and increment the value in the map + for _, notification := range notifications { + eventType := GetEventLabelForNotification(notification) + eventTypeCountMap[eventType] = eventTypeCountMap[eventType] + 1 + } + return eventTypeCountMap +} + +/** + * Given a collection of notifications, count the number of notifications with each distinct channel type. + */ +func CountByChannel(notifications []Notification) map[string]int { + channelCountMap := make(map[string]int, len(types.NotificationChannels)) + // Initialize the map with the Channel types, with the Count set to 0. + // Must be pre-initialized, because we still want to submit metrics for 0-count Channels, so they must exist in this map. + for _, channelType := range types.NotificationChannels { + channelCountMap[string(channelType)] = 0 + } + + // Now iterate over the list of notifications, and increment the value for each channel + for _, notification := range notifications { + channelCountMap[notification.Channel] = channelCountMap[notification.Channel] + 1 + } + return channelCountMap +} + +/** + * Returns the amount of milliseconds between two timestamps. Always returns a positive + * duration, so you don't have to worry about date ordering + */ +func GetTimeDiffMilliseconds(time1 time.Time, time2 time.Time) float64 { + duration := time1.Sub(time2) + return math.Abs(float64(duration.Milliseconds())) +} diff --git a/backend/pkg/notification/sending_test.go b/backend/pkg/notification/sending_test.go new file mode 100644 index 0000000000..bcff702b00 --- /dev/null +++ b/backend/pkg/notification/sending_test.go @@ -0,0 +1,195 @@ +package notification + +import ( + "testing" + "time" + + "github.com/gobitfly/beaconchain/pkg/commons/types" + "github.com/google/go-cmp/cmp" +) + +// getTimeDiffMilliseconds - Positive diff +// getTimeDiffMilliseconds - Negative diff +// countByChannel - All Notifications, no notifications, unknown notification type +// +// countByEventType +// extractEventTypeFromNotification - All event types, grab from DB + +func TestGetTimeDiffMilliseconds(t *testing.T) { + tests := map[string]struct { + time1 time.Time + time2 time.Time + want float64 + }{ + "no time difference": {time1: time.UnixMilli(9000000000), time2: time.UnixMilli(9000000000), want: 0.0}, + "positive time difference": {time1: time.UnixMilli(9000001000), time2: time.UnixMilli(9000000000), want: 1000.0}, + "negative time difference": {time1: time.UnixMilli(9000000000), time2: time.UnixMilli(9000001000), want: 1000.0}, + "sub-second time difference": {time1: time.UnixMilli(9000001000), time2: time.UnixMilli(9000000500), want: 500.0}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := GetTimeDiffMilliseconds(tc.time1, tc.time2) + diff := cmp.Diff(tc.want, got) + if diff != "" { + t.Fatal(diff) + } + }) + } +} + +func TestExtractEventNameStringFromNotificationForMetrics(t *testing.T) { + tests := map[string]struct { + notification Notification + want *types.EventName + wantsError bool + }{ + "Basic Notification": { + notification: Notification{Content: "notification\": {\"body\": \"Attestation missed: 1 validator (999999)\", \"title\": \"Info for epoch 344581\"}}"}, + want: ptr(types.ValidatorMissedAttestationEventName), + wantsError: false, + }, + "Unknown EventName": { + notification: Notification{Content: "Im Mr.Meeseeks look at me"}, + want: nil, + wantsError: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got, err := ExtractEventNameFromNotification(tc.notification) + if tc.wantsError { + if err == nil { + t.Fatalf("expected an error but got nil") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + diff := cmp.Diff(tc.want, got) + if diff != "" { + t.Fatal(diff) + } + } + }) + } +} + +// Cant create a pointer to a constant, so this function helps us refer to a "nil" EventName in the case where an error is returned +func ptr(e types.EventName) *types.EventName { + return &e +} + +func TestGetEventLabelForNotification(t *testing.T) { + tests := map[string]struct { + notification Notification + want string + }{ + "Basic Notification": { + notification: Notification{Content: "notification\": {\"body\": \"Attestation missed: 1 validator (999999)\", \"title\": \"Info for epoch 344581\"}}"}, + want: string(types.ValidatorMissedAttestationEventName), + }, + "Unknown EventName": { + notification: Notification{Content: "Im Mr.Meeseeks look at me"}, + want: UnknownEvent, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := GetEventLabelForNotification(tc.notification) + diff := cmp.Diff(tc.want, got) + if diff != "" { + t.Fatal(diff) + } + }) + } +} + +func TestCountByEventType(t *testing.T) { + tests := map[string]struct { + notifications []Notification + expectedCount map[string]int + }{ + "Single event type": { + notifications: []Notification{ + {Content: types.EventLabel[types.ValidatorMissedAttestationEventName]}, + }, + expectedCount: map[string]int{ + string(types.ValidatorMissedAttestationEventName): 1, + UnknownEvent: 0, + }, + }, + "Multiple event types": { + notifications: []Notification{ + {Content: types.EventLabel[types.ValidatorMissedAttestationEventName]}, + {Content: types.EventLabel[types.ValidatorMissedAttestationEventName]}, + {Content: types.EventLabel[types.ValidatorIsOfflineEventName]}, + }, + expectedCount: map[string]int{ + string(types.ValidatorMissedAttestationEventName): 2, + string(types.ValidatorIsOfflineEventName): 1, + UnknownEvent: 0, + }, + }, + "Unknown event type": { + notifications: []Notification{ + {Content: "Im Mr.Meeseeks look at me"}, + }, + expectedCount: map[string]int{ + string(types.ValidatorMissedAttestationEventName): 0, + UnknownEvent: 1, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := CountByEventType(tc.notifications) + for eventType, expectedCount := range tc.expectedCount { + if got[eventType] != expectedCount { + t.Fatalf("expected %d for event type %s, got %d", expectedCount, eventType, got[eventType]) + } + } + }) + } +} + +func TestCountByChannelType(t *testing.T) { + tests := map[string]struct { + notifications []Notification + expectedCount map[string]int + }{ + "Single channel type": { + notifications: []Notification{ + {Channel: string(types.EmailNotificationChannel)}, + }, + expectedCount: map[string]int{ + string(types.EmailNotificationChannel): 1, + }, + }, + "Multiple channel types": { + notifications: []Notification{ + {Channel: string(types.EmailNotificationChannel)}, + {Channel: string(types.EmailNotificationChannel)}, + {Channel: string(types.PushNotificationChannel)}, + }, + expectedCount: map[string]int{ + string(types.EmailNotificationChannel): 2, + string(types.PushNotificationChannel): 1, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := CountByChannel(tc.notifications) + for channelType, expectedCount := range tc.expectedCount { + if got[channelType] != expectedCount { + t.Fatalf("expected %d for channel type %s, got %d", expectedCount, channelType, got[channelType]) + } + } + }) + } +} From 9c5ce87e70ea92a8d32f814485e5ea874b2f64a5 Mon Sep 17 00:00:00 2001 From: Michael McCreedy Date: Mon, 10 Feb 2025 13:11:04 +0100 Subject: [PATCH 2/3] Re-add integration tests to github deployment workflow --- .github/workflows/backend-integration-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backend-integration-test.yml b/.github/workflows/backend-integration-test.yml index c95d43c709..5bff8cf0aa 100644 --- a/.github/workflows/backend-integration-test.yml +++ b/.github/workflows/backend-integration-test.yml @@ -35,7 +35,7 @@ jobs: working-directory: backend run: go install github.com/swaggo/swag/cmd/swag@latest && swag init --ot json -o ./pkg/api/docs -d ./pkg/api/ -g ./handlers/public.go - go test -failfast ./pkg/api/... -config "${{ secrets.CI_CONFIG_PATH }}" + go test -failfast -tags integration -config "${{ secrets.CI_CONFIG_PATH }}" From 9d090653d335ec84054ffdd8c3719b9d868f4644 Mon Sep 17 00:00:00 2001 From: Sasha Zezulinsky <188990923+sasha-bitfly@users.noreply.github.com> Date: Fri, 7 Feb 2025 20:01:31 +0000 Subject: [PATCH 3/3] use ubuntu-latest instead of self-hosted runner --- .github/workflows/backend-converted-types-check.yml | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/.github/workflows/backend-converted-types-check.yml b/.github/workflows/backend-converted-types-check.yml index b502e296e6..7ad789dc2b 100644 --- a/.github/workflows/backend-converted-types-check.yml +++ b/.github/workflows/backend-converted-types-check.yml @@ -13,6 +13,12 @@ on: - 'frontend/types/api/**' branches: - '*' + workflow_dispatch: + inputs: + branch: + description: 'Branch to run the workflow on' + required: true + default: 'staging' concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -26,13 +32,14 @@ permissions: jobs: build: name: converted-types-check - runs-on: self-hosted + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-go@v4 + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: go-version-file: 'backend/go.mod' cache-dependency-path: 'backend/go.sum' + cache: true - name: Check if all backend-types have been converted to frontend-types working-directory: backend run: |