Skip to content

Commit c438ec8

Browse files
subomimekilishorlah
authored
Release 0.1 (frain-dev#74)
* added API to resend failed events (frain-dev#61) * added API to resend failed events frain-dev#56 * updated response description when event is retried * added golden files * updated retry handling for multiple endpoints * Fixed CI * refactored resend flow * resolved merge conflicts * Added resend to API Co-authored-by: Subomi Oluwalana <subomioluwalana71@gmail.com> * - Paginate events (frain-dev#67) - Change event table description to attempt count - Connect API for event retry - Connect event retry API * Fixed Docker build (frain-dev#71) * added events count to apps API (frain-dev#73) * Fixes (frain-dev#72) * - Paginate events - Change event table description to attempt count - Connect API for event retry - Connect event retry API * - update axios base url - add event response data to details section - connect go to docs to github repo - add number of endpoints to apps table - fix issue with app crashing on clicking scheduled events * update app meta data * add number of events to apps table * Update LICENSE to MPL 2.0 * Updated README.md Co-authored-by: Smart Mekiliuwa <st.nonso@gmail.com> Co-authored-by: Emmanuel AIna <emmanuel.ainaj@gmail.com>
1 parent c0f4247 commit c438ec8

38 files changed

+966
-306
lines changed

Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ FROM gcr.io/distroless/base
2727
COPY --from=build-env /go/bin/cmd /
2828
COPY --from=build-env /go/src/frain-dev/convoy/convoy.json /convoy.json
2929

30-
ENTRYPOINT ["/cmd", "server", "--config", "convoy.json"]
30+
ENTRYPOINT ["/cmd"]
31+
CMD [ "server", "--config", "convoy.json" ]
3132

3233
EXPOSE 8080

LICENSE

+354-21
Large diffs are not rendered by default.

README.md

+1-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Convoy
2-
Convoy is a fast & secure webhooks service. It receives event data from a HTTP API and sends these event data to the configured endpoints.
2+
Convoy is a fast & secure webhooks service. It receives event data from a HTTP API and sends these event data to the configured endpoints. To get started download the [openapi spec](https://github.com/frain-dev/convoy/blob/main/openapi.yaml) into Postman or Insomnia.
33

44
Installation
55
-----------------
@@ -25,9 +25,6 @@ Concepts
2525
3. **Delivery Attempts:** A delivery attempt represents an attempt to send an event to it's respective app's endpoint. It contains the `event body`, `status code` and `response body` received on attempt. The amount of attempts on a failed delivery depends on your configured retry strategy.
2626

2727

28-
How it Works
29-
-----------------
30-
3128
Configuration
3229
-----------------
3330
Convoy is configured using a json file with a sample configuration below:

application.go

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type Application struct {
2929
UpdatedAt primitive.DateTime `json:"updated_at,omitempty" bson:"updated_at,omitempty"`
3030
DeletedAt primitive.DateTime `json:"deleted_at,omitempty" bson:"deleted_at,omitempty"`
3131

32+
Events int64 `json:"events" bson:"-"`
33+
3234
DocumentStatus DocumentStatus `json:"-" bson:"document_status"`
3335
}
3436

cmd/create.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func createMessageCommand(a *app) *cobra.Command {
305305
NumTrials: 0,
306306
IntervalSeconds: intervalSeconds,
307307
RetryLimit: retryLimit,
308-
NextSendTime: primitive.NewDateTimeFromTime(time.Now().Add(time.Duration(intervalSeconds) * time.Second)),
308+
NextSendTime: primitive.NewDateTimeFromTime(time.Now()),
309309
},
310310
AppMetadata: &convoy.AppMetadata{
311311
OrgID: appData.OrgID,

convoy-docker.json

+24-24
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
{
2-
"database": {
3-
"dsn": "mongodb://root:rootpassword@mongodb:27017"
4-
},
5-
"queue": {
6-
"type": "redis",
7-
"redis": {
8-
"dsn": "redis://redis_server:6379"
9-
}
10-
},
11-
"server": {
12-
"http": {
13-
"port": 5005
14-
}
15-
},
16-
"auth": {
17-
"type": "none"
18-
},
19-
"strategy": {
20-
"type": "default",
21-
"default": {
22-
"intervalSeconds": 120,
23-
"retryLimit": 10
24-
}
25-
}
2+
"database": {
3+
"dsn": "mongodb://root:rootpassword@mongodb:27017"
4+
},
5+
"queue": {
6+
"type": "redis",
7+
"redis": {
8+
"dsn": "redis://redis_server:6379"
9+
}
10+
},
11+
"server": {
12+
"http": {
13+
"port": 5005
14+
}
15+
},
16+
"auth": {
17+
"type": "none"
18+
},
19+
"strategy": {
20+
"type": "default",
21+
"default": {
22+
"intervalSeconds": 120,
23+
"retryLimit": 10
24+
}
25+
}
2626
}

convoy.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"database": {
3-
"dsn": "mongodb://root:rootpassword@localhost:27017"
3+
"dsn": "mongodb://root:rootpassword@localhost:27037"
44
},
55
"queue": {
66
"type": "redis",

datastore/application.go

+43
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ func (db *appRepo) LoadApplications(ctx context.Context, orgId string) ([]convoy
7272
return apps, err
7373
}
7474

75+
msgCollection := db.innerDB.Collection(MsgCollection)
76+
for i, app := range apps {
77+
filter = bson.M{"app_id": app.UID, "document_status": bson.M{"$ne": convoy.DeletedDocumentStatus}}
78+
count, err := msgCollection.CountDocuments(ctx, filter)
79+
if err != nil {
80+
log.Errorf("failed to count events in %s. Reason: %s", app.UID, err)
81+
return apps, err
82+
}
83+
apps[i].Events = count
84+
}
85+
7586
return apps, nil
7687
}
7788

@@ -89,6 +100,17 @@ func (db *appRepo) LoadApplicationsPagedByOrgId(ctx context.Context, orgId strin
89100
applications = make([]convoy.Application, 0)
90101
}
91102

103+
msgCollection := db.innerDB.Collection(MsgCollection)
104+
for i, app := range applications {
105+
filter = bson.M{"app_id": app.UID, "document_status": bson.M{"$ne": convoy.DeletedDocumentStatus}}
106+
count, err := msgCollection.CountDocuments(ctx, filter)
107+
if err != nil {
108+
log.Errorf("failed to count events in %s. Reason: %s", app.UID, err)
109+
return applications, pager.PaginationData{}, err
110+
}
111+
applications[i].Events = count
112+
}
113+
92114
return applications, paginatedData.Pagination, nil
93115
}
94116

@@ -125,6 +147,17 @@ func (db *appRepo) SearchApplicationsByOrgId(ctx context.Context, orgId string,
125147
return apps, err
126148
}
127149

150+
msgCollection := db.innerDB.Collection(MsgCollection)
151+
for i, app := range apps {
152+
filter = bson.M{"app_id": app.UID, "document_status": bson.M{"$ne": convoy.DeletedDocumentStatus}}
153+
count, err := msgCollection.CountDocuments(ctx, filter)
154+
if err != nil {
155+
log.Errorf("failed to count events in %s. Reason: %s", app.UID, err)
156+
return apps, err
157+
}
158+
apps[i].Events = count
159+
}
160+
128161
return apps, nil
129162
}
130163

@@ -139,7 +172,17 @@ func (db *appRepo) FindApplicationByID(ctx context.Context,
139172
Decode(&app)
140173
if errors.Is(err, mongo.ErrNoDocuments) {
141174
err = convoy.ErrApplicationNotFound
175+
return app, err
176+
}
177+
178+
msgCollection := db.innerDB.Collection(MsgCollection)
179+
filter = bson.M{"app_id": app.UID, "document_status": bson.M{"$ne": convoy.DeletedDocumentStatus}}
180+
count, err := msgCollection.CountDocuments(ctx, filter)
181+
if err != nil {
182+
log.Errorf("failed to count events in %s. Reason: %s", app.UID, err)
183+
return app, err
142184
}
185+
app.Events = count
143186

144187
return app, err
145188
}

docker-compose.yml

+24-39
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,29 @@
11
version: "3"
22

33
services:
4-
web:
5-
build:
6-
context: .
7-
dockerfile: Dockerfile
8-
entrypoint: ["./cmd", "server", "--config", "convoy-docker.json"]
9-
ports:
10-
- 5005:5005
11-
links:
12-
- mongodb:localhost
13-
# - redis_server:localhost
14-
depends_on:
15-
- mongodb
16-
- redis_server
4+
web:
5+
build:
6+
context: .
7+
dockerfile: Dockerfile
8+
entrypoint: ["./cmd", "server", "--config", "convoy.json"]
9+
ports:
10+
- 5005:5005
11+
volumes:
12+
- ./convoy-docker.json:/convoy.json
13+
restart: on-failure
14+
depends_on:
15+
- mongodb
16+
- redis_server
1717

18-
# nats:
19-
# image: nats
20-
# ports:
21-
# - "8222:8222"
22-
# nats-1:
23-
# image: nats
24-
# command: "--cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
25-
# depends_on: ["nats"]
26-
# nats-2:
27-
# image: nats
28-
# command: "--cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
29-
# depends_on: ["nats"]
18+
mongodb:
19+
image: mongo:latest
20+
environment:
21+
MONGO_INITDB_ROOT_USERNAME: root
22+
MONGO_INITDB_ROOT_PASSWORD: rootpassword
23+
volumes:
24+
- ./data/mongo:/data/db
3025

31-
mongodb:
32-
image: mongo:latest
33-
environment:
34-
MONGO_INITDB_ROOT_USERNAME: root
35-
MONGO_INITDB_ROOT_PASSWORD: rootpassword
36-
ports:
37-
- "27037:27017"
38-
volumes:
39-
- ./data/mongo:/data/db
40-
41-
redis_server:
42-
image: redis:alpine
43-
ports:
44-
- "8379:6379"
26+
redis_server:
27+
image: redis:alpine
28+
ports:
29+
- "8379:6379"

server/application.go

+6
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ func (a *applicationHandler) GetAppMessage(w http.ResponseWriter, r *http.Reques
132132
*getMessageFromContext(r.Context()), http.StatusOK))
133133
}
134134

