Skip to content

Commit aebfe9c

Browse files
Feature - Add support for RabbitMQ source (frain-dev#1911)
* feat(amqp): adding support to amqp * feat(amqp): adding support to amqp * feat(amqp): adding support to amqp Cleared unrelevant files * feat(amqp): adding support to amqp - Fixed failing tests * feat(amqp): adding support to amqp - Fixed failing tests * feat(amqp): adding support to amqp - Fixed lint - Added go_test_stub.txt * feat(amqp): adding support to amqp - Reverted docs.go * task(webhooks): add webhooks support - Update required field on host and queue * task(webhooks): add webhooks support - Fixed PR comments - Added DLQ in case of failure - Ack / Nack messages - Updated UI * feat(amqp): adding support to amqp - Reverted docs.go * feat(amqp): adding support to amqp - Reverted api/ui/build/go_test_stub.txt * feat(amqp): adding support to amqp - Cleaned files * feat(amqp): adding support to amqp - Using consume with ctx - Checking k.ctx is error and exiting function to release connections * feat(amqp): adding support to amqp - Removed check for ctx in msgs loop * task(webhooks): add webhooks support - Fixed linter --------- Co-authored-by: Nitzan Goldfeder <n88holy@gmail.com>
1 parent b909653 commit aebfe9c

File tree

20 files changed

+6602
-2670
lines changed

20 files changed

+6602
-2670
lines changed

api/models/source.go

+48
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ type PubSubConfig struct {
273273
Sqs *SQSPubSubConfig `json:"sqs"`
274274
Google *GooglePubSubConfig `json:"google"`
275275
Kafka *KafkaPubSubConfig `json:"kafka"`
276+
Amqp *AmqpPubSubconfig `json:"amqp"`
276277
}
277278

278279
func (pc *PubSubConfig) Transform() *datastore.PubSubConfig {
@@ -286,6 +287,7 @@ func (pc *PubSubConfig) Transform() *datastore.PubSubConfig {
286287
Sqs: pc.Sqs.transform(),
287288
Google: pc.Google.transform(),
288289
Kafka: pc.Kafka.transform(),
290+
Amqp: pc.Amqp.transform(),
289291
}
290292
}
291293

@@ -334,6 +336,52 @@ type KafkaPubSubConfig struct {
334336
Auth *KafkaAuth `json:"auth"`
335337
}
336338

339+
type AmqpPubSubconfig struct {
340+
Schema string `json:"schema"`
341+
Host string `json:"host"`
342+
Port string `json:"port"`
343+
Auth *AmqpAuth `json:"auth"`
344+
Queue string `json:"queue"`
345+
BindedExchange *AmqpExchange `json:"bindExchange"`
346+
DeadLetterExchange *string `json:"deadLetterExchange"`
347+
}
348+
349+
type AmqpAuth struct {
350+
User string `json:"user"`
351+
Password string `json:"password"`
352+
}
353+
354+
type AmqpExchange struct {
355+
Exchange *string `json:"exchange"`
356+
RoutingKey *string `json:"routingKey"`
357+
}
358+
359+
func (ac *AmqpPubSubconfig) transform() *datastore.AmqpPubSubConfig {
360+
if ac == nil {
361+
return nil
362+
}
363+
364+
bind := AmqpExchange{
365+
Exchange: nil,
366+
RoutingKey: nil,
367+
}
368+
369+
if ac.BindedExchange != nil {
370+
bind = *ac.BindedExchange
371+
}
372+
373+
return &datastore.AmqpPubSubConfig{
374+
Schema: ac.Schema,
375+
Host: ac.Host,
376+
Port: ac.Port,
377+
Queue: ac.Queue,
378+
BindedExchange: bind.Exchange,
379+
RoutingKey: *bind.RoutingKey,
380+
Auth: (*datastore.AmqpCredentials)(ac.Auth),
381+
DeadLetterExchange: ac.DeadLetterExchange,
382+
}
383+
}
384+
337385
func (kc *KafkaPubSubConfig) transform() *datastore.KafkaPubSubConfig {
338386
if kc == nil {
339387
return nil

datastore/models.go

+18
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ const (
193193
SqsPubSub PubSubType = "sqs"
194194
GooglePubSub PubSubType = "google"
195195
KafkaPubSub PubSubType = "kafka"
196+
AmqpPubSub PubSubType = "amqp"
196197
)
197198

198199
func (s SourceProvider) IsValid() bool {
@@ -1068,6 +1069,7 @@ type PubSubConfig struct {
10681069
Sqs *SQSPubSubConfig `json:"sqs" db:"sqs"`
10691070
Google *GooglePubSubConfig `json:"google" db:"google"`
10701071
Kafka *KafkaPubSubConfig `json:"kafka" db:"kafka"`
1072+
Amqp *AmqpPubSubConfig `json:"amqp" db:"amqp"`
10711073
}
10721074

10731075
func (p *PubSubConfig) Scan(value interface{}) error {
@@ -1115,6 +1117,22 @@ type KafkaPubSubConfig struct {
11151117
Auth *KafkaAuth `json:"auth" db:"auth"`
11161118
}
11171119

1120+
type AmqpPubSubConfig struct {
1121+
Schema string `json:"schema" db:"schema"`
1122+
Host string `json:"host" db:"host"`
1123+
Port string `json:"port" db:"port"`
1124+
Queue string `json:"queue" db:"queue"`
1125+
Auth *AmqpCredentials `json:"auth" db:"auth"`
1126+
BindedExchange *string `json:"bindedExchange" db:"binded_exchange"`
1127+
RoutingKey string `json:"routingKey" db:"routing_key"`
1128+
DeadLetterExchange *string `json:"deadLetterExchange" db:"dead_letter_exchange"`
1129+
}
1130+
1131+
type AmqpCredentials struct {
1132+
User string `json:"user" db:"user"`
1133+
Password string `json:"password" db:"password"`
1134+
}
1135+
11181136
type KafkaAuth struct {
11191137
Type string `json:"type" db:"type"`
11201138
Hash string `json:"hash" db:"hash"`

docs/swagger.json

+98-2
Original file line numberDiff line numberDiff line change
@@ -5274,6 +5274,46 @@
52745274
}
52755275
}
52765276
},
5277+
"datastore.AmqpCredentials": {
5278+
"type": "object",
5279+
"properties": {
5280+
"password": {
5281+
"type": "string"
5282+
},
5283+
"user": {
5284+
"type": "string"
5285+
}
5286+
}
5287+
},
5288+
"datastore.AmqpPubSubConfig": {
5289+
"type": "object",
5290+
"properties": {
5291+
"auth": {
5292+
"$ref": "#/definitions/datastore.AmqpCredentials"
5293+
},
5294+
"bindedExchange": {
5295+
"type": "string"
5296+
},
5297+
"deadLetterExchange": {
5298+
"type": "string"
5299+
},
5300+
"host": {
5301+
"type": "string"
5302+
},
5303+
"port": {
5304+
"type": "string"
5305+
},
5306+
"queue": {
5307+
"type": "string"
5308+
},
5309+
"routingKey": {
5310+
"type": "string"
5311+
},
5312+
"schema": {
5313+
"type": "string"
5314+
}
5315+
}
5316+
},
52775317
"datastore.ApiKey": {
52785318
"type": "object",
52795319
"properties": {
@@ -5809,6 +5849,9 @@
58095849
"datastore.PubSubConfig": {
58105850
"type": "object",
58115851
"properties": {
5852+
"amqp": {
5853+
"$ref": "#/definitions/datastore.AmqpPubSubConfig"
5854+
},
58125855
"google": {
58135856
"$ref": "#/definitions/datastore.GooglePubSubConfig"
58145857
},
@@ -5831,12 +5874,14 @@
58315874
"enum": [
58325875
"sqs",
58335876
"google",
5834-
"kafka"
5877+
"kafka",
5878+
"amqp"
58355879
],
58365880
"x-enum-varnames": [
58375881
"SqsPubSub",
58385882
"GooglePubSub",
5839-
"KafkaPubSub"
5883+
"KafkaPubSub",
5884+
"AmqpPubSub"
58405885
]
58415886
},
58425887
"datastore.RateLimitConfiguration": {
@@ -6081,6 +6126,54 @@
60816126
}
60826127
}
60836128
},
6129+
"models.AmqpAuth": {
6130+
"type": "object",
6131+
"properties": {
6132+
"password": {
6133+
"type": "string"
6134+
},
6135+
"user": {
6136+
"type": "string"
6137+
}
6138+
}
6139+
},
6140+
"models.AmqpExchange": {
6141+
"type": "object",
6142+
"properties": {
6143+
"exchange": {
6144+
"type": "string"
6145+
},
6146+
"routingKey": {
6147+
"type": "string"
6148+
}
6149+
}
6150+
},
6151+
"models.AmqpPubSubconfig": {
6152+
"type": "object",
6153+
"properties": {
6154+
"auth": {
6155+
"$ref": "#/definitions/models.AmqpAuth"
6156+
},
6157+
"bindExchange": {
6158+
"$ref": "#/definitions/models.AmqpExchange"
6159+
},
6160+
"deadLetterExchange": {
6161+
"type": "string"
6162+
},
6163+
"host": {
6164+
"type": "string"
6165+
},
6166+
"port": {
6167+
"type": "string"
6168+
},
6169+
"queue": {
6170+
"type": "string"
6171+
},
6172+
"schema": {
6173+
"type": "string"
6174+
}
6175+
}
6176+
},
60846177
"models.ApiKey": {
60856178
"type": "object",
60866179
"properties": {
@@ -6819,6 +6912,9 @@
68196912
"models.PubSubConfig": {
68206913
"type": "object",
68216914
"properties": {
6915+
"amqp": {
6916+
"$ref": "#/definitions/models.AmqpPubSubconfig"
6917+
},
68226918
"google": {
68236919
"$ref": "#/definitions/models.GooglePubSubConfig"
68246920
},

docs/swagger.yaml

+63
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,32 @@ definitions:
77
threshold:
88
type: string
99
type: object
10+
datastore.AmqpCredentials:
11+
properties:
12+
password:
13+
type: string
14+
user:
15+
type: string
16+
type: object
17+
datastore.AmqpPubSubConfig:
18+
properties:
19+
auth:
20+
$ref: '#/definitions/datastore.AmqpCredentials'
21+
bindedExchange:
22+
type: string
23+
deadLetterExchange:
24+
type: string
25+
host:
26+
type: string
27+
port:
28+
type: string
29+
queue:
30+
type: string
31+
routingKey:
32+
type: string
33+
schema:
34+
type: string
35+
type: object
1036
datastore.ApiKey:
1137
properties:
1238
header_name:
@@ -371,6 +397,8 @@ definitions:
371397
type: object
372398
datastore.PubSubConfig:
373399
properties:
400+
amqp:
401+
$ref: '#/definitions/datastore.AmqpPubSubConfig'
374402
google:
375403
$ref: '#/definitions/datastore.GooglePubSubConfig'
376404
kafka:
@@ -387,11 +415,13 @@ definitions:
387415
- sqs
388416
- google
389417
- kafka
418+
- amqp
390419
type: string
391420
x-enum-varnames:
392421
- SqsPubSub
393422
- GooglePubSub
394423
- KafkaPubSub
424+
- AmqpPubSub
395425
datastore.RateLimitConfiguration:
396426
properties:
397427
count:
@@ -558,6 +588,37 @@ definitions:
558588
threshold:
559589
type: string
560590
type: object
591+
models.AmqpAuth:
592+
properties:
593+
password:
594+
type: string
595+
user:
596+
type: string
597+
type: object
598+
models.AmqpExchange:
599+
properties:
600+
exchange:
601+
type: string
602+
routingKey:
603+
type: string
604+
type: object
605+
models.AmqpPubSubconfig:
606+
properties:
607+
auth:
608+
$ref: '#/definitions/models.AmqpAuth'
609+
bindExchange:
610+
$ref: '#/definitions/models.AmqpExchange'
611+
deadLetterExchange:
612+
type: string
613+
host:
614+
type: string
615+
port:
616+
type: string
617+
queue:
618+
type: string
619+
schema:
620+
type: string
621+
type: object
561622
models.ApiKey:
562623
properties:
563624
header_name:
@@ -1073,6 +1134,8 @@ definitions:
10731134
type: object
10741135
models.PubSubConfig:
10751136
properties:
1137+
amqp:
1138+
$ref: '#/definitions/models.AmqpPubSubconfig'
10761139
google:
10771140
$ref: '#/definitions/models.GooglePubSubConfig'
10781141
kafka:

0 commit comments

Comments
 (0)