diff --git a/executors/rabbitmq/README.md b/executors/rabbitmq/README.md index 88d18c84..292f63d6 100644 --- a/executors/rabbitmq/README.md +++ b/executors/rabbitmq/README.md @@ -1,9 +1,10 @@ # Venom - Executor RabbitMQ -Three types of execution are supported: +Four types of execution are supported: - **publisher**: publish a message to a queue or to an exchange. - **subscriber**: bind to a queue or an exchange (using routing key) and wait for message(s) to be consumed. -- **client**: publish a message to a queue or to an exchange and wait for the response message to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue. +- **client**: publish a request message to a queue or to an exchange and wait for the reply to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue. +- **server**: bind to a queue or an exchange (using routing key) and send a reply over the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue when a request is received. Steps to use publish / subscribe on a RabbitMQ: @@ -128,6 +129,8 @@ vars: addrs: 'amqp://localhost:5672' user: password: + qName: 'consumer-queue' +testcases: - name: RabbitMQ subscribe testcase steps: - type: rabbitmq @@ -136,6 +139,9 @@ vars: password: "{{.password}}" clientType: subscriber exchange: exchange_test + exchangeType: direct + durable: true + qName: "{{.qName}}" routingKey: pubsub_test messageLimit: 1 assertions: @@ -162,9 +168,49 @@ testcases: password: "{{.password}}" clientType: client exchange: exchange_test - routingKey: pubsub_test + exchangeType: direct + durable: true + routingKey: order-query messages: - - value: '{"a": "b"}' + - value: '{"OrderId": "ORDER-12345"}' + contentType: application/json + contentEncoding: utf8 + persistent: false + headers: + myCustomHeader: value + myCustomHeader2: value2 + messageLimit: 1 + assertions: + - result.bodyjson.bodyjson0 ShouldContainKey OrderStatus + - result.bodyjson.bodyjson0.OrderStatus ShouldEqual Pending +``` + +### Server (pubsub RPC) +Use the _assertions_ to validate the request. Note the reply will be sent regardless of validation result. +Use the _messages_ to define reply payload(s). +For convenience, each reply message includes an _x-request-messageid_ header populated with the _MessageId_ property of the request message. +```yaml +name: TestSuite RabbitMQ +vars: + addrs: 'amqp://localhost:5672' + user: + password: + qName: 'order-query-handler' +testcases: + - name: RabbitMQ request/reply + steps: + - type: rabbitmq + addrs: "{{.addrs}}" + user: "{{.user}}" + password: "{{.password}}" + qName: "{{.qName}}" + clientType: server + exchange: exchange_test + exchangeType: direct + durable: true + routingKey: order-query + messages: + - value: '{"Status": "OK", "OrderDate": "2024/11/07", "OrderStatus": "Pending"}' contentType: application/json contentEncoding: utf8 persistent: false @@ -173,6 +219,5 @@ testcases: myCustomHeader2: value2 messageLimit: 1 assertions: - - result.bodyjson.bodyjson0 ShouldContainKey Status - - result.bodyjson.bodyjson0.Status ShouldEqual Succeeded + - result.bodyjson.bodyjson0 ShouldContainKey OrderId ``` diff --git a/executors/rabbitmq/rabbitmq.go b/executors/rabbitmq/rabbitmq.go index baead3dc..0034edee 100644 --- a/executors/rabbitmq/rabbitmq.go +++ b/executors/rabbitmq/rabbitmq.go @@ -52,11 +52,11 @@ type Executor struct { // ExchangeType represents the type of exchange (fanout, etc..) RoutingKey string `json:"routing_key" yaml:"routingKey"` - // Represents the limit of message will be read. After limit, consumer stop read message + // Represents the limit of messages that will be read. After limit, consumer stops reading messages MessageLimit int `json:"message_limit" yaml:"messageLimit"` // Used when ClientType is producer - // Messages represents the message sended by producer + // Messages represents the message sent by producer Messages []Message `json:"messages" yaml:"messages"` } @@ -103,15 +103,14 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro switch e.ClientType { case "publisher": - workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir") - err := e.publishMessages(ctx, workdir, nil, nil, false) + err := e.publishMessages(ctx, nil, nil, false) if err != nil { result.Err = err.Error() return nil, err } case "subscriber": var err error - result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx) + result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx, false) if err != nil { result.Err = err.Error() return nil, err @@ -130,8 +129,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro } venom.Info(ctx, "Reply consumer started.") - workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir") - err = e.publishMessages(ctx, workdir, conn, ch, true) + err = e.publishMessages(ctx, conn, ch, true) if err != nil { result.Err = err.Error() return nil, err @@ -143,8 +141,15 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro body, bodyJSON = e.processMessage(ctx, d, true, body, bodyJSON) result.Body = body result.BodyJSON = bodyJSON + case "server": + var err error + result.Body, result.BodyJSON, result.Messages, result.Headers, err = e.consumeMessages(ctx, true) + if err != nil { + result.Err = err.Error() + return nil, err + } default: - return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client", e.ClientType) + return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client or server", e.ClientType) } elapsed := time.Since(start) @@ -153,7 +158,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro return result, nil } -func (e Executor) publishMessages(ctx context.Context, workdir string, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error { +func (e Executor) publishMessages(ctx context.Context, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error { var ch *amqp.Channel var err error if connection == nil || channel == nil { @@ -204,37 +209,51 @@ func (e Executor) publishMessages(ctx context.Context, workdir string, connectio venom.Debug(ctx, "%d message to send", len(e.Messages)) for i := range e.Messages { - deliveryMode := amqp.Persistent - if !e.Messages[i].Persistent { - deliveryMode = amqp.Transient - } - var replyTo string = e.Messages[i].ReplyTo - if rpc { - replyTo = "amq.rabbitmq.reply-to" - } - err = ch.Publish( - e.Exchange, // exchange - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: deliveryMode, - ContentType: e.Messages[i].ContentType, - ContentEncoding: e.Messages[i].ContentEncoding, - ReplyTo: replyTo, - Body: []byte(e.Messages[i].Value), - Headers: e.Messages[i].Headers, - }) - + err = e.publishMessage(e.Messages[i], ctx, ch, routingKey, rpc, false) if err != nil { return err } - venom.Debug(ctx, "Message %q sent (exchange: %q, routing key: %q)", e.Messages[i].Value, e.Exchange, routingKey) } return nil } +func (e Executor) publishMessage(message Message, ctx context.Context, ch *amqp.Channel, routingKey string, request bool, reply bool) error { + var err error + deliveryMode := amqp.Persistent + if !message.Persistent { + deliveryMode = amqp.Transient + } + var replyTo string = message.ReplyTo + if request { + replyTo = "amq.rabbitmq.reply-to" + } + exchange := e.Exchange + if reply { + exchange = "" + } + err = ch.Publish( + exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: deliveryMode, + ContentType: message.ContentType, + ContentEncoding: message.ContentEncoding, + ReplyTo: replyTo, + Body: []byte(message.Value), + Headers: message.Headers, + }) + + if err != nil { + return err + } + venom.Info(ctx, "Message %q sent (exchange: %q, routing key: %q)", message.Value, exchange, routingKey) + + return nil +} + func (e Executor) openChannel(ctx context.Context) (*amqp.Connection, *amqp.Channel, error) { uri, err := amqp.ParseURI(e.Addrs) if err != nil { @@ -274,7 +293,7 @@ func (e Executor) processMessage(ctx context.Context, msg amqp.Delivery, ok bool return body, bodyJSON } -func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) { +func (e Executor) consumeMessages(ctx context.Context, sendReply bool) ([]string, []interface{}, []interface{}, []amqp.Table, error) { conn, ch, err := e.openChannel(ctx) if err != nil { return nil, nil, nil, nil, err @@ -328,19 +347,39 @@ func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, bodyJSON := []interface{}{} messages := []interface{}{} headers := []amqp.Table{} - - for i := 0; i < e.MessageLimit; i++ { - venom.Debug(ctx, "Read message n° %d", i) - - msg, ok, err := ch.Get(q.Name, true) // Read one message from RabbitMQ - if err != nil { - return nil, nil, nil, nil, err + ticker := time.NewTicker(1 * time.Second) + i := 0 + + for i < e.MessageLimit { + select { + case <-ticker.C: + msg, ok, err := ch.Get(q.Name, true) // Read one message from RabbitMQ + if err != nil { + ticker.Stop() + return nil, nil, nil, nil, err + } + if ok { + venom.Info(ctx, "Received message from the queue.") + + headers = append(headers, msg.Headers) + messages = append(messages, msg) + body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON) + if sendReply { + if msg.ReplyTo == "" { + venom.Error(ctx, "Received message does not contain a reply address. Verify it has been published with a ReplyTo property. Skipping...") + continue + } + if e.Messages[i].Headers == nil { + e.Messages[i].Headers = make(map[string]interface{}) + } + e.Messages[i].Headers["x-request-messageid"] = msg.MessageId + e.publishMessage(e.Messages[i], ctx, ch, msg.ReplyTo, false, true) + } + i++ + } } - - headers = append(headers, msg.Headers) - messages = append(messages, msg) - body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON) } + ticker.Stop() return body, bodyJSON, messages, headers, err }