Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rabbitmq): Add support for simulating RPC servers. #823

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions executors/rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Venom - Executor RabbitMQ

Three types of execution are supported:
Four types of execution are supported:
- **publisher**: publish a message to a queue or to an exchange.
- **subscriber**: bind to a queue or an exchange (using routing key) and wait for message(s) to be consumed.
- **client**: publish a message to a queue or to an exchange and wait for the response message to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue.
- **client**: publish a request message to a queue or to an exchange and wait for the reply to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue.
- **server**: bind to a queue or an exchange (using routing key) and send a reply over the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue when a request is received.

Steps to use publish / subscribe on a RabbitMQ:

Expand Down Expand Up @@ -128,6 +129,8 @@ vars:
addrs: 'amqp://localhost:5672'
user:
password:
qName: 'consumer-queue'
testcases:
- name: RabbitMQ subscribe testcase
steps:
- type: rabbitmq
Expand All @@ -136,6 +139,9 @@ vars:
password: "{{.password}}"
clientType: subscriber
exchange: exchange_test
exchangeType: direct
durable: true
qName: "{{.qName}}"
routingKey: pubsub_test
messageLimit: 1
assertions:
Expand All @@ -162,9 +168,49 @@ testcases:
password: "{{.password}}"
clientType: client
exchange: exchange_test
routingKey: pubsub_test
exchangeType: direct
durable: true
routingKey: order-query
messages:
- value: '{"a": "b"}'
- value: '{"OrderId": "ORDER-12345"}'
contentType: application/json
contentEncoding: utf8
persistent: false
headers:
myCustomHeader: value
myCustomHeader2: value2
messageLimit: 1
assertions:
- result.bodyjson.bodyjson0 ShouldContainKey OrderStatus
- result.bodyjson.bodyjson0.OrderStatus ShouldEqual Pending
```

### Server (pubsub RPC)
Use the _assertions_ to validate the request. Note the reply will be sent regardless of validation result.
Use the _messages_ to define reply payload(s).
For convenience, each reply message includes an _x-request-messageid_ header populated with the _MessageId_ property of the request message.
```yaml
name: TestSuite RabbitMQ
vars:
addrs: 'amqp://localhost:5672'
user:
password:
qName: 'order-query-handler'
testcases:
- name: RabbitMQ request/reply
steps:
- type: rabbitmq
addrs: "{{.addrs}}"
user: "{{.user}}"
password: "{{.password}}"
qName: "{{.qName}}"
clientType: server
exchange: exchange_test
exchangeType: direct
durable: true
routingKey: order-query
messages:
- value: '{"Status": "OK", "OrderDate": "2024/11/07", "OrderStatus": "Pending"}'
contentType: application/json
contentEncoding: utf8
persistent: false
Expand All @@ -173,6 +219,5 @@ testcases:
myCustomHeader2: value2
messageLimit: 1
assertions:
- result.bodyjson.bodyjson0 ShouldContainKey Status
- result.bodyjson.bodyjson0.Status ShouldEqual Succeeded
- result.bodyjson.bodyjson0 ShouldContainKey OrderId
```
127 changes: 83 additions & 44 deletions executors/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ type Executor struct {
// ExchangeType represents the type of exchange (fanout, etc..)
RoutingKey string `json:"routing_key" yaml:"routingKey"`

// Represents the limit of message will be read. After limit, consumer stop read message
// Represents the limit of messages that will be read. After limit, consumer stops reading messages
MessageLimit int `json:"message_limit" yaml:"messageLimit"`

// Used when ClientType is producer
// Messages represents the message sended by producer
// Messages represents the message sent by producer
Messages []Message `json:"messages" yaml:"messages"`
}

Expand Down Expand Up @@ -103,15 +103,14 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro

switch e.ClientType {
case "publisher":
workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err := e.publishMessages(ctx, workdir, nil, nil, false)
err := e.publishMessages(ctx, nil, nil, false)
if err != nil {
result.Err = err.Error()
return nil, err
}
case "subscriber":
var err error
result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx)
result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx, false)
if err != nil {
result.Err = err.Error()
return nil, err
Expand All @@ -130,8 +129,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
}
venom.Info(ctx, "Reply consumer started.")

workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err = e.publishMessages(ctx, workdir, conn, ch, true)
err = e.publishMessages(ctx, conn, ch, true)
if err != nil {
result.Err = err.Error()
return nil, err
Expand All @@ -143,8 +141,15 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
body, bodyJSON = e.processMessage(ctx, d, true, body, bodyJSON)
result.Body = body
result.BodyJSON = bodyJSON
case "server":
var err error
result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx, true)
if err != nil {
result.Err = err.Error()
return nil, err
}
default:
return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client", e.ClientType)
return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client or server", e.ClientType)
}

elapsed := time.Since(start)
Expand All @@ -153,7 +158,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
return result, nil
}

func (e Executor) publishMessages(ctx context.Context, workdir string, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error {
func (e Executor) publishMessages(ctx context.Context, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error {
var ch *amqp.Channel
var err error
if connection == nil || channel == nil {
Expand Down Expand Up @@ -204,37 +209,51 @@ func (e Executor) publishMessages(ctx context.Context, workdir string, connectio

venom.Debug(ctx, "%d message to send", len(e.Messages))
for i := range e.Messages {
deliveryMode := amqp.Persistent
if !e.Messages[i].Persistent {
deliveryMode = amqp.Transient
}
var replyTo string = e.Messages[i].ReplyTo
if rpc {
replyTo = "amq.rabbitmq.reply-to"
}
err = ch.Publish(
e.Exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: deliveryMode,
ContentType: e.Messages[i].ContentType,
ContentEncoding: e.Messages[i].ContentEncoding,
ReplyTo: replyTo,
Body: []byte(e.Messages[i].Value),
Headers: e.Messages[i].Headers,
})

err = e.publishMessage(e.Messages[i], ctx, ch, routingKey, rpc, false)
if err != nil {
return err
}
venom.Debug(ctx, "Message %q sent (exchange: %q, routing key: %q)", e.Messages[i].Value, e.Exchange, routingKey)
}

return nil
}

func (e Executor) publishMessage(message Message, ctx context.Context, ch *amqp.Channel, routingKey string, request bool, reply bool) error {
var err error
deliveryMode := amqp.Persistent
if !message.Persistent {
deliveryMode = amqp.Transient
}
var replyTo string = message.ReplyTo
if request {
replyTo = "amq.rabbitmq.reply-to"
}
exchange := e.Exchange
if reply {
exchange = ""
}
err = ch.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: deliveryMode,
ContentType: message.ContentType,
ContentEncoding: message.ContentEncoding,
ReplyTo: replyTo,
Body: []byte(message.Value),
Headers: message.Headers,
})

if err != nil {
return err
}
venom.Info(ctx, "Message %q sent (exchange: %q, routing key: %q)", message.Value, exchange, routingKey)

return nil
}

func (e Executor) openChannel(ctx context.Context) (*amqp.Connection, *amqp.Channel, error) {
uri, err := amqp.ParseURI(e.Addrs)
if err != nil {
Expand Down Expand Up @@ -274,7 +293,7 @@ func (e Executor) processMessage(ctx context.Context, msg amqp.Delivery, ok bool
return body, bodyJSON
}

func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
func (e Executor) consumeMessages(ctx context.Context, sendReply bool) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
conn, ch, err := e.openChannel(ctx)
if err != nil {
return nil, nil, nil, nil, err
Expand Down Expand Up @@ -328,19 +347,39 @@ func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{},
bodyJSON := []interface{}{}
messages := []interface{}{}
headers := []amqp.Table{}

for i := 0; i < e.MessageLimit; i++ {
venom.Debug(ctx, "Read message n° %d", i)

msg, ok, err := ch.Get(q.Name, true) // Read one message from RabbitMQ
if err != nil {
return nil, nil, nil, nil, err
ticker := time.NewTicker(1 * time.Second)
i := 0

for i < e.MessageLimit {
select {
case <-ticker.C:
msg, ok, err := ch.Get(q.Name, true) // Read one message from RabbitMQ
if err != nil {
ticker.Stop()
return nil, nil, nil, nil, err
}
if ok {
venom.Info(ctx, "Received message from the queue.")

headers = append(headers, msg.Headers)
messages = append(messages, msg)
body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON)
if sendReply {
if msg.ReplyTo == "" {
venom.Error(ctx, "Received message does not contain a reply address. Verify it has been published with a ReplyTo property. Skipping...")
continue
}
if e.Messages[i].Headers == nil {
e.Messages[i].Headers = make(map[string]interface{})
}
e.Messages[i].Headers["x-request-messageid"] = msg.MessageId
e.publishMessage(e.Messages[i], ctx, ch, msg.ReplyTo, false, true)
}
i++
}
}

headers = append(headers, msg.Headers)
messages = append(messages, msg)
body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON)
}

ticker.Stop()
return body, bodyJSON, messages, headers, err
}