diff --git a/bookops_worldcat/authorize.py b/bookops_worldcat/authorize.py index 3943c50..490cb0a 100644 --- a/bookops_worldcat/authorize.py +++ b/bookops_worldcat/authorize.py @@ -6,13 +6,11 @@ import datetime import sys -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, Optional, Tuple, Union import requests -from requests import Response - -from . import __title__, __version__ # type: ignore +from . import __title__, __version__ from .errors import WorldcatAuthorizationError @@ -23,13 +21,14 @@ class WorldcatAccessToken: Explicit Authorization Code and Refresh Token flows. Token with correctly bonded scopes can then be passed into a session of particular web service to authorize requests for resources. - More on OCLC's web services authorization: - https://www.oclc.org/developer/develop/authentication/oauth/client-credentials-grant.en.html + More on OCLC's client credentials grant: + https://www.oclc.org/developer/api/keys/oauth/client-credentials-grant.en.html Args: key: your WSKey public client_id secret: your WSKey secret - scopes: request scopes for the access token + scopes: request scopes for the access token as a string, + separate different scopes with space principal_id: principalID (required for read/write endpoints) principal_idns: principalIDNS (required for read/write endpoints) agent: "User-agent" parameter to be passed in the request @@ -75,10 +74,10 @@ def __init__( self, key: str, secret: str, - scopes: Union[str, List[str]], + scopes: str, principal_id: str, principal_idns: str, - agent: Optional[str] = None, + agent: str = "", timeout: Optional[ Union[int, float, Tuple[int, int], Tuple[float, float]] ] = None, @@ -93,51 +92,49 @@ def __init__( self.principal_idns = principal_idns self.scopes = scopes self.secret = secret - self.server_response = None + self.server_response: Optional[requests.Response] = None self.timeout = timeout - self.token_expires_at = None - self.token_str = None - self.token_type = None + self.token_expires_at = "" + self.token_str = "" + self.token_type = "" # default bookops-worldcat request header - if self.agent is None: + if not self.agent: self.agent = f"{__title__}/{__version__}" else: - if type(self.agent) is not str: + if not isinstance(self.agent, str): raise WorldcatAuthorizationError("Argument 'agent' must be a string.") # asure passed arguments are valid if not self.key: raise WorldcatAuthorizationError("Argument 'key' is required.") else: - if type(self.key) is not str: + if not isinstance(self.key, str): raise WorldcatAuthorizationError("Argument 'key' must be a string.") if not self.secret: raise WorldcatAuthorizationError("Argument 'secret' is required.") else: - if type(self.secret) is not str: + if not isinstance(self.secret, str): raise WorldcatAuthorizationError("Argument 'secret' must be a string.") if not self.principal_id: raise WorldcatAuthorizationError( - "Argument 'principal_id' is required for read/write endpoint of Metadata API." + "Argument 'principal_id' is required for read/write endpoint of " + "Metadata API." ) if not self.principal_idns: raise WorldcatAuthorizationError( - "Argument 'principal_idns' is required for read/write endpoint of Metadata API." + "Argument 'principal_idns' is required for read/write endpoint of " + "Metadata API." ) # validate passed scopes - if type(self.scopes) is list: - self.scopes = " ".join(self.scopes) - elif type(self.scopes) is not str: - raise WorldcatAuthorizationError( - "Argument 'scopes' must a string or a list." - ) - self.scopes = self.scopes.strip() # type: ignore - if self.scopes == "": - raise WorldcatAuthorizationError("Argument 'scope' is missing.") + if not self.scopes: + raise WorldcatAuthorizationError("Argument 'scopes' is required.") + elif not isinstance(self.scopes, str): + raise WorldcatAuthorizationError("Argument 'scopes' must a string.") + self.scopes = self.scopes.strip() # assign default value for timout if not self.timeout: @@ -166,12 +163,12 @@ def _hasten_expiration_time(self, utc_stamp_str: str) -> str: ) - datetime.timedelta(seconds=1) return datetime.datetime.strftime(utcstamp, "%Y-%m-%d %H:%M:%SZ") - def _parse_server_response(self, response: Response) -> None: + def _parse_server_response(self, response: requests.Response) -> None: """Parses authorization server response""" - self.server_response = response # type: ignore + self.server_response = response if response.status_code == requests.codes.ok: self.token_str = response.json()["access_token"] - self.token_expires_at = self._hasten_expiration_time( # type: ignore + self.token_expires_at = self._hasten_expiration_time( response.json()["expires_at"] ) self.token_type = response.json()["token_type"] @@ -182,12 +179,12 @@ def _payload(self) -> Dict[str, str]: """Preps requests params""" return { "grant_type": self.grant_type, - "scope": self.scopes, # type: ignore + "scope": self.scopes, "principalID": self.principal_id, "principalIDNS": self.principal_idns, } - def _post_token_request(self) -> Response: + def _post_token_request(self) -> requests.Response: """ Fetches Worldcat access token for specified scope (web service) @@ -222,7 +219,7 @@ def _request_token(self): self._parse_server_response(response) def _token_headers(self) -> Dict[str, str]: - return {"User-Agent": self.agent, "Accept": "application/json"} # type: ignore + return {"User-Agent": self.agent, "Accept": "application/json"} def _token_url(self) -> str: return f"{self.oauth_server}/token" @@ -240,7 +237,7 @@ def is_expired(self) -> bool: """ try: if ( - datetime.datetime.strptime(self.token_expires_at, "%Y-%m-%d %H:%M:%SZ") # type: ignore + datetime.datetime.strptime(self.token_expires_at, "%Y-%m-%d %H:%M:%SZ") < datetime.datetime.utcnow() ): return True diff --git a/tests/test_authorize.py b/tests/test_authorize.py index e440d65..66faf21 100644 --- a/tests/test_authorize.py +++ b/tests/test_authorize.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import datetime +from contextlib import nullcontext as does_not_raise import os import pytest @@ -156,22 +157,22 @@ def test_principal_idns_exception(self, arg, expectation, msg): ( None, pytest.raises(WorldcatAuthorizationError), - "Argument 'scope' must a string or a list.", + "Argument 'scopes' must a string.", ), ( 123, pytest.raises(WorldcatAuthorizationError), - "Argument 'scope' must a string or a list.", + "Argument 'scopes' must a string.", ), ( " ", pytest.raises(WorldcatAuthorizationError), - "Argument 'scope' is missing.", + "Argument 'scopes' is required.", ), ( ["", ""], pytest.raises(WorldcatAuthorizationError), - "Argument 'scope' is missing.", + "Argument 'scopes' is required.", ), ], ) @@ -211,7 +212,7 @@ def test_timeout_argument( @pytest.mark.parametrize( "argm,expectation", - [("scope1", "scope1"), (["scope1", "scope2"], "scope1 scope2")], + [("scope1 ", "scope1"), (" scope1 scope2 ", "scope1 scope2")], ) def test_scope_manipulation( self, argm, expectation, mock_successful_post_token_response @@ -436,5 +437,7 @@ def test_post_token_request_with_live_service(self, live_keys): assert sorted(params) == sorted(response.keys()) # test if token looks right - assert token.token_str is not None + assert token.token_str.startswith("tk_") assert token.is_expired() is False + with does_not_raise(): + datetime.datetime.strptime(token.token_expires_at, "%Y-%m-%d %H:%M:%SZ")