135+
func (a *applicationHandler) ResendAppMessage(w http.ResponseWriter, r *http.Request) {
136+
137+
_ = render.Render(w, r, newServerResponse("App event processed for retry successfully",
138+
*getMessageFromContext(r.Context()), http.StatusOK))
139+
}
140+
135141
func (a *applicationHandler) GetAppMessagesPaged(w http.ResponseWriter, r *http.Request) {
136142

137143
_ = render.Render(w, r, newServerResponse("App events fetched successfully",

server/message.go

+33-21
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func ensureNewMessage(appRepo convoy.ApplicationRepository, msgRepo convoy.Messa
9393
NumTrials: 0,
9494
IntervalSeconds: intervalSeconds,
9595
RetryLimit: retryLimit,
96-
NextSendTime: primitive.NewDateTimeFromTime(time.Now().Add(time.Duration(intervalSeconds) * time.Second)),
96+
NextSendTime: primitive.NewDateTimeFromTime(time.Now()),
9797
},
9898
MessageAttempts: make([]convoy.MessageAttempt, 0),
9999
CreatedAt: primitive.NewDateTimeFromTime(time.Now()),
@@ -141,30 +141,12 @@ func fetchAllMessages(msgRepo convoy.MessageRepository) func(next http.Handler)
141141
}
142142
}
143143

144-
func fetchAppMessages(appRepo convoy.ApplicationRepository, msgRepo convoy.MessageRepository) func(next http.Handler) http.Handler {
144+
func fetchAppMessages(msgRepo convoy.MessageRepository) func(next http.Handler) http.Handler {
145145
return func(next http.Handler) http.Handler {
146-
147146
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148147

149148
pageable := getPageableFromContext(r.Context())
150-
151-
appID := chi.URLParam(r, "appID")
152-
app, err := appRepo.FindApplicationByID(r.Context(), appID)
153-
if err != nil {
154-
155-
msg := "an error occurred while retrieving app details"
156-
statusCode := http.StatusInternalServerError
157-
158-
if errors.Is(err, convoy.ErrApplicationNotFound) {
159-
msg = err.Error()
160-
statusCode = http.StatusNotFound
161-
}
162-
163-
log.Errorln("error while fetching app - ", err)
164-
165-
_ = render.Render(w, r, newErrorResponse(msg, statusCode))
166-
return
167-
}
149+
app := getApplicationFromContext(r.Context())
168150

169151
m, paginationData, err := msgRepo.LoadMessagesPagedByAppId(r.Context(), app.UID, pageable)
170152
if err != nil {
@@ -250,3 +232,33 @@ func findMessageDeliveryAttempt(attempts *[]convoy.MessageAttempt, id string) (*
250232
}
251233
return nil, convoy.ErrMessageDeliveryAttemptNotFound
252234
}
235+
236+
func resendMessage(msgRepo convoy.MessageRepository) func(next http.Handler) http.Handler {
237+
return func(next http.Handler) http.Handler {
238+
239+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
240+
241+
msg := getMessageFromContext(r.Context())
242+
243+
if msg.Status == convoy.SuccessMessageStatus {
244+
_ = render.Render(w, r, newErrorResponse("event already sent", http.StatusBadRequest))
245+
return
246+
}
247+
248+
if msg.Status != convoy.FailureMessageStatus {
249+
_ = render.Render(w, r, newErrorResponse("cannot resend event that did not fail previously", http.StatusBadRequest))
250+
return
251+
}
252+
253+
msg.Status = convoy.ScheduledMessageStatus
254+
err := msgRepo.UpdateStatusOfMessages(r.Context(), []convoy.Message{*msg}, convoy.ScheduledMessageStatus)
255+
if err != nil {
256+
_ = render.Render(w, r, newErrorResponse("an error occurred while trying to resend event", http.StatusInternalServerError))
257+
return
258+
}
259+
260+
r = r.WithContext(setMessageInContext(r.Context(), msg))
261+
next.ServeHTTP(w, r)
262+
})
263+
}
264+
}

0 commit comments

Comments
 (0)