Skip to content

Commit 908b57f

Browse files
jirevweDotunjsubomi
authored
Newrelic integration (frain-dev#1621)
* feat: add sqlx newrelic integration * fix: optimise migration queries (frain-dev#1601) * update changelog (frain-dev#1602) * feat: update changelog * feat: update changelog * Bump version to v23.05.5 * feat: add apm to worker logging middleware * feat: add apm to stream server * feat: add apm to ingest server * feat: add cli flag overrides to other top level commands * feat: update stream client tests * feat: update pubsub tests * chore: use rdb in rate limiter --------- Co-authored-by: Dotun Jolaoso <dotunjolaosho@gmail.com> Co-authored-by: Subomi Oluwalana <subomioluwalana71@gmail.com>
1 parent 1bc8f3f commit 908b57f

File tree

22 files changed

+411
-184
lines changed

22 files changed

+411
-184
lines changed

cmd/ingest/ingest.go

+72-1
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,38 @@
11
package ingest
22

33
import (
4+
"context"
45
"github.com/frain-dev/convoy/config"
56
"github.com/frain-dev/convoy/database/postgres"
67
"github.com/frain-dev/convoy/internal/pkg/cli"
78
"github.com/frain-dev/convoy/internal/pkg/pubsub"
89
"github.com/frain-dev/convoy/pkg/log"
10+
"github.com/frain-dev/convoy/util"
911
"github.com/spf13/cobra"
1012
)
1113

1214
func AddIngestCommand(a *cli.App) *cobra.Command {
1315
var interval int
16+
17+
var newRelicApp string
18+
var newRelicKey string
19+
var newRelicTracerEnabled bool
20+
var newRelicConfigEnabled bool
21+
1422
cmd := &cobra.Command{
1523
Use: "ingest",
1624
Short: "Ingest webhook events from Pub/Sub streams",
1725
RunE: func(cmd *cobra.Command, args []string) error {
26+
// override config with cli flags
27+
cliConfig, err := buildCliFlagConfiguration(cmd)
28+
if err != nil {
29+
return err
30+
}
31+
32+
if err = config.Override(cliConfig); err != nil {
33+
return err
34+
}
35+
1836
cfg, err := config.Get()
1937
if err != nil {
2038
a.Logger.Errorf("Failed to retrieve config: %v", err)
@@ -38,12 +56,65 @@ func AddIngestCommand(a *cli.App) *cobra.Command {
3856
sourcePool := pubsub.NewSourcePool(lo)
3957
sourceLoader := pubsub.NewSourceLoader(endpointRepo, sourceRepo, projectRepo, a.Queue, sourcePool, lo)
4058

41-
sourceLoader.Run(interval)
59+
sourceLoader.Run(context.Background(), interval)
4260

4361
return nil
4462
},
4563
}
4664

4765
cmd.Flags().IntVar(&interval, "interval", 300, "the time interval, measured in seconds, at which the database should be polled for new pub sub sources")
66+
cmd.Flags().BoolVar(&newRelicConfigEnabled, "new-relic-config-enabled", false, "Enable new-relic config")
67+
cmd.Flags().BoolVar(&newRelicTracerEnabled, "new-relic-tracer-enabled", false, "Enable new-relic distributed tracer")
68+
cmd.Flags().StringVar(&newRelicApp, "new-relic-app", "", "NewRelic application name")
69+
cmd.Flags().StringVar(&newRelicKey, "new-relic-key", "", "NewRelic application license key")
70+
4871
return cmd
4972
}
73+
74+
func buildCliFlagConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
75+
c := &config.Configuration{}
76+
77+
// CONVOY_NEWRELIC_APP_NAME
78+
newReplicApp, err := cmd.Flags().GetString("new-relic-app")
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
if !util.IsStringEmpty(newReplicApp) {
84+
c.Tracer.NewRelic.AppName = newReplicApp
85+
}
86+
87+
// CONVOY_NEWRELIC_LICENSE_KEY
88+
newReplicKey, err := cmd.Flags().GetString("new-relic-key")
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
if !util.IsStringEmpty(newReplicKey) {
94+
c.Tracer.NewRelic.LicenseKey = newReplicKey
95+
}
96+
97+
// CONVOY_NEWRELIC_CONFIG_ENABLED
98+
isNRCESet := cmd.Flags().Changed("new-relic-config-enabled")
99+
if isNRCESet {
100+
newReplicConfigEnabled, err := cmd.Flags().GetBool("new-relic-config-enabled")
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
c.Tracer.NewRelic.ConfigEnabled = newReplicConfigEnabled
106+
}
107+
108+
// CONVOY_NEWRELIC_DISTRIBUTED_TRACER_ENABLED
109+
isNRTESet := cmd.Flags().Changed("new-relic-tracer-enabled")
110+
if isNRTESet {
111+
newReplicTracerEnabled, err := cmd.Flags().GetBool("new-relic-tracer-enabled")
112+
if err != nil {
113+
return nil, err
114+
}
115+
116+
c.Tracer.NewRelic.DistributedTracerEnabled = newReplicTracerEnabled
117+
}
118+
119+
return c, nil
120+
}

cmd/server/server.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ func AddServerCommand(a *cli.App) *cobra.Command {
3737
var smtpPassword string
3838
var smtpReplyTo string
3939
var smtpFrom string
40-
var newReplicApp string
41-
var newReplicKey string
40+
var newRelicApp string
41+
var newRelicKey string
4242
var typesenseApiKey string
4343
var promaddr string
4444

@@ -49,10 +49,9 @@ func AddServerCommand(a *cli.App) *cobra.Command {
4949
var ssl bool
5050
var disableEndpoint bool
5151
var replayAttacks bool
52-
var multipleTenants bool
5352
var nativeRealmEnabled bool
54-
var newReplicTracerEnabled bool
55-
var newReplicConfigEnabled bool
53+
var newRelicTracerEnabled bool
54+
var newRelicConfigEnabled bool
5655

5756
var port uint32
5857
var smtpPort uint32
@@ -107,8 +106,8 @@ func AddServerCommand(a *cli.App) *cobra.Command {
107106
cmd.Flags().StringVar(&smtpPassword, "smtp-password", "", "SMTP authentication password")
108107
cmd.Flags().StringVar(&smtpFrom, "smtp-from", "", "Sender email address")
109108
cmd.Flags().StringVar(&smtpReplyTo, "smtp-reply-to", "", "Email address to reply to")
110-
cmd.Flags().StringVar(&newReplicApp, "new-relic-app", "", "NewRelic application name")
111-
cmd.Flags().StringVar(&newReplicKey, "new-relic-key", "", "NewRelic application license key")
109+
cmd.Flags().StringVar(&newRelicApp, "new-relic-app", "", "NewRelic application name")
110+
cmd.Flags().StringVar(&newRelicKey, "new-relic-key", "", "NewRelic application license key")
112111
cmd.Flags().StringVar(&searcher, "searcher", "", "Searcher")
113112
cmd.Flags().StringVar(&typesenseHost, "typesense-host", "", "Typesense Host")
114113
cmd.Flags().StringVar(&typesenseApiKey, "typesense-api-key", "", "Typesense Api Key")
@@ -118,9 +117,8 @@ func AddServerCommand(a *cli.App) *cobra.Command {
118117
cmd.Flags().BoolVar(&nativeRealmEnabled, "native", false, "Enable native-realm authentication")
119118
cmd.Flags().BoolVar(&disableEndpoint, "disable-endpoint", false, "Disable all application endpoints")
120119
cmd.Flags().BoolVar(&replayAttacks, "replay-attacks", false, "Enable feature to prevent replay attacks")
121-
cmd.Flags().BoolVar(&newReplicConfigEnabled, "new-relic-config-enabled", false, "Enable new-relic config")
122-
cmd.Flags().BoolVar(&multipleTenants, "multi-tenant", false, "Start convoy in single- or multi-tenant mode")
123-
cmd.Flags().BoolVar(&newReplicTracerEnabled, "new-relic-tracer-enabled", false, "Enable new-relic distributed tracer")
120+
cmd.Flags().BoolVar(&newRelicConfigEnabled, "new-relic-config-enabled", false, "Enable new-relic config")
121+
cmd.Flags().BoolVar(&newRelicTracerEnabled, "new-relic-tracer-enabled", false, "Enable new-relic distributed tracer")
124122

125123
cmd.Flags().Uint32Var(&port, "port", 0, "Server port")
126124
cmd.Flags().Uint32Var(&smtpPort, "smtp-port", 0, "Server port")

cmd/stream/stream.go

+80-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package stream
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/frain-dev/convoy/util"
57

68
"github.com/frain-dev/convoy"
79
"github.com/frain-dev/convoy/auth/realm_chain"
@@ -19,13 +21,28 @@ func AddStreamCommand(a *cli.App) *cobra.Command {
1921
var socketPort uint32
2022
var logLevel string
2123

24+
var newRelicApp string
25+
var newRelicKey string
26+
var newRelicTracerEnabled bool
27+
var newRelicConfigEnabled bool
28+
2229
cmd := &cobra.Command{
2330
Use: "stream",
2431
Short: "Start a websocket server to pipe events to a convoy cli instance",
2532
RunE: func(cmd *cobra.Command, args []string) error {
26-
c, err := config.Get()
33+
// override config with cli flags
34+
cliConfig, err := buildCliFlagConfiguration(cmd)
2735
if err != nil {
28-
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
36+
return err
37+
}
38+
39+
if err = config.Override(cliConfig); err != nil {
40+
return err
41+
}
42+
43+
cfg, err := config.Get()
44+
if err != nil {
45+
a.Logger.Errorf("Failed to retrieve config: %v", err)
2946
return err
3047
}
3148

@@ -60,19 +77,19 @@ func AddStreamCommand(a *cli.App) *cobra.Command {
6077
EventDeliveryRepo: eventDeliveryRepo,
6178
}
6279

63-
h := socket.NewHub()
64-
h.Start()
65-
6680
lo := a.Logger.(*log.Logger)
6781
lo.SetPrefix("stream server")
6882

69-
lvl, err := log.ParseLevel(c.Logger.Level)
83+
lvl, err := log.ParseLevel(cfg.Logger.Level)
7084
if err != nil {
7185
return err
7286
}
7387
lo.SetLevel(lvl)
7488

75-
handler := socket.BuildRoutes(h, r)
89+
h := socket.NewHub()
90+
h.Start(context.Background())
91+
92+
handler := socket.BuildRoutes(r)
7693

7794
consumer := worker.NewConsumer(a.Queue, lo)
7895
consumer.RegisterHandlers(convoy.StreamCliEventsProcessor, h.EventDeliveryCLiHandler(r))
@@ -81,8 +98,8 @@ func AddStreamCommand(a *cli.App) *cobra.Command {
8198
fmt.Println("Registering Stream Server Consumer...")
8299
consumer.Start()
83100

84-
if c.Server.HTTP.SocketPort != 0 {
85-
socketPort = c.Server.HTTP.SocketPort
101+
if cfg.Server.HTTP.SocketPort != 0 {
102+
socketPort = cfg.Server.HTTP.SocketPort
86103
}
87104

88105
srv := server.NewServer(socketPort, func() { h.Stop() })
@@ -98,5 +115,59 @@ func AddStreamCommand(a *cli.App) *cobra.Command {
98115

99116
cmd.Flags().Uint32Var(&socketPort, "socket-port", 5008, "Socket port")
100117
cmd.Flags().StringVar(&logLevel, "log-level", "error", "stream log level")
118+
119+
cmd.Flags().BoolVar(&newRelicConfigEnabled, "new-relic-config-enabled", false, "Enable new-relic config")
120+
cmd.Flags().BoolVar(&newRelicTracerEnabled, "new-relic-tracer-enabled", false, "Enable new-relic distributed tracer")
121+
cmd.Flags().StringVar(&newRelicApp, "new-relic-app", "", "NewRelic application name")
122+
cmd.Flags().StringVar(&newRelicKey, "new-relic-key", "", "NewRelic application license key")
123+
101124
return cmd
102125
}
126+
127+
func buildCliFlagConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
128+
c := &config.Configuration{}
129+
130+
// CONVOY_NEWRELIC_APP_NAME
131+
newReplicApp, err := cmd.Flags().GetString("new-relic-app")
132+
if err != nil {
133+
return nil, err
134+
}
135+
136+
if !util.IsStringEmpty(newReplicApp) {
137+
c.Tracer.NewRelic.AppName = newReplicApp
138+
}
139+
140+
// CONVOY_NEWRELIC_LICENSE_KEY
141+
newReplicKey, err := cmd.Flags().GetString("new-relic-key")
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
if !util.IsStringEmpty(newReplicKey) {
147+
c.Tracer.NewRelic.LicenseKey = newReplicKey
148+
}
149+
150+
// CONVOY_NEWRELIC_CONFIG_ENABLED
151+
isNRCESet := cmd.Flags().Changed("new-relic-config-enabled")
152+
if isNRCESet {
153+
newReplicConfigEnabled, err := cmd.Flags().GetBool("new-relic-config-enabled")
154+
if err != nil {
155+
return nil, err
156+
}
157+
158+
c.Tracer.NewRelic.ConfigEnabled = newReplicConfigEnabled
159+
}
160+
161+
// CONVOY_NEWRELIC_DISTRIBUTED_TRACER_ENABLED
162+
isNRTESet := cmd.Flags().Changed("new-relic-tracer-enabled")
163+
if isNRTESet {
164+
newReplicTracerEnabled, err := cmd.Flags().GetBool("new-relic-tracer-enabled")
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
c.Tracer.NewRelic.DistributedTracerEnabled = newReplicTracerEnabled
170+
}
171+
172+
return c, nil
173+
}

cmd/worker/worker.go

+51
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ func AddWorkerCommand(a *cli.App) *cobra.Command {
2727
var workerPort uint32
2828
var logLevel string
2929

30+
var newRelicApp string
31+
var newRelicKey string
32+
var newRelicTracerEnabled bool
33+
var newRelicConfigEnabled bool
34+
3035
cmd := &cobra.Command{
3136
Use: "worker",
3237
Short: "Start worker instance",
@@ -165,6 +170,10 @@ func AddWorkerCommand(a *cli.App) *cobra.Command {
165170

166171
cmd.Flags().Uint32Var(&workerPort, "worker-port", 5006, "Worker port")
167172
cmd.Flags().StringVar(&logLevel, "log-level", "", "scheduler log level")
173+
cmd.Flags().BoolVar(&newRelicConfigEnabled, "new-relic-config-enabled", false, "Enable new-relic config")
174+
cmd.Flags().BoolVar(&newRelicTracerEnabled, "new-relic-tracer-enabled", false, "Enable new-relic distributed tracer")
175+
cmd.Flags().StringVar(&newRelicApp, "new-relic-app", "", "NewRelic application name")
176+
cmd.Flags().StringVar(&newRelicKey, "new-relic-key", "", "NewRelic application license key")
168177

169178
return cmd
170179
}
@@ -192,5 +201,47 @@ func buildWorkerCliConfiguration(cmd *cobra.Command) (*config.Configuration, err
192201

193202
c.Server.HTTP.WorkerPort = workerPort
194203

204+
// CONVOY_NEWRELIC_APP_NAME
205+
newReplicApp, err := cmd.Flags().GetString("new-relic-app")
206+
if err != nil {
207+
return nil, err
208+
}
209+
210+
if !util.IsStringEmpty(newReplicApp) {
211+
c.Tracer.NewRelic.AppName = newReplicApp
212+
}
213+
214+
// CONVOY_NEWRELIC_LICENSE_KEY
215+
newReplicKey, err := cmd.Flags().GetString("new-relic-key")
216+
if err != nil {
217+
return nil, err
218+
}
219+
220+
if !util.IsStringEmpty(newReplicKey) {
221+
c.Tracer.NewRelic.LicenseKey = newReplicKey
222+
}
223+
224+
// CONVOY_NEWRELIC_CONFIG_ENABLED
225+
isNRCESet := cmd.Flags().Changed("new-relic-config-enabled")
226+
if isNRCESet {
227+
newReplicConfigEnabled, err := cmd.Flags().GetBool("new-relic-config-enabled")
228+
if err != nil {
229+
return nil, err
230+
}
231+
232+
c.Tracer.NewRelic.ConfigEnabled = newReplicConfigEnabled
233+
}
234+
235+
// CONVOY_NEWRELIC_DISTRIBUTED_TRACER_ENABLED
236+
isNRTESet := cmd.Flags().Changed("new-relic-tracer-enabled")
237+
if isNRTESet {
238+
newReplicTracerEnabled, err := cmd.Flags().GetBool("new-relic-tracer-enabled")
239+
if err != nil {
240+
return nil, err
241+
}
242+
243+
c.Tracer.NewRelic.DistributedTracerEnabled = newReplicTracerEnabled
244+
}
245+
195246
return c, nil
196247
}

database/postgres/postgres.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"github.com/frain-dev/convoy/config"
1313
"github.com/jmoiron/sqlx"
14-
_ "github.com/lib/pq"
14+
_ "github.com/newrelic/go-agent/v3/integrations/nrpq"
1515
)
1616

1717
const pkgName = "postgres"
@@ -27,7 +27,7 @@ type Postgres struct {
2727

2828
func NewDB(cfg config.Configuration) (*Postgres, error) {
2929
dbConfig := cfg.Database
30-
db, err := sqlx.Connect("postgres", dbConfig.BuildDsn())
30+
db, err := sqlx.Connect("nrpostgres", dbConfig.BuildDsn())
3131
if err != nil {
3232
return nil, fmt.Errorf("[%s]: failed to open database - %v", pkgName, err)
3333
}

datastore/models.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datastore
22

33
import (
4+
"context"
45
"database/sql/driver"
56
"encoding/json"
67
"errors"
@@ -124,7 +125,7 @@ type (
124125
StorageType string
125126
KeyType string
126127
PubSubType string
127-
PubSubHandler func(*Source, string) error
128+
PubSubHandler func(context.Context, *Source, string) error
128129
MetaEventType string
129130
HookEventType string
130131
)

0 commit comments

Comments
 (0)