Skip to content

Commit

Permalink
scopes as str & types cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
klinga committed Feb 6, 2024
1 parent 07af406 commit 9c4630b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
67 changes: 32 additions & 35 deletions bookops_worldcat/authorize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@

import datetime
import sys
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, Optional, Tuple, Union

import requests
from requests import Response


from . import __title__, __version__ # type: ignore
from . import __title__, __version__
from .errors import WorldcatAuthorizationError


Expand All @@ -23,13 +21,14 @@ class WorldcatAccessToken:
Explicit Authorization Code and Refresh Token flows. Token with correctly
bonded scopes can then be passed into a session of particular web service
to authorize requests for resources.
More on OCLC's web services authorization:
https://www.oclc.org/developer/develop/authentication/oauth/client-credentials-grant.en.html
More on OCLC's client credentials grant:
https://www.oclc.org/developer/api/keys/oauth/client-credentials-grant.en.html
Args:
key: your WSKey public client_id
secret: your WSKey secret
scopes: request scopes for the access token
scopes: request scopes for the access token as a string,
separate different scopes with space
principal_id: principalID (required for read/write endpoints)
principal_idns: principalIDNS (required for read/write endpoints)
agent: "User-agent" parameter to be passed in the request
Expand Down Expand Up @@ -75,10 +74,10 @@ def __init__(
self,
key: str,
secret: str,
scopes: Union[str, List[str]],
scopes: str,
principal_id: str,
principal_idns: str,
agent: Optional[str] = None,
agent: str = "",
timeout: Optional[
Union[int, float, Tuple[int, int], Tuple[float, float]]
] = None,
Expand All @@ -93,51 +92,49 @@ def __init__(
self.principal_idns = principal_idns
self.scopes = scopes
self.secret = secret
self.server_response = None
self.server_response: Optional[requests.Response] = None
self.timeout = timeout
self.token_expires_at = None
self.token_str = None
self.token_type = None
self.token_expires_at = ""
self.token_str = ""
self.token_type = ""

# default bookops-worldcat request header
if self.agent is None:
if not self.agent:
self.agent = f"{__title__}/{__version__}"
else:
if type(self.agent) is not str:
if not isinstance(self.agent, str):
raise WorldcatAuthorizationError("Argument 'agent' must be a string.")

# asure passed arguments are valid
if not self.key:
raise WorldcatAuthorizationError("Argument 'key' is required.")
else:
if type(self.key) is not str:
if not isinstance(self.key, str):
raise WorldcatAuthorizationError("Argument 'key' must be a string.")

if not self.secret:
raise WorldcatAuthorizationError("Argument 'secret' is required.")
else:
if type(self.secret) is not str:
if not isinstance(self.secret, str):
raise WorldcatAuthorizationError("Argument 'secret' must be a string.")

if not self.principal_id:
raise WorldcatAuthorizationError(
"Argument 'principal_id' is required for read/write endpoint of Metadata API."
"Argument 'principal_id' is required for read/write endpoint of "
"Metadata API."
)
if not self.principal_idns:
raise WorldcatAuthorizationError(
"Argument 'principal_idns' is required for read/write endpoint of Metadata API."
"Argument 'principal_idns' is required for read/write endpoint of "
"Metadata API."
)

# validate passed scopes
if type(self.scopes) is list:
self.scopes = " ".join(self.scopes)
elif type(self.scopes) is not str:
raise WorldcatAuthorizationError(
"Argument 'scopes' must a string or a list."
)
self.scopes = self.scopes.strip() # type: ignore
if self.scopes == "":
raise WorldcatAuthorizationError("Argument 'scope' is missing.")
if not self.scopes:
raise WorldcatAuthorizationError("Argument 'scopes' is required.")
elif not isinstance(self.scopes, str):
raise WorldcatAuthorizationError("Argument 'scopes' must a string.")
self.scopes = self.scopes.strip()

# assign default value for timout
if not self.timeout:
Expand Down Expand Up @@ -166,12 +163,12 @@ def _hasten_expiration_time(self, utc_stamp_str: str) -> str:
) - datetime.timedelta(seconds=1)
return datetime.datetime.strftime(utcstamp, "%Y-%m-%d %H:%M:%SZ")

def _parse_server_response(self, response: Response) -> None:
def _parse_server_response(self, response: requests.Response) -> None:
"""Parses authorization server response"""
self.server_response = response # type: ignore
self.server_response = response
if response.status_code == requests.codes.ok:
self.token_str = response.json()["access_token"]
self.token_expires_at = self._hasten_expiration_time( # type: ignore
self.token_expires_at = self._hasten_expiration_time(
response.json()["expires_at"]
)
self.token_type = response.json()["token_type"]
Expand All @@ -182,12 +179,12 @@ def _payload(self) -> Dict[str, str]:
"""Preps requests params"""
return {
"grant_type": self.grant_type,
"scope": self.scopes, # type: ignore
"scope": self.scopes,
"principalID": self.principal_id,
"principalIDNS": self.principal_idns,
}

def _post_token_request(self) -> Response:
def _post_token_request(self) -> requests.Response:
"""
Fetches Worldcat access token for specified scope (web service)
Expand Down Expand Up @@ -222,7 +219,7 @@ def _request_token(self):
self._parse_server_response(response)

def _token_headers(self) -> Dict[str, str]:
return {"User-Agent": self.agent, "Accept": "application/json"} # type: ignore
return {"User-Agent": self.agent, "Accept": "application/json"}

def _token_url(self) -> str:
return f"{self.oauth_server}/token"
Expand All @@ -240,7 +237,7 @@ def is_expired(self) -> bool:
"""
try:
if (
datetime.datetime.strptime(self.token_expires_at, "%Y-%m-%d %H:%M:%SZ") # type: ignore
datetime.datetime.strptime(self.token_expires_at, "%Y-%m-%d %H:%M:%SZ")
< datetime.datetime.utcnow()
):
return True
Expand Down
15 changes: 9 additions & 6 deletions tests/test_authorize.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

import datetime
from contextlib import nullcontext as does_not_raise
import os

import pytest
Expand Down Expand Up @@ -156,22 +157,22 @@ def test_principal_idns_exception(self, arg, expectation, msg):
(
None,
pytest.raises(WorldcatAuthorizationError),
"Argument 'scope' must a string or a list.",
"Argument 'scopes' must a string.",
),
(
123,
pytest.raises(WorldcatAuthorizationError),
"Argument 'scope' must a string or a list.",
"Argument 'scopes' must a string.",
),
(
" ",
pytest.raises(WorldcatAuthorizationError),
"Argument 'scope' is missing.",
"Argument 'scopes' is required.",
),
(
["", ""],
pytest.raises(WorldcatAuthorizationError),
"Argument 'scope' is missing.",
"Argument 'scopes' is required.",
),
],
)
Expand Down Expand Up @@ -211,7 +212,7 @@ def test_timeout_argument(

@pytest.mark.parametrize(
"argm,expectation",
[("scope1", "scope1"), (["scope1", "scope2"], "scope1 scope2")],
[("scope1 ", "scope1"), (" scope1 scope2 ", "scope1 scope2")],
)
def test_scope_manipulation(
self, argm, expectation, mock_successful_post_token_response
Expand Down Expand Up @@ -436,5 +437,7 @@ def test_post_token_request_with_live_service(self, live_keys):
assert sorted(params) == sorted(response.keys())

# test if token looks right
assert token.token_str is not None
assert token.token_str.startswith("tk_")
assert token.is_expired() is False
with does_not_raise():
datetime.datetime.strptime(token.token_expires_at, "%Y-%m-%d %H:%M:%SZ")

0 comments on commit 9c4630b

Please sign in to comment.