Skip to content

Commit

Permalink
Added flag to limit the number of processed messages from the source …
Browse files Browse the repository at this point in the history
…queue (#4)

* added parameters to enable limiting the amount ou message processed from the queue

* added tests to cover the feature for limiting messages received from the queue

* refactoring tests

* bumping version to 0.2.1

* added clean and folders to .gitignore
  • Loading branch information
renanvieira authored Jan 5, 2020
1 parent dafc57c commit a10744e
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 171 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,6 @@ fabric.properties

# Editor-based Rest Client
.idea/httpRequests
.idea/
.idea/

out/
10 changes: 4 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
run-tests:
nosetests tests/ --with-coverage --cover-package=phoenix_letter -s
clean:
find . -name "*.py[co]" -o -name __pycache__ -exec rm -rf {} +

update-pypi:
rm -rf dist/*
python setup.py sdist
twine upload dist/*
run-tests: clean
nosetests tests/ --with-coverage --cover-package=phoenix_letter -s
Empty file.
37 changes: 37 additions & 0 deletions src/phoenix_letter/common/arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from argparse import ArgumentParser


def parse_arguments(args):
parser = ArgumentParser()
parser.add_argument("--src", dest="source",
required=True,
help="Source SQS Queue Name",
metavar="SOURCE_QUEUE")

parser.add_argument("--dst", dest="destination",
required=True,
help="Destination SQS Queue Name",
metavar="DESTINATION_QUEUE")

parser.add_argument("--aws-keys", dest="input_keys",
help="Flag that indicates you want to enter custom AWS keys.", action='store_true')

parser.add_argument("--region", dest="region", default="us-east-1",
required=True,
help="AWS Region",
metavar="REGION")

parser.add_argument("--empty-receive", dest="max_empty_receives_count", default=10,
help="Max number of empty receives before giving up",
metavar="EMPTY_RECEIVE")

parser.add_argument("--max", dest="max_messages", default=0, type=int,
help="Max number of messages to process from the source queue.",
metavar="N")

parser.add_argument("--max-per-request", dest="max_receive_messages", default=10, type=int, choices=range(1, 11),
help="Max number of messages to received from the source queue per request (this will be pass "
"in the MaxNumberOfMessages param). Default: 10 (AWS API max limit)",
metavar="N")

return parser.parse_args(args)
8 changes: 8 additions & 0 deletions src/phoenix_letter/common/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from getpass import getpass


def get_credentials():
access_key = getpass("AWS Access Key:")
secret_key = getpass("AWS Secret Key:")

return access_key, secret_key
6 changes: 6 additions & 0 deletions src/phoenix_letter/common/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import IntEnum


class ReasonStopEnum(IntEnum):
EMPTY_RECEIVED = 1,
MAX_MESSAGES_RECEIVED = 2
69 changes: 29 additions & 40 deletions src/phoenix_letter/main.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,12 @@
import random
import sys
from argparse import ArgumentParser
from time import sleep
from getpass import getpass

import boto3


def parse_arguments(args):
parser = ArgumentParser()
parser.add_argument("--src", dest="source",
required=True,
help="Source SQS Queue Name",
metavar="SOURCE_QUEUE")

parser.add_argument("--dst", dest="destination",
required=True,
help="Destination SQS Queue Name",
metavar="DESTINATION_QUEUE")

parser.add_argument("--aws-keys", dest="input_keys",
help="Flag that indicates you want to enter custom AWS keys.", action='store_true')

parser.add_argument("--region", dest="region", default="us-east-1",
required=True,
help="AWS Region",
metavar="REGION")

parser.add_argument("--empty-receive", dest="max_empty_receives_count", default=10,
help="Max number of empty receives before giving up",
metavar="EMPTY_RECEIVE")

return parser.parse_args(args)


def get_credentials():
access_key = getpass("AWS Access Key:")
secret_key = getpass("AWS Secret Key:")

return access_key, secret_key
from phoenix_letter.common.arguments import parse_arguments
from phoenix_letter.common.credentials import get_credentials
from phoenix_letter.common.enums import ReasonStopEnum


def main(args=None):
Expand All @@ -62,12 +30,22 @@ def main(args=None):
destination_queue_url = destination_queue['QueueUrl']

number_of_empty_receives = 0
total_messages_received = 0

reason = None

while True:
if number_of_empty_receives == int(args.max_empty_receives_count):
reason = ReasonStopEnum.EMPTY_RECEIVED
break
elif 0 < args.max_messages <= total_messages_received:
reason = ReasonStopEnum.MAX_MESSAGES_RECEIVED
break

while number_of_empty_receives <= int(args.max_empty_receives_count):
print("Receiving message...")
received_response = sqs_client.receive_message(QueueUrl=source_queue_url, MessageAttributeNames=["All"],
AttributeNames=['All'],
MaxNumberOfMessages=10)
MaxNumberOfMessages=args.max_receive_messages)

if ("Messages" not in received_response) or (len(received_response['Messages']) == 0):
print("Queue did not returned messages")
Expand All @@ -80,7 +58,11 @@ def main(args=None):

continue

print("Received {} messages".format(len(received_response['Messages'])))
messages_received = len(received_response['Messages'])

total_messages_received += messages_received

print("Received {} messages".format(messages_received))

for message in received_response['Messages']:
print("Sending message to '{}'".format(args.destination))
Expand All @@ -93,8 +75,15 @@ def main(args=None):
sqs_client.delete_message(QueueUrl=source_queue_url,
ReceiptHandle=message['ReceiptHandle'])

print("Giving up after {} empty receives from the source queue.".format(number_of_empty_receives))
if reason == ReasonStopEnum.MAX_MESSAGES_RECEIVED:
print("Stopping after processing {} messages.".format(total_messages_received))
else:
print("Giving up after {} empty receives from the source queue.".format(number_of_empty_receives))

return reason


if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
response = main(sys.argv[1:])
print("Stop Reason: {}".format(response.name))
sys.exit(0)
2 changes: 1 addition & 1 deletion src/phoenix_letter/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.0'
__version__ = '0.2.1'
72 changes: 70 additions & 2 deletions tests/bootstrap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import json
import logging
import sys

import boto3
import six
from moto import mock_sqs

from phoenix_letter.main import main

if six.PY2:
from unittest2 import TestCase
Expand All @@ -21,10 +27,72 @@
logger = logging.getLogger(__name__)


@mock_sqs
class BaseTestCase(TestCase):

def setUp(self):
pass
self.region = "us-east-1"

self.access_key = "AWS_ACCESS_MOCKED_KEY"
self.secret_key = "AWS_SECRET_MOCKED_KEY"

self.args = list()

self.args.append("--src")
self.args.append("queue_a")

self.args.append("--dst")
self.args.append("queue_b")

self.args.append("--region")
self.args.append(self.region)

self.args.append("--empty-receive")
self.args.append("2")

self.sqs = boto3.client("sqs", region_name=self.region,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key)

self.sqs.create_queue(QueueName="queue_a")
self.queue_a_url = self.sqs.get_queue_url(QueueName="queue_a")['QueueUrl']

self.sqs.create_queue(QueueName="queue_b")
self.queue_b_url = self.sqs.get_queue_url(QueueName="queue_b")['QueueUrl']

def tearDown(self):
pass
self._clean_queues([self.queue_a_url, self.queue_b_url])

def _create_message(self):
message = dict()
message["Body"] = json.dumps(dict(test="This is a test"))

message_attr = {
'Attribute1': {
'StringValue': 'Attribute Value',
'DataType': 'String'
},
'Attribute2': {
'StringValue': 'Attribute 2 Value',
'DataType': 'String'
},
}

message["MessageAttributes"] = message_attr

return message

def add_message(self, queue_url):
message = self._create_message()
self.sqs.send_message(QueueUrl=queue_url,
MessageBody=message['Body'],
MessageAttributes=message['MessageAttributes'])

def get_number_of_message(self, queue_url):
attributes = self.sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages'])

return int(attributes['Attributes'].get("ApproximateNumberOfMessages"))

def _clean_queues(self, queues):
for q in queues:
self.sqs.purge_queue(QueueUrl=q)
91 changes: 91 additions & 0 deletions tests/test_move_message_max_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import json

import six

from phoenix_letter.common.enums import ReasonStopEnum

if six.PY2:
from mock import patch
else:
from unittest.mock import patch

from moto import mock_sqs

from phoenix_letter.main import main

from tests.bootstrap import BaseTestCase


@mock_sqs
@patch("phoenix_letter.common.credentials.getpass")
class MoveMessagesWithMaxMessageLimitTestCase(BaseTestCase):

def setUp(self):
super(MoveMessagesWithMaxMessageLimitTestCase, self).setUp()
self.args.append("--aws-keys")

self.args.append("--max")
self.args.append("1")

self.args.append("--max-per-request")
self.args.append("1")

def tearDown(self):
super(MoveMessagesWithMaxMessageLimitTestCase, self).tearDown()

def test_move_message_without_args(self, mock_get_pass):
with self.assertRaises(SystemExit) as cm:
main([])

self.assertEqual(cm.exception.code, 2)

def test_move_messages_empty_queue(self, mock_get_pass):
mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2

self._clean_queues([self.queue_a_url, self.queue_b_url])

result = main(self.args)

self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED)
self.assertEqual(mock_get_pass.call_count, 2)

mock_get_pass.reset_mock()

def test_move_messages(self, mock_get_pass):
mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2

self.add_message(self.queue_a_url)
self.add_message(self.queue_a_url)

before_processing = self.get_number_of_message(self.queue_a_url)

result = main(self.args)

after_processing = self.get_number_of_message(self.queue_a_url)

self.assertEquals(result, ReasonStopEnum.MAX_MESSAGES_RECEIVED)
self.assertEquals(after_processing, before_processing - 1)
self.assertEquals(mock_get_pass.call_count, 2)

dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url,
MessageAttributeNames=["All"],
AttributeNames=['All'],
MaxNumberOfMessages=10)

self.assertIsNotNone(dst_message)
self.assertIn("Messages", dst_message)
self.assertTrue(len(dst_message["Messages"]) == 1)

first_message = dst_message["Messages"][0]
self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test")))

msg_attributes = first_message["MessageAttributes"]
self.assertIn("Attribute1", msg_attributes)
self.assertIn("Attribute2", msg_attributes)

self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value")
self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String")

self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value")
self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String")
mock_get_pass.reset_mock()
Loading

0 comments on commit a10744e

Please sign in to comment.