Skip to content

Commit

Permalink
Modify Manifester class to work with MockStub
Browse files Browse the repository at this point in the history
Add a class to test_manifester.py to enable requests to be sent to a
MockStub version of the requests package instead of sending live
requests to the API. Modify the Manifester class to use the
MockStub-based requests when running unit tests. Add a helper function
to generate mock HTTP response codes.
  • Loading branch information
synkd committed Jun 1, 2022
1 parent 31406b5 commit 4973169
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
47 changes: 47 additions & 0 deletions manifester/helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import random
import time

from collections import UserDict
from logzero import logger


Expand All @@ -12,6 +14,7 @@ def simple_retry(cmd, cmd_args=None, cmd_kwargs=None, max_timeout=240, _cur_time
# with caution as some data (notably the offline token) should be treated as a secret.
logger.debug(f"Sending request to endpoint {cmd_args}")
response = cmd(*cmd_args, **cmd_kwargs)
breakpoint()
logger.debug(f"Response status code is {response.status_code}")
if response.status_code in [429, 500, 504]:
new_wait = _cur_timeout * 2
Expand All @@ -21,3 +24,47 @@ def simple_retry(cmd, cmd_args=None, cmd_kwargs=None, max_timeout=240, _cur_time
time.sleep(_cur_timeout)
response = simple_retry(cmd, cmd_args, cmd_kwargs, max_timeout, new_wait)
return response

def fake_http_response_code(good_codes=None, bad_codes=None, fail_rate=20):
# randomish = random.random()
# print(randomish, fail_rate/100)
if random.random() > (fail_rate / 100):
return random.choice(good_codes)
else:
return random.choice(bad_codes)


class MockStub(UserDict):
"""Test helper class. Allows for both arbitrary mocking and stubbing"""

def __init__(self, in_dict=None):
"""Initialize the class and all nested dictionaries"""
if in_dict is None:
in_dict = {}
for key, value in in_dict.items():
if isinstance(value, dict):
setattr(self, key, MockStub(value))
elif type(value) in (list, tuple):
setattr(
self,
key,
[MockStub(x) if isinstance(x, dict) else x for x in value],
)
else:
setattr(self, key, value)
super().__init__(in_dict)

def __getattr__(self, name):
return self

def __getitem__(self, key):
if isinstance(key, str):
item = getattr(self, key, self)
try:
item = super().__getitem__(key)
except KeyError:
item = self
return item

def __call__(self, *args, **kwargs):
return self
27 changes: 15 additions & 12 deletions manifester/manifester.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import string
from pathlib import Path

import requests
from logzero import logger

from manifester.helpers import simple_retry
Expand All @@ -13,7 +12,11 @@
class Manifester:
def __init__(self, manifest_category, allocation_name=None, **kwargs):
self.allocation_name = allocation_name or "".join(random.sample(string.ascii_letters, 10))
# self.manifest_name = kwargs.get("manifest_name")
if kwargs.get("requester") is not None:
self.requester = kwargs["requester"]
else:
import requests
self.requester = requests
self.offline_token = kwargs.get("offline_token", settings.offline_token)
manifest_data = settings.manifest_category.get(manifest_category)
self.subscription_data = manifest_data.subscription_data
Expand All @@ -33,7 +36,7 @@ def access_token(self):
token_request_data = {"data": self.token_request_data}
logger.debug("Generating access token")
token_data = simple_retry(
requests.post,
self.requester.post,
cmd_args=["https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token"],
cmd_kwargs=token_request_data,
).json()
Expand All @@ -50,7 +53,7 @@ def create_subscription_allocation(self):
},
}
self.allocation = simple_retry(
requests.post,
self.requester.post,
cmd_args=["https://api.access.redhat.com/management/v1/allocations"],
cmd_kwargs=allocation_data,
).json()
Expand All @@ -69,7 +72,7 @@ def subscription_pools(self):
"params": {"offset": _offset},
}
self._subscription_pools = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/pools"],
cmd_kwargs=data,
).json()
Expand All @@ -85,7 +88,7 @@ def subscription_pools(self):
"params": {"offset": _offset},
}
offset_pools = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/pools"],
cmd_kwargs=data,
).json()
Expand All @@ -101,7 +104,7 @@ def add_entitlements_to_allocation(self, pool_id, entitlement_quantity):
"params": {"pool": f"{pool_id}", "quantity": f"{entitlement_quantity}"},
}
add_entitlements = simple_retry(
requests.post,
self.requester.post,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/entitlements"],
cmd_kwargs=data,
)
Expand All @@ -114,7 +117,7 @@ def verify_allocation_entitlements(self, entitlement_quantity, subscription_name
"params": {"include": "entitlements"},
}
self.entitlement_data = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}"],
cmd_kwargs=data,
).json()
Expand Down Expand Up @@ -193,22 +196,22 @@ def trigger_manifest_export(self):
local_file.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Triggering manifest export job for subscription allocation {self.allocation_name}")
trigger_export_job = simple_retry(
requests.get,
self.requester.get,
cmd_args=[
f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/export"
],
cmd_kwargs=headers,
).json()
export_job_id = trigger_export_job["body"]["exportJobID"]
export_job = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/exportJob/{export_job_id}"],
cmd_kwargs=headers,
)
request_count = 1
while export_job.status_code != 200:
export_job = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"https://api.access.redhat.com/management/v1/allocations/{self.allocation_uuid}/exportJob/{export_job_id}"],
cmd_kwargs=headers,
)
Expand All @@ -227,7 +230,7 @@ def trigger_manifest_export(self):
export_job = export_job.json()
export_href = export_job["body"]["href"]
manifest = simple_retry(
requests.get,
self.requester.get,
cmd_args=[f"{export_href}"],
cmd_kwargs=headers,
)
Expand Down
52 changes: 52 additions & 0 deletions tests/test_manifester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from unittest.mock import Mock

