From a10744eb6011f756cd959e100ffc74af9bf22901 Mon Sep 17 00:00:00 2001 From: Renan Date: Sun, 5 Jan 2020 14:28:23 +0100 Subject: [PATCH] Added flag to limit the number of processed messages from the source queue (#4) * added parameters to enable limiting the amount ou message processed from the queue * added tests to cover the feature for limiting messages received from the queue * refactoring tests * bumping version to 0.2.1 * added clean and folders to .gitignore --- .gitignore | 4 +- Makefile | 10 +- src/phoenix_letter/common/__init__.py | 0 src/phoenix_letter/common/arguments.py | 37 +++++ src/phoenix_letter/common/credentials.py | 8 ++ src/phoenix_letter/common/enums.py | 6 + src/phoenix_letter/main.py | 69 ++++----- src/phoenix_letter/version.py | 2 +- tests/bootstrap.py | 72 +++++++++- tests/test_move_message_max_message.py | 91 ++++++++++++ tests/test_move_message_without_aws_key.py | 74 ++++++++++ tests/test_move_messages.py | 160 +++++---------------- 12 files changed, 362 insertions(+), 171 deletions(-) create mode 100644 src/phoenix_letter/common/__init__.py create mode 100644 src/phoenix_letter/common/arguments.py create mode 100644 src/phoenix_letter/common/credentials.py create mode 100644 src/phoenix_letter/common/enums.py create mode 100644 tests/test_move_message_max_message.py create mode 100644 tests/test_move_message_without_aws_key.py diff --git a/.gitignore b/.gitignore index f3ac30d..f9c9e09 100644 --- a/.gitignore +++ b/.gitignore @@ -165,4 +165,6 @@ fabric.properties # Editor-based Rest Client .idea/httpRequests -.idea/ \ No newline at end of file +.idea/ + +out/ \ No newline at end of file diff --git a/Makefile b/Makefile index ec47d1d..2357b75 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,5 @@ -run-tests: - nosetests tests/ --with-coverage --cover-package=phoenix_letter -s +clean: + find . -name "*.py[co]" -o -name __pycache__ -exec rm -rf {} + -update-pypi: - rm -rf dist/* - python setup.py sdist - twine upload dist/* \ No newline at end of file +run-tests: clean + nosetests tests/ --with-coverage --cover-package=phoenix_letter -s \ No newline at end of file diff --git a/src/phoenix_letter/common/__init__.py b/src/phoenix_letter/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/phoenix_letter/common/arguments.py b/src/phoenix_letter/common/arguments.py new file mode 100644 index 0000000..a74f82b --- /dev/null +++ b/src/phoenix_letter/common/arguments.py @@ -0,0 +1,37 @@ +from argparse import ArgumentParser + + +def parse_arguments(args): + parser = ArgumentParser() + parser.add_argument("--src", dest="source", + required=True, + help="Source SQS Queue Name", + metavar="SOURCE_QUEUE") + + parser.add_argument("--dst", dest="destination", + required=True, + help="Destination SQS Queue Name", + metavar="DESTINATION_QUEUE") + + parser.add_argument("--aws-keys", dest="input_keys", + help="Flag that indicates you want to enter custom AWS keys.", action='store_true') + + parser.add_argument("--region", dest="region", default="us-east-1", + required=True, + help="AWS Region", + metavar="REGION") + + parser.add_argument("--empty-receive", dest="max_empty_receives_count", default=10, + help="Max number of empty receives before giving up", + metavar="EMPTY_RECEIVE") + + parser.add_argument("--max", dest="max_messages", default=0, type=int, + help="Max number of messages to process from the source queue.", + metavar="N") + + parser.add_argument("--max-per-request", dest="max_receive_messages", default=10, type=int, choices=range(1, 11), + help="Max number of messages to received from the source queue per request (this will be pass " + "in the MaxNumberOfMessages param). Default: 10 (AWS API max limit)", + metavar="N") + + return parser.parse_args(args) diff --git a/src/phoenix_letter/common/credentials.py b/src/phoenix_letter/common/credentials.py new file mode 100644 index 0000000..d7b8972 --- /dev/null +++ b/src/phoenix_letter/common/credentials.py @@ -0,0 +1,8 @@ +from getpass import getpass + + +def get_credentials(): + access_key = getpass("AWS Access Key:") + secret_key = getpass("AWS Secret Key:") + + return access_key, secret_key diff --git a/src/phoenix_letter/common/enums.py b/src/phoenix_letter/common/enums.py new file mode 100644 index 0000000..1388a35 --- /dev/null +++ b/src/phoenix_letter/common/enums.py @@ -0,0 +1,6 @@ +from enum import IntEnum + + +class ReasonStopEnum(IntEnum): + EMPTY_RECEIVED = 1, + MAX_MESSAGES_RECEIVED = 2 \ No newline at end of file diff --git a/src/phoenix_letter/main.py b/src/phoenix_letter/main.py index 99ab89c..3cd6b9e 100644 --- a/src/phoenix_letter/main.py +++ b/src/phoenix_letter/main.py @@ -1,44 +1,12 @@ import random import sys -from argparse import ArgumentParser from time import sleep -from getpass import getpass import boto3 - -def parse_arguments(args): - parser = ArgumentParser() - parser.add_argument("--src", dest="source", - required=True, - help="Source SQS Queue Name", - metavar="SOURCE_QUEUE") - - parser.add_argument("--dst", dest="destination", - required=True, - help="Destination SQS Queue Name", - metavar="DESTINATION_QUEUE") - - parser.add_argument("--aws-keys", dest="input_keys", - help="Flag that indicates you want to enter custom AWS keys.", action='store_true') - - parser.add_argument("--region", dest="region", default="us-east-1", - required=True, - help="AWS Region", - metavar="REGION") - - parser.add_argument("--empty-receive", dest="max_empty_receives_count", default=10, - help="Max number of empty receives before giving up", - metavar="EMPTY_RECEIVE") - - return parser.parse_args(args) - - -def get_credentials(): - access_key = getpass("AWS Access Key:") - secret_key = getpass("AWS Secret Key:") - - return access_key, secret_key +from phoenix_letter.common.arguments import parse_arguments +from phoenix_letter.common.credentials import get_credentials +from phoenix_letter.common.enums import ReasonStopEnum def main(args=None): @@ -62,12 +30,22 @@ def main(args=None): destination_queue_url = destination_queue['QueueUrl'] number_of_empty_receives = 0 + total_messages_received = 0 + + reason = None + + while True: + if number_of_empty_receives == int(args.max_empty_receives_count): + reason = ReasonStopEnum.EMPTY_RECEIVED + break + elif 0 < args.max_messages <= total_messages_received: + reason = ReasonStopEnum.MAX_MESSAGES_RECEIVED + break - while number_of_empty_receives <= int(args.max_empty_receives_count): print("Receiving message...") received_response = sqs_client.receive_message(QueueUrl=source_queue_url, MessageAttributeNames=["All"], AttributeNames=['All'], - MaxNumberOfMessages=10) + MaxNumberOfMessages=args.max_receive_messages) if ("Messages" not in received_response) or (len(received_response['Messages']) == 0): print("Queue did not returned messages") @@ -80,7 +58,11 @@ def main(args=None): continue - print("Received {} messages".format(len(received_response['Messages']))) + messages_received = len(received_response['Messages']) + + total_messages_received += messages_received + + print("Received {} messages".format(messages_received)) for message in received_response['Messages']: print("Sending message to '{}'".format(args.destination)) @@ -93,8 +75,15 @@ def main(args=None): sqs_client.delete_message(QueueUrl=source_queue_url, ReceiptHandle=message['ReceiptHandle']) - print("Giving up after {} empty receives from the source queue.".format(number_of_empty_receives)) + if reason == ReasonStopEnum.MAX_MESSAGES_RECEIVED: + print("Stopping after processing {} messages.".format(total_messages_received)) + else: + print("Giving up after {} empty receives from the source queue.".format(number_of_empty_receives)) + + return reason if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + response = main(sys.argv[1:]) + print("Stop Reason: {}".format(response.name)) + sys.exit(0) diff --git a/src/phoenix_letter/version.py b/src/phoenix_letter/version.py index 7fd229a..fc79d63 100644 --- a/src/phoenix_letter/version.py +++ b/src/phoenix_letter/version.py @@ -1 +1 @@ -__version__ = '0.2.0' +__version__ = '0.2.1' diff --git a/tests/bootstrap.py b/tests/bootstrap.py index c4353b2..c6ed534 100644 --- a/tests/bootstrap.py +++ b/tests/bootstrap.py @@ -1,6 +1,12 @@ +import json import logging import sys + +import boto3 import six +from moto import mock_sqs + +from phoenix_letter.main import main if six.PY2: from unittest2 import TestCase @@ -21,10 +27,72 @@ logger = logging.getLogger(__name__) +@mock_sqs class BaseTestCase(TestCase): def setUp(self): - pass + self.region = "us-east-1" + + self.access_key = "AWS_ACCESS_MOCKED_KEY" + self.secret_key = "AWS_SECRET_MOCKED_KEY" + + self.args = list() + + self.args.append("--src") + self.args.append("queue_a") + + self.args.append("--dst") + self.args.append("queue_b") + + self.args.append("--region") + self.args.append(self.region) + + self.args.append("--empty-receive") + self.args.append("2") + + self.sqs = boto3.client("sqs", region_name=self.region, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key) + + self.sqs.create_queue(QueueName="queue_a") + self.queue_a_url = self.sqs.get_queue_url(QueueName="queue_a")['QueueUrl'] + + self.sqs.create_queue(QueueName="queue_b") + self.queue_b_url = self.sqs.get_queue_url(QueueName="queue_b")['QueueUrl'] def tearDown(self): - pass + self._clean_queues([self.queue_a_url, self.queue_b_url]) + + def _create_message(self): + message = dict() + message["Body"] = json.dumps(dict(test="This is a test")) + + message_attr = { + 'Attribute1': { + 'StringValue': 'Attribute Value', + 'DataType': 'String' + }, + 'Attribute2': { + 'StringValue': 'Attribute 2 Value', + 'DataType': 'String' + }, + } + + message["MessageAttributes"] = message_attr + + return message + + def add_message(self, queue_url): + message = self._create_message() + self.sqs.send_message(QueueUrl=queue_url, + MessageBody=message['Body'], + MessageAttributes=message['MessageAttributes']) + + def get_number_of_message(self, queue_url): + attributes = self.sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages']) + + return int(attributes['Attributes'].get("ApproximateNumberOfMessages")) + + def _clean_queues(self, queues): + for q in queues: + self.sqs.purge_queue(QueueUrl=q) diff --git a/tests/test_move_message_max_message.py b/tests/test_move_message_max_message.py new file mode 100644 index 0000000..f566f5b --- /dev/null +++ b/tests/test_move_message_max_message.py @@ -0,0 +1,91 @@ +import json + +import six + +from phoenix_letter.common.enums import ReasonStopEnum + +if six.PY2: + from mock import patch +else: + from unittest.mock import patch + +from moto import mock_sqs + +from phoenix_letter.main import main + +from tests.bootstrap import BaseTestCase + + +@mock_sqs +@patch("phoenix_letter.common.credentials.getpass") +class MoveMessagesWithMaxMessageLimitTestCase(BaseTestCase): + + def setUp(self): + super(MoveMessagesWithMaxMessageLimitTestCase, self).setUp() + self.args.append("--aws-keys") + + self.args.append("--max") + self.args.append("1") + + self.args.append("--max-per-request") + self.args.append("1") + + def tearDown(self): + super(MoveMessagesWithMaxMessageLimitTestCase, self).tearDown() + + def test_move_message_without_args(self, mock_get_pass): + with self.assertRaises(SystemExit) as cm: + main([]) + + self.assertEqual(cm.exception.code, 2) + + def test_move_messages_empty_queue(self, mock_get_pass): + mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2 + + self._clean_queues([self.queue_a_url, self.queue_b_url]) + + result = main(self.args) + + self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED) + self.assertEqual(mock_get_pass.call_count, 2) + + mock_get_pass.reset_mock() + + def test_move_messages(self, mock_get_pass): + mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2 + + self.add_message(self.queue_a_url) + self.add_message(self.queue_a_url) + + before_processing = self.get_number_of_message(self.queue_a_url) + + result = main(self.args) + + after_processing = self.get_number_of_message(self.queue_a_url) + + self.assertEquals(result, ReasonStopEnum.MAX_MESSAGES_RECEIVED) + self.assertEquals(after_processing, before_processing - 1) + self.assertEquals(mock_get_pass.call_count, 2) + + dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url, + MessageAttributeNames=["All"], + AttributeNames=['All'], + MaxNumberOfMessages=10) + + self.assertIsNotNone(dst_message) + self.assertIn("Messages", dst_message) + self.assertTrue(len(dst_message["Messages"]) == 1) + + first_message = dst_message["Messages"][0] + self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test"))) + + msg_attributes = first_message["MessageAttributes"] + self.assertIn("Attribute1", msg_attributes) + self.assertIn("Attribute2", msg_attributes) + + self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value") + self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String") + + self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value") + self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String") + mock_get_pass.reset_mock() diff --git a/tests/test_move_message_without_aws_key.py b/tests/test_move_message_without_aws_key.py new file mode 100644 index 0000000..47c0353 --- /dev/null +++ b/tests/test_move_message_without_aws_key.py @@ -0,0 +1,74 @@ +import json + +import six + +from phoenix_letter.common.enums import ReasonStopEnum + +if six.PY2: + from mock import patch +else: + from unittest.mock import patch + +from moto import mock_sqs + +from phoenix_letter.main import main + +from tests.bootstrap import BaseTestCase + + +@mock_sqs +class MoveMessagesWithoutAWSKeysTestCase(BaseTestCase): + + def setUp(self): + super(MoveMessagesWithoutAWSKeysTestCase, self).setUp() + + def tearDown(self): + super(MoveMessagesWithoutAWSKeysTestCase, self).tearDown() + + def test_move_message_without_args(self): + with self.assertRaises(SystemExit) as cm: + main([]) + + self.assertEqual(cm.exception.code, 2) + + @patch("phoenix_letter.common.credentials.getpass") + def test_move_message(self, mock_get_pass): + self.add_message(self.queue_a_url) + + result = main(self.args) + + self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED) + mock_get_pass.assert_not_called() + + dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url, + MessageAttributeNames=["All"], + AttributeNames=['All'], + MaxNumberOfMessages=10) + + self.assertIsNotNone(dst_message) + + self.assertIn("Messages", dst_message) + self.assertTrue(len(dst_message["Messages"]) == 1) + + first_message = dst_message["Messages"][0] + self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test"))) + + msg_attributes = first_message["MessageAttributes"] + self.assertIn("Attribute1", msg_attributes) + self.assertIn("Attribute2", msg_attributes) + + self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value") + self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String") + self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value") + self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String") + + mock_get_pass.reset_mock() + + @patch("phoenix_letter.common.credentials.getpass") + def test_move_message_empty(self, mock_get_pass): + result = main(self.args) + + self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED) + + mock_get_pass.assert_not_called() + mock_get_pass.reset_mock() diff --git a/tests/test_move_messages.py b/tests/test_move_messages.py index b01e630..41db8fa 100644 --- a/tests/test_move_messages.py +++ b/tests/test_move_messages.py @@ -1,6 +1,9 @@ import json + import six +from phoenix_letter.common.enums import ReasonStopEnum + if six.PY2: from mock import patch else: @@ -14,149 +17,64 @@ @mock_sqs +@patch("phoenix_letter.common.credentials.getpass") class MoveMessagesTestCase(BaseTestCase): def setUp(self): - self.region = "us-east-1" - - self.access_key = "AWS_ACCESS_MOCKED_KEY" - self.secret_key = "AWS_SECRET_MOCKED_KEY" - - self.args = list() - - self.args.append("--src") - self.args.append("queue_a") - - self.args.append("--dst") - self.args.append("queue_b") + super(MoveMessagesTestCase, self).setUp() self.args.append("--aws-keys") - self.args.append("--region") - self.args.append(self.region) - - self.args.append("--empty-receive") - self.args.append("2") + def tearDown(self): + super(MoveMessagesTestCase, self).tearDown() - self.sqs = boto3.client("sqs", region_name=self.region, - aws_access_key_id=self.access_key, - aws_secret_access_key=self.secret_key) + def test_move_message_without_args(self, mock_get_pass): + with self.assertRaises(SystemExit) as cm: + main([]) - self.sqs.create_queue(QueueName="queue_a") - self.queue_a_url = self.sqs.get_queue_url(QueueName="queue_a")['QueueUrl'] + self.assertEqual(cm.exception.code, 2) - message = self.__create_message() + def test_move_message_with_empty_queue(self, mock_get_pass): + mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2 - self.sqs.send_message(QueueUrl=self.queue_a_url, - MessageBody=message['Body'], - MessageAttributes=message['MessageAttributes']) + self._clean_queues([self.queue_a_url, self.queue_b_url]) + result = main(self.args) - self.sqs.create_queue(QueueName="queue_b") - self.queue_b_url = self.sqs.get_queue_url(QueueName="queue_b")['QueueUrl'] + self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED) - def tearDown(self): - self.sqs.purge_queue(QueueUrl=self.queue_a_url) - self.sqs.purge_queue(QueueUrl=self.queue_b_url) + self.assertEqual(mock_get_pass.call_count, 2) + mock_get_pass.reset_mock() - @patch("phoenix_letter.main.getpass") def test_move_message_with_aws_key(self, mock_get_pass): mock_get_pass.side_effect = [self.access_key, self.secret_key] * 2 - with self.subTest("move_message_without_args"): - with self.assertRaises(SystemExit) as cm: - main([]) - - self.assertEqual(cm.exception.code, 2) - with self.subTest("move_message_empty"): - main(self.args) - - self.assertEqual(mock_get_pass.call_count, 2) - mock_get_pass.reset_mock() - with self.subTest("move_message"): - main(self.args) - - self.assertEquals(mock_get_pass.call_count, 2) - - dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url, - MessageAttributeNames=["All"], - AttributeNames=['All'], - MaxNumberOfMessages=10) - - self.assertIsNotNone(dst_message) - self.assertIn("Messages", dst_message) - self.assertTrue(len(dst_message["Messages"]) == 1) - - first_message = dst_message["Messages"][0] - self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test"))) - - msg_attributes = first_message["MessageAttributes"] - self.assertIn("Attribute1", msg_attributes) - self.assertIn("Attribute2", msg_attributes) - - self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value") - self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String") - - self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value") - self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String") - mock_get_pass.reset_mock() - - @patch("phoenix_letter.main.getpass") - def test_move_message_without_aws_key(self, mock_get_pass): - self.args.remove("--aws-keys") - - with self.subTest("move_message_without_args"): - with self.assertRaises(SystemExit) as cm: - main([]) - - self.assertEqual(cm.exception.code, 2) - with self.subTest("move_message_empty"): - main(self.args) - - mock_get_pass.assert_not_called() - mock_get_pass.reset_mock() - with self.subTest("move_message"): - main(self.args) - - mock_get_pass.assert_not_called() - - dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url, - MessageAttributeNames=["All"], - AttributeNames=['All'], - MaxNumberOfMessages=10) + self.add_message(self.queue_a_url) - self.assertIsNotNone(dst_message) - self.assertIn("Messages", dst_message) - self.assertTrue(len(dst_message["Messages"]) == 1) + result = main(self.args) - first_message = dst_message["Messages"][0] - self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test"))) + self.assertEquals(result, ReasonStopEnum.EMPTY_RECEIVED) - msg_attributes = first_message["MessageAttributes"] - self.assertIn("Attribute1", msg_attributes) - self.assertIn("Attribute2", msg_attributes) + self.assertEquals(mock_get_pass.call_count, 2) - self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value") - self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String") + dst_message = self.sqs.receive_message(QueueUrl=self.queue_b_url, + MessageAttributeNames=["All"], + AttributeNames=['All'], + MaxNumberOfMessages=10) - self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value") - self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String") - mock_get_pass.reset_mock() + self.assertIsNotNone(dst_message) + self.assertIn("Messages", dst_message) + self.assertTrue(len(dst_message["Messages"]) == 1) - def __create_message(self): - message = dict() - message["Body"] = json.dumps(dict(test="This is a test")) + first_message = dst_message["Messages"][0] + self.assertEqual(first_message["Body"], json.dumps(dict(test="This is a test"))) - message_attr = { - 'Attribute1': { - 'StringValue': 'Attribute Value', - 'DataType': 'String' - }, - 'Attribute2': { - 'StringValue': 'Attribute 2 Value', - 'DataType': 'String' - }, - } + msg_attributes = first_message["MessageAttributes"] + self.assertIn("Attribute1", msg_attributes) + self.assertIn("Attribute2", msg_attributes) - message["MessageAttributes"] = message_attr + self.assertEqual(msg_attributes["Attribute1"]["StringValue"], "Attribute Value") + self.assertEqual(msg_attributes["Attribute1"]["DataType"], "String") - return message + self.assertEqual(msg_attributes["Attribute2"]["StringValue"], "Attribute 2 Value") + self.assertEqual(msg_attributes["Attribute2"]["DataType"], "String") + mock_get_pass.reset_mock()