diff --git a/flask_jwt_router/__init__.py b/flask_jwt_router/__init__.py index 32366fc..3cf5772 100644 --- a/flask_jwt_router/__init__.py +++ b/flask_jwt_router/__init__.py @@ -1,7 +1,10 @@ """ Welcome To Flask-JWT-Router """ -from ._jwt_routes import JwtRoutes +from ._jwt_routes import JwtRoutes, BaseJwtRoutes +from .oauth2.google import Google +from .oauth2.google_test_util import GoogleTestUtil +from ._routing import TestRoutingMixin if __name__ == "__main__": diff --git a/flask_jwt_router/_jwt_routes.py b/flask_jwt_router/_jwt_routes.py index b048f45..f9efda5 100644 --- a/flask_jwt_router/_jwt_routes.py +++ b/flask_jwt_router/_jwt_routes.py @@ -173,14 +173,14 @@ def login(): import logging from warnings import warn -from typing import List, Dict +from typing import List, Dict, Optional from ._config import Config from ._entity import BaseEntity, Entity, _ORMType -from ._routing import BaseRouting, Routing +from ._routing import BaseRouting, RoutingMixin from ._authentication import BaseAuthentication, Authentication from .oauth2.google import Google -from .oauth2._base import BaseOAuth +from .oauth2._base import BaseOAuth, TestBaseOAuth from .oauth2.http_requests import HttpRequests from .oauth2._urls import GOOGLE_OAUTH_URL @@ -190,10 +190,10 @@ def login(): EXPIRE_DEFAULT = 30 -class JwtRoutes: +class BaseJwtRoutes: """ - If there app is None then self.init_app(app=None, **kwargs) need to be called - inside the Flask app factory pattern. + If there app is None then self.init_app(app=None, **kwargs) need to be called + inside the Flask app factory pattern. :param app: Flask application instance :param kwargs: entity_model """ @@ -228,17 +228,23 @@ class JwtRoutes: #: Optional Google OAuth 2.0 Single Sign On. See :class:`~flask_jwt_router.oauth2.google`` #: for more information. - google: BaseOAuth + # google: BaseOAuth TODO remove #: Optional. See :class:`~flask_jwt_router.oauth2.google` - google_oauth: Dict + google_oauth: Dict # TODO needs to be a list + + #: Optional. A Lust of strategies to be implement in the routing + strategies: List[BaseOAuth] + + #: List of instantiated strategies + strategy_dict: Dict[str, BaseOAuth] = {} def __init__(self, app=None, **kwargs): self.entity_models = kwargs.get("entity_models") self.google_oauth = kwargs.get("google_oauth") + self.strategies = kwargs.get("strategies") self.config = Config() self.auth = Authentication() - self.google = Google(HttpRequests(GOOGLE_OAUTH_URL)) self.app = app if app: self.init_app(app, entity_models=self.entity_models) @@ -252,12 +258,17 @@ def init_app(self, app=None, **kwargs): self.app = app if app else self.app entity_models = self.entity_models or kwargs.get("entity_models") self.google_oauth = self.google_oauth or kwargs.get("google_oauth") + self.strategies = self.strategies or kwargs.get("strategies") or [] app_config = self.get_app_config(self.app) - if self.google_oauth: - self.google.init(**self.google_oauth) + if len(self.strategies): + for S in self.strategies: + strategy = S(HttpRequests(GOOGLE_OAUTH_URL)) + strategy.init(**self.google_oauth) + self.strategy_dict[strategy.__class__.__name__] = strategy + self.config.init_config(app_config, entity_models=entity_models, google_oauth=self.google_oauth) self.entity = Entity(self.config) - self.routing = Routing(self.app, self.config, self.entity, self.google) + self.routing.init(self.app, self.config, self.entity, self.strategy_dict) self.app.before_request(self.routing.before_middleware) if self.config.expire_days: self.exp = self.config.expire_days @@ -328,3 +339,17 @@ def encode_token(self, entity_id) -> str: self.config.entity_key = self.entity.get_attr_name() table_name = self.entity.get_entity_from_ext().__tablename__ return self.auth.encode_token(self.config, entity_id, self.exp, table_name) + + def get_strategy(self, name: str) -> Optional[BaseOAuth]: + """ + :param name: The name of the strategy + :return: + """ + try: + return self.strategy_dict[name] + except KeyError: + return None + + +class JwtRoutes(RoutingMixin, BaseJwtRoutes): + pass diff --git a/flask_jwt_router/_routing.py b/flask_jwt_router/_routing.py index 8fc983b..b419530 100644 --- a/flask_jwt_router/_routing.py +++ b/flask_jwt_router/_routing.py @@ -4,7 +4,8 @@ from abc import ABC, abstractmethod # pylint:disable=invalid-name import logging -from flask import request, abort, g +from typing import List, Optional, Dict +from flask import request, abort, g, Flask from werkzeug.routing import RequestRedirect from werkzeug.exceptions import MethodNotAllowed, NotFound from jwt.exceptions import InvalidTokenError @@ -12,19 +13,25 @@ from ._entity import BaseEntity -from .oauth2.google import Google +from .oauth2.google import BaseOAuth from ._config import Config +from flask_jwt_router.oauth2._base import BaseOAuth, TestBaseOAuth logger = logging.getLogger() class BaseRouting(ABC): # pylint:disable=missing-class-docstring + @abstractmethod - def before_middleware(self) -> None: + def handle_token(self) -> None: # pylint:disable=missing-function-docstring pass + @abstractmethod + def init(self, app, config: Config, entity: BaseEntity, strategy_dict: Dict[str, BaseOAuth] = None) -> None: + pass + class Routing(BaseRouting): """ @@ -32,12 +39,22 @@ class Routing(BaseRouting): :param config: :class:`~flask_jwt_router._config` :param entity: :class:`~flask_jwt_router._entity` """ - def __init__(self, app, config: Config, entity: BaseEntity, google: Google = None): + app: Flask + + config: Config + + logger: logging + + entity: BaseEntity + + strategy_dict: Dict[str, BaseOAuth] + + def init(self, app, config: Config, entity: BaseEntity, strategy_dict: Dict[str, BaseOAuth] = None) -> None: self.app = app self.config = config self.logger = logger self.entity = entity - self.google = google + self.strategy_dict = strategy_dict def _prefix_api_name(self, w_routes=None): """ @@ -158,32 +175,123 @@ def before_middleware(self) -> None: not_whitelist = self._allow_public_routes(white_routes) if not_whitelist: self.entity.clean_up() - self._handle_token() + self.handle_token() - def _handle_token(self): + def handle_token(self): """ Checks the headers contain a Bearer string OR params. Checks to see that the route is white listed. :return None: """ - entity = None + strategy: Optional[BaseOAuth] = None try: + # TODO update docs resource_headers = request.headers.get("X-Auth-Resource") oauth_headers = request.headers.get("X-Auth-Token") if request.args.get("auth"): token = request.args.get("auth") - elif oauth_headers is not None and self.google: + # Strategies --------------------------------------------------------- # + + elif oauth_headers is not None and len(self.strategy_dict.keys()): + for s in self.strategy_dict.values(): + if s.header_name == oauth_headers: + strategy = s + if not strategy: + abort(401) bearer = oauth_headers token = bearer.split("Bearer ")[1] if not token: abort(401) try: - if self.google.test_metadata: - email, entity = self.google._update_test_metadata(token) + # Currently token refreshing is not supported, so pass the current token through + auth_results = strategy.authorize(token) + email = auth_results["email"] + self.entity.oauth_entity_key = self.config.oauth_entity + if resource_headers: + # If multiple tables are used to look up against incoming oauth users + # then assign the value from the X-Auth-Resource headers as the entity table name. + entity = self.entity.get_entity_from_token_or_tablename( + tablename=resource_headers, + email_value=email, + ) + setattr(g, resource_headers, entity) else: - # Currently token refreshing is not supported, so pass the current token through - auth_results = self.google.authorize(token) - email = auth_results["email"] + # Attach the the entity using the table name as the attribute name to Flask's + # global context object. + entity = self.entity.get_entity_from_token_or_tablename( + tablename=strategy.tablename, + email_value=email, + ) + setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) + setattr(g, "access_token", token) + return None + except InvalidTokenError: + return abort(401) + except AttributeError: + return abort(401) + except TypeError: + # This is raised from auth_results["email"] not present + abort(401) + else: + # Basic Auth --------------------------------------------------------- # + # Sometimes a developer may define the auth field name as Bearer or Basic + auth_header = request.headers.get("Authorization") + if not auth_header: + abort(401) + if "Bearer " in auth_header: + token = auth_header.split("Bearer ")[1] + elif "Basic " in auth_header: + token = auth_header.split("Basic ")[1] + except AttributeError: + return abort(401) + try: + decoded_token = jwt.decode( + token, + self.config.secret_key, + algorithms="HS256" + ) + self.entity_key = self.config.entity_key + entity = self.entity.get_entity_from_token_or_tablename(decoded_token) + setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) + return None + except ValueError: + return abort(401) + except InvalidTokenError: + return abort(401) + + +class _TestMixin(Routing): + + strategies: List[TestBaseOAuth] + + def __init__(self): + super(_TestMixin, self).__init__() + + def handle_token(self): + + strategy: Optional[TestBaseOAuth] = None + + try: + resource_headers = request.headers.get("X-Auth-Resource") + oauth_headers = request.headers.get("X-Auth-Token") + if request.args.get("auth"): + super(_TestMixin, self).handle_token() + # Strategies --------------------------------------------------------- # + elif oauth_headers is not None and len(self.strategy_dict.keys()): + for s in self.strategy_dict.values(): + if s.header_name == "X-Auth-Token": + strategy = s + if not strategy: + abort(401) + bearer = oauth_headers + token = bearer.split("Bearer ")[1] + if not token: + abort(401) + try: + if not strategy.test_metadata: + raise Exception("You didn't create your test headers with create_test_headers()") + email, entity = strategy.update_test_metadata(token) + self.entity.oauth_entity_key = self.config.oauth_entity if not entity: if resource_headers: @@ -199,7 +307,7 @@ def _handle_token(self): # Attach the the entity using the table name as the attribute name to Flask's # global context object. entity = self.entity.get_entity_from_token_or_tablename( - tablename=self.google.tablename, + tablename=strategy.tablename, email_value=email, ) setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) @@ -208,7 +316,7 @@ def _handle_token(self): setattr(g, entity.__tablename__, entity) setattr(g, "access_token", token) # Clean up google test util - self.google.tear_down() + strategy.tear_down() return None except InvalidTokenError: return abort(401) @@ -218,28 +326,14 @@ def _handle_token(self): # This is raised from auth_results["email"] not present abort(401) else: - # Sometimes a developer may define the auth field name as Bearer or Basic - auth_header = request.headers.get("Authorization") - if not auth_header: - abort(401) - if "Bearer " in auth_header: - token = auth_header.split("Bearer ")[1] - elif "Basic " in auth_header: - token = auth_header.split("Basic ")[1] + super(_TestMixin, self).handle_token() except AttributeError: return abort(401) - try: - decoded_token = jwt.decode( - token, - self.config.secret_key, - algorithms="HS256" - ) - except InvalidTokenError: - return abort(401) - try: - self.entity_key = self.config.entity_key - entity = self.entity.get_entity_from_token_or_tablename(decoded_token) - setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) - return None - except ValueError: - return abort(401) + + +class RoutingMixin: + routing = Routing() + + +class TestRoutingMixin: + routing = _TestMixin() diff --git a/flask_jwt_router/oauth2/_base.py b/flask_jwt_router/oauth2/_base.py index af9663f..8834e04 100644 --- a/flask_jwt_router/oauth2/_base.py +++ b/flask_jwt_router/oauth2/_base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict +from typing import Dict, Tuple class _FlaskRequestType(ABC): @@ -14,6 +14,10 @@ def get_json() -> Dict: class BaseOAuth(ABC): + header_name: str + + tablename: str + @abstractmethod def init(self, *, client_id, client_secret, redirect_uri, expires_in, email_field, tablename) -> None: pass @@ -22,6 +26,22 @@ def init(self, *, client_id, client_secret, redirect_uri, expires_in, email_fiel def oauth_login(self, request: _FlaskRequestType) -> Dict: pass + @abstractmethod + def authorize(self, token: str) -> Dict: + pass + + +class TestBaseOAuth(BaseOAuth, ABC): + test_metadata: Dict[str, Dict[str, str]] = {} + @abstractmethod def create_test_headers(self, *, email: str, entity: object = None, scope: str = "function") -> Dict[str, str]: pass + + @abstractmethod + def update_test_metadata(self, email: str) -> Tuple[str, object]: + pass + + @abstractmethod + def tear_down(self, *, scope: str = "function"): + pass diff --git a/flask_jwt_router/oauth2/google.py b/flask_jwt_router/oauth2/google.py index cc394c0..9a87c41 100644 --- a/flask_jwt_router/oauth2/google.py +++ b/flask_jwt_router/oauth2/google.py @@ -127,7 +127,7 @@ def test_blogs(client): _ = jwt_routes.google.create_test_headers(email="user@gmail.com", scope="application", entity=my_entity) """ -from typing import Dict, Optional, Tuple +from typing import Dict from .http_requests import HttpRequests from ._base import BaseOAuth, _FlaskRequestType @@ -135,6 +135,11 @@ def test_blogs(client): class Google(BaseOAuth): + # test_metadata: Dict[str, Dict[str, str]] = {} TODO remove + + #: The X header name used for the Google Auth2.0 Strategy (subsequent strategies should be "X--Token" + header_name = "X-Auth-Token" + #: As defined in https://tools.ietf.org/html/rfc6749#section-4.1.3 #: Value MUST be set to "authorization_code". grant_type = "authorization_code" @@ -170,8 +175,6 @@ class Google(BaseOAuth): _data: Dict - test_metadata: Dict[str, Dict[str, str]] = {} - _current_test_email: str = None def __init__(self, http): @@ -247,7 +250,7 @@ def oauth_login(self, request: _FlaskRequestType, **kwargs) -> Dict: } return res_data - def authorize(self, token: str): + def authorize(self, token: str) -> Dict: """ Call to a Google API to authenticate via access_token """ @@ -261,76 +264,3 @@ def _set_expires(self): :return: None """ self.expires_in = 3600 * 24 * 7 - - def create_test_headers(self, *, email: str, entity=None, scope="function") -> Dict[str, str]: - """ - :key email: SqlAlchemy object will be filtered against the email value. - :key entity: Optional. SqlAlchemy object if you prefer not to run a db in your tests. - :key scope: Optional. Default is *function*. Pass *application* if each unit test requires - more than one request to a Flask view handler. - - If you are running your tests against a test db then just pass in the `email` kwarg. - For example:: - - user_headers = jwt_routes.google.create_test_headers(email="user@gmail.com") - # user_headers: { "X-Auth-Token": "Bearer user@gmail.com" } - - If you are not running a db in your tests, then you can use the `entity` kwarg. - For example:: - - # user is an instantiated SqlAlchemy object - user_headers = jwt_routes.google.create_test_headers(email="user@gmail.com", entity=user) - # user_headers: { "X-Auth-Token": "Bearer user@gmail.com" } - - If you require more than one request to a Flask view handler in a single unit test, then set - the *scope* kwarg to **application**. - For example:: - - _ = jwt_routes.google.create_test_headers(email="user@gmail.com", scope="application") - - - :return: Python Dict containing header key value for OAuth routing with FJR - """ - _meta = { - "email": email, - "entity": entity, - "scope": scope, - } - self.test_metadata[f"{email}"] = _meta - - return { - "X-Auth-Token": f"Bearer {email}", - } - - def tear_down(self, *, scope: str = "function"): - """ - If you are setting the *scope* to *application* in :class:`~flask_jwt_router.google.create_test_headers` - then you may want to clean up the state outside or the teardown scope of your test runner - (unittest or pytest etc.). Calling *tear_down()* will clean up the authorised OAuth state. - For example:: - - @pytest.fixture() - def client(): - ... # See https://flask.palletsprojects.com/en/1.1.x/testing/ for details - jwt_routes.google.tear_down(scope="application") - - :key scope: Optional. Default is *function*. Set to "application" to teardown all oauth state - """ - if scope == "application": - self.test_metadata = {} - elif self._current_test_email in self.test_metadata: - if self.test_metadata[self._current_test_email].get("scope") != "application": - del self.test_metadata[self._current_test_email] - - def _update_test_metadata(self, email: str) -> Tuple[str, object]: - """ - Updates test_metadata from passed values to create_test_headers - :email: The email comes from the `Bearer ` token - :private: - :return: - """ - self._current_test_email = email - - email = self.test_metadata[email]["email"] - entity = self.test_metadata[email]["entity"] - return email, entity diff --git a/flask_jwt_router/oauth2/google_test_util.py b/flask_jwt_router/oauth2/google_test_util.py new file mode 100644 index 0000000..dd91fbc --- /dev/null +++ b/flask_jwt_router/oauth2/google_test_util.py @@ -0,0 +1,79 @@ +from ._base import BaseOAuth, _FlaskRequestType, TestBaseOAuth +from typing import Dict, Tuple +from flask_jwt_router.oauth2.google import Google + + +class GoogleTestUtil(Google, TestBaseOAuth): + + def create_test_headers(self, *, email: str, entity=None, scope="function") -> Dict[str, str]: + """ + :key email: SqlAlchemy object will be filtered against the email value. + :key entity: Optional. SqlAlchemy object if you prefer not to run a db in your tests. + :key scope: Optional. Default is *function*. Pass *application* if each unit test requires + more than one request to a Flask view handler. + + If you are running your tests against a test db then just pass in the `email` kwarg. + For example:: + + user_headers = jwt_routes.google.create_test_headers(email="user@gmail.com") + # user_headers: { "X-Auth-Token": "Bearer user@gmail.com" } + + If you are not running a db in your tests, then you can use the `entity` kwarg. + For example:: + + # user is an instantiated SqlAlchemy object + user_headers = jwt_routes.google.create_test_headers(email="user@gmail.com", entity=user) + # user_headers: { "X-Auth-Token": "Bearer user@gmail.com" } + + If you require more than one request to a Flask view handler in a single unit test, then set + the *scope* kwarg to **application**. + For example:: + + _ = jwt_routes.google.create_test_headers(email="user@gmail.com", scope="application") + + + :return: Python Dict containing header key value for OAuth routing with FJR + """ + _meta = { + "email": email, + "entity": entity, + "scope": scope, + } + self.test_metadata[f"{email}"] = _meta + + return { + "X-Auth-Token": f"Bearer {email}", + } + + def tear_down(self, *, scope: str = "function"): + """ + If you are setting the *scope* to *application* in :class:`~flask_jwt_router.google.create_test_headers` + then you may want to clean up the state outside or the teardown scope of your test runner + (unittest or pytest etc.). Calling *tear_down()* will clean up the authorised OAuth state. + For example:: + + @pytest.fixture() + def client(): + ... # See https://flask.palletsprojects.com/en/1.1.x/testing/ for details + jwt_routes.google.tear_down(scope="application") + + :key scope: Optional. Default is *function*. Set to "application" to teardown all oauth state + """ + if scope == "application": + self.test_metadata = {} + elif self._current_test_email in self.test_metadata: + if self.test_metadata[self._current_test_email].get("scope") != "application": + del self.test_metadata[self._current_test_email] + + def update_test_metadata(self, email: str) -> Tuple[str, object]: + """ + Updates test_metadata from passed values to create_test_headers + :email: The email comes from the `Bearer ` token + :private: + :return: + """ + self._current_test_email = email + + email = self.test_metadata[email]["email"] + entity = self.test_metadata[email]["entity"] + return email, entity diff --git a/flask_jwt_router/testing/__init__.py b/flask_jwt_router/testing/__init__.py new file mode 100644 index 0000000..e2964b0 --- /dev/null +++ b/flask_jwt_router/testing/__init__.py @@ -0,0 +1,23 @@ +""" + +jwt_routes = JwtRoutes(app, strategies=[Google]) + + +class BaseJwtRoutes: + pass + +class JwtRoutes(BaseJwtRoutes): + pass + + + +# Usage: +if E2E_TESTS == True: + + class TestJwtRoutes(TestRouterMixin, BaseJwtRoutes): + pass + + jwt_routes = TestJwtRoutes(app, strategies=[GoogleTest]) +else: + jwt_routes = JwtRoutes(app, strategies=[Google]) +""" \ No newline at end of file diff --git a/tests/fixtures/app_fixtures.py b/tests/fixtures/app_fixtures.py index f54b0a9..f267991 100644 --- a/tests/fixtures/app_fixtures.py +++ b/tests/fixtures/app_fixtures.py @@ -1,9 +1,9 @@ import pytest -from flask import Flask, jsonify -from flask_jwt_router._jwt_routes import JwtRoutes +from flask import Flask, jsonify, g +from flask_jwt_router import JwtRoutes, Google, GoogleTestUtil -app = Flask(__name__) +app = Flask(__name__, ) jwt_routes = JwtRoutes() @app.route("/test", methods=["GET"]) @@ -11,9 +11,22 @@ def test_one(): return "/test" + +@app.route("/api/v1/test_google_oauth", methods=["GET"]) +def request_google_oauth(): + oauth_tablename = g.oauth_tablename + return { + "email": oauth_tablename.email, + }, 200 + + @pytest.fixture def jwt_router_client(request): - app.config = {**app.config, **request.param} + if hasattr(request, "param"): + param = request.param + else: + param = {} + app.config = {**app.config, **param} app.config["TESTING"] = True app.config["SECRET_KEY"] = "__TEST_SECRET__" google_oauth = { @@ -24,7 +37,7 @@ def jwt_router_client(request): "email_field": "email", "expires_in": 3600, } - jwt_routes.init_app(app, google_oauth=google_oauth) + jwt_routes.init_app(app, google_oauth=google_oauth, strategies=[GoogleTestUtil]) client = app.test_client() ctx = app.app_context() ctx.push() diff --git a/tests/fixtures/main_fixture.py b/tests/fixtures/main_fixture.py index e1c37e0..398e1ca 100644 --- a/tests/fixtures/main_fixture.py +++ b/tests/fixtures/main_fixture.py @@ -1,11 +1,26 @@ import pytest from flask import Flask, jsonify, g, request -from flask_jwt_router._jwt_routes import JwtRoutes +from flask_jwt_router import GoogleTestUtil, BaseJwtRoutes, TestRoutingMixin from flask_sqlalchemy import SQLAlchemy flask_app = Flask(__name__) -jwt_routes = JwtRoutes() +google_oauth = { + "client_id": "", + "client_secret": "", + "redirect_uri": "http://localhost:3000", + "tablename": "oauth_tablename", + "email_field": "email", + "expires_in": 3600, +} + + +class TestJwtRoutes(TestRoutingMixin, BaseJwtRoutes): + pass + + +jwt_routes = TestJwtRoutes(google_oauth=google_oauth, strategies=[GoogleTestUtil]) + db = SQLAlchemy() @@ -98,15 +113,7 @@ def request_client(): from tests.fixtures.models import TeacherModel, OAuthUserModel flask_app.config["ENTITY_MODELS"] = [TeacherModel, OAuthUserModel] - google_oauth = { - "client_id": "", - "client_secret": "", - "redirect_uri": "http://localhost:3000", - "tablename": "oauth_tablename", - "email_field": "email", - "expires_in": 3600, - } - jwt_routes.init_app(flask_app, google_oauth=google_oauth) + jwt_routes.init_app(flask_app, google_oauth=google_oauth, strategies=[GoogleTestUtil]) db.init_app(flask_app) with flask_app.app_context(): diff --git a/tests/oauth2/test_google.py b/tests/oauth2/test_google.py index cd6b3a5..2020e5f 100644 --- a/tests/oauth2/test_google.py +++ b/tests/oauth2/test_google.py @@ -5,6 +5,7 @@ from flask_jwt_router.oauth2.google import Google, _FlaskRequestType from flask_jwt_router.oauth2.http_requests import HttpRequests from flask_jwt_router.oauth2._urls import GOOGLE_OAUTH_URL +from flask_jwt_router import GoogleTestUtil from tests.fixtures.oauth_fixtures import TEST_OAUTH_URL, http_requests from tests.fixtures.model_fixtures import MockAOuthModel @@ -115,7 +116,7 @@ def test_authorize(self, http_requests): "verified_email": true } """ - g = Google(http_requests(GOOGLE_OAUTH_URL)) + g = GoogleTestUtil(http_requests(GOOGLE_OAUTH_URL)) g.init(**self.mock_options) token = "" result = g.authorize(token) @@ -124,7 +125,7 @@ def test_authorize(self, http_requests): def test_create_test_headers(self, http_requests, MockAOuthModel): mock_user = MockAOuthModel(email="test@email.com") - g = Google(http_requests(GOOGLE_OAUTH_URL)) + g = GoogleTestUtil(http_requests(GOOGLE_OAUTH_URL)) g.init(**self.mock_options) result = g.create_test_headers(email="test@email.com") @@ -147,7 +148,7 @@ def test_create_test_headers(self, http_requests, MockAOuthModel): def test_tear_down(self, http_requests, MockAOuthModel): mock_user = MockAOuthModel(email="test@email.com") - g = Google(http_requests(GOOGLE_OAUTH_URL)) + g = GoogleTestUtil(http_requests(GOOGLE_OAUTH_URL)) g.init(**self.mock_options) result = g.create_test_headers(email="test@email.com", entity=mock_user, scope="application") diff --git a/tests/test_jwt_routes.py b/tests/test_jwt_routes.py index 9a91ff6..b20a96f 100644 --- a/tests/test_jwt_routes.py +++ b/tests/test_jwt_routes.py @@ -1,7 +1,7 @@ from flask import Flask import pytest -from flask_jwt_router._jwt_routes import JwtRoutes +from flask_jwt_router import JwtRoutes, GoogleTestUtil from tests.fixtures.model_fixtures import MockEntityModel, MockAOuthModel @@ -53,7 +53,7 @@ def test_init_app(self, MockEntityModel, MockAOuthModel): self.app.config["ENTITY_MODELS"] = [MockEntityModel, MockAOuthModel] jwt = JwtRoutes() - jwt.init_app(self.app, google_oauth=self.oauth_options) + jwt.init_app(self.app, google_oauth=self.oauth_options, strategies=[GoogleTestUtil]) assert jwt.config.entity_models[1] == MockAOuthModel assert jwt.config.google_oauth["client_id"] == "" assert jwt.config.google_oauth["client_secret"] == "" @@ -63,7 +63,7 @@ def test_init_app(self, MockEntityModel, MockAOuthModel): assert jwt.config.google_oauth["expires_in"] == 3600 self.app.config["ENTITY_MODELS"] = [MockEntityModel, MockAOuthModel] - jwt = JwtRoutes(self.app, google_oauth=self.oauth_options) + jwt = JwtRoutes(self.app, google_oauth=self.oauth_options, strategies=[GoogleTestUtil]) jwt.init_app() assert jwt.config.entity_models[1] == MockAOuthModel assert jwt.config.google_oauth["client_id"] == "" @@ -155,3 +155,9 @@ def test_set_expire_days(self): assert jwt.exp == 99 jwt.set_exp(expire_days=22) assert jwt.exp == 22 + + def test_get_strategy(self): + jwt = JwtRoutes() + jwt.init_app(self.app, google_oauth=self.oauth_options, strategies=[GoogleTestUtil]) + strategy = jwt.get_strategy("GoogleTestUtil") + assert isinstance(strategy, GoogleTestUtil) diff --git a/tests/test_routing.py b/tests/test_routing.py index 3a7750f..3c682f6 100644 --- a/tests/test_routing.py +++ b/tests/test_routing.py @@ -19,6 +19,7 @@ from flask_jwt_router._config import Config from flask_jwt_router._entity import Entity from flask_jwt_router.oauth2.google import Google +from flask_jwt_router import GoogleTestUtil from tests.fixtures.token_fixture import mock_token, mock_access_token from tests.fixtures.model_fixtures import TestMockEntity, MockAOuthModel from tests.fixtures.app_fixtures import jwt_router_client, test_client_static @@ -86,7 +87,8 @@ def fc_one(): entity = Entity(config) google = Google(http_requests(oauth_urls)) google.init(**config.google_oauth) - routing = Routing(app, config, entity, google) + routing = Routing() + routing.init(app, config, entity, google) with ctx: # token from args @@ -192,43 +194,43 @@ def test_handle_pre_flight_request(self, request_client): def test_routing_with_google_create_test_headers(self, request_client, MockAOuthModel, google_oauth_user): email = "test_one@oauth.com" test_user = MockAOuthModel(email="test_one@oauth.com") - - assert jwt_routes.google.test_metadata == {} + google = jwt_routes.get_strategy("GoogleTestUtil") + assert google.test_metadata == {} # Pure stateless test with no db - oauth_headers = jwt_routes.google.create_test_headers(email=email, entity=test_user) + oauth_headers = google.create_test_headers(email=email, entity=test_user) - assert jwt_routes.google.test_metadata[email] == {"email": email, "entity": test_user, "scope": "function"} + assert google.test_metadata[email] == {"email": email, "entity": test_user, "scope": "function"} assert oauth_headers == {'X-Auth-Token': f'Bearer {email}'} - + # rv = request_client.get("/api/v1/test_google_oauth", headers=oauth_headers) assert "200" in str(rv.status) assert email == rv.get_json()["email"] - assert jwt_routes.google.test_metadata == {} - - # Tests with side effects to db - oauth_headers = jwt_routes.google.create_test_headers(email=google_oauth_user.email) - - assert jwt_routes.google.test_metadata[email] == {"email": email, "entity": None, "scope": "function"} + assert google.test_metadata == {} + # + # # Tests with side effects to db + oauth_headers = google.create_test_headers(email=google_oauth_user.email) + # + assert google.test_metadata[email] == {"email": email, "entity": None, "scope": "function"} assert oauth_headers == {'X-Auth-Token': f'Bearer {email}'} - + # rv = request_client.get("/api/v1/test_google_oauth", headers=oauth_headers) assert "200" in str(rv.status) assert email == rv.get_json()["email"] - assert jwt_routes.google.test_metadata == {} + assert google.test_metadata == {} def test_routing_with_google_create_headers_scope(self, request_client, MockAOuthModel, google_oauth_user): email = "test_one@oauth.com" test_user = MockAOuthModel(email="test_one@oauth.com") test_metadata = {"email": email, "entity": test_user, "scope": "application"} - - assert jwt_routes.google.test_metadata == {} + google = jwt_routes.get_strategy("GoogleTestUtil") + assert google.test_metadata == {} # Pure stateless test with no db - oauth_headers = jwt_routes.google.create_test_headers(email=email, entity=test_user, scope="application") + oauth_headers = google.create_test_headers(email=email, entity=test_user, scope="application") - assert jwt_routes.google.test_metadata[email] == test_metadata + assert google.test_metadata[email] == test_metadata assert oauth_headers == {'X-Auth-Token': f'Bearer {email}'} rv = request_client.get("/api/v1/test_google_oauth", headers=oauth_headers) assert "200" in str(rv.status) assert email == rv.get_json()["email"] - assert jwt_routes.google.test_metadata[email] == test_metadata + assert google.test_metadata[email] == test_metadata