from requests import request
from manifester import Manifester
from manifester.settings import settings
from manifester.helpers import MockStub, fake_http_response_code
import pytest
import random

def test_empty_init(manifest_category="golden_ticket"):
manifester_inst = Manifester(manifest_category=manifest_category)
assert isinstance(manifester_inst, Manifester)

class RhsmApiStub(MockStub):
def __init__(self, in_dict=None, **kwargs):
self._good_codes = kwargs.get("good_codes", [200])
self._bad_codes = kwargs.get("bad_codes", [429, 500, 504])
self._fail_rate = kwargs.get("fail_rate", 10)
super().__init__(in_dict)

@property
def status_code(self):
return fake_http_response_code(self._good_codes, self._bad_codes, self._fail_rate)

def post(*args, **kwargs):
if args[0].endswith("openid-connect/token"):
return MockStub(in_dict={"access_token": "this is a simulated access token"}, status_code=200)
if args[0].endswith("allocations"):
return MockStub(in_dict={"uuid": "1234567890"})
if args[0].endswith("entitlements"):
return MockStub(status_code=200)

def get(*args, **kwargs):
if args[0].endswith("pools"):
# question: how to fake > 50 pools to test use of offset parameter?
return MockStub(in_dict={"pool": "this is a simulated list of dictionaries of subscription pool data"})
if "allocations" in args[0] and not ("export" in args[0] or "pools" in args[0]):
return MockStub(in_dict={"allocation_data": "this allocation data also includes entitlement data"})
if args[0].endswith("export"):
return MockStub(in_dict={"export_job": "Manifest export job triggered successfully"})
if "exportJob" in args[0]:
responses = [202, 200]
return MockStub(status_code=random.choice(responses))
if "export" in args[0] and not args[0].endswith("export"):
return MockStub(in_dict={"content": "this is a simulated manifest"})


def test_create_allocation():
manifester = Manifester(manifest_category="golden_ticket", requester=RhsmApiStub(in_dict=None, status_code=200))
allocation_uuid = manifester.create_subscription_allocation()
breakpoint()
assert allocation_uuid == "1234567890"

0 comments on commit 4973169

Please sign in to comment.