Skip to content

Commit

Permalink
Testing mixin for Google class (#221)
Browse files Browse the repository at this point in the history
* Testing mixin for Google class
 Remove testing logic from library's Routing & Entity classes
 Fixes  #219 #220

* Testing mixin for Google class
 Remove testing logic from library's Routing & Entity classes
 Fixes  #219 #220

* Testing mixin for Google class
 Remove testing logic from library's Routing & Entity classes
 Fixes  #219 #220
  • Loading branch information
Joe Gasewicz authored Jul 21, 2021
1 parent 8de3476 commit 6495ccb
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 171 deletions.
5 changes: 4 additions & 1 deletion flask_jwt_router/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""
Welcome To Flask-JWT-Router
"""
from ._jwt_routes import JwtRoutes
from ._jwt_routes import JwtRoutes, BaseJwtRoutes
from .oauth2.google import Google
from .oauth2.google_test_util import GoogleTestUtil
from ._routing import TestRoutingMixin


if __name__ == "__main__":
Expand Down
49 changes: 37 additions & 12 deletions flask_jwt_router/_jwt_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ def login():

import logging
from warnings import warn
from typing import List, Dict
from typing import List, Dict, Optional

from ._config import Config
from ._entity import BaseEntity, Entity, _ORMType
from ._routing import BaseRouting, Routing
from ._routing import BaseRouting, RoutingMixin
from ._authentication import BaseAuthentication, Authentication
from .oauth2.google import Google
from .oauth2._base import BaseOAuth
from .oauth2._base import BaseOAuth, TestBaseOAuth
from .oauth2.http_requests import HttpRequests
from .oauth2._urls import GOOGLE_OAUTH_URL

Expand All @@ -190,10 +190,10 @@ def login():
EXPIRE_DEFAULT = 30


class JwtRoutes:
class BaseJwtRoutes:
"""
If there app is None then self.init_app(app=None, **kwargs) need to be called
inside the Flask app factory pattern.
If there app is None then self.init_app(app=None, **kwargs) need to be called
inside the Flask app factory pattern.
:param app: Flask application instance
:param kwargs: entity_model
"""
Expand Down Expand Up @@ -228,17 +228,23 @@ class JwtRoutes:

#: Optional Google OAuth 2.0 Single Sign On. See :class:`~flask_jwt_router.oauth2.google``
#: for more information.
google: BaseOAuth
# google: BaseOAuth TODO remove

#: Optional. See :class:`~flask_jwt_router.oauth2.google`
google_oauth: Dict
google_oauth: Dict # TODO needs to be a list

#: Optional. A Lust of strategies to be implement in the routing
strategies: List[BaseOAuth]

#: List of instantiated strategies
strategy_dict: Dict[str, BaseOAuth] = {}

def __init__(self, app=None, **kwargs):
self.entity_models = kwargs.get("entity_models")
self.google_oauth = kwargs.get("google_oauth")
self.strategies = kwargs.get("strategies")
self.config = Config()
self.auth = Authentication()
self.google = Google(HttpRequests(GOOGLE_OAUTH_URL))
self.app = app
if app:
self.init_app(app, entity_models=self.entity_models)
Expand All @@ -252,12 +258,17 @@ def init_app(self, app=None, **kwargs):
self.app = app if app else self.app
entity_models = self.entity_models or kwargs.get("entity_models")
self.google_oauth = self.google_oauth or kwargs.get("google_oauth")
self.strategies = self.strategies or kwargs.get("strategies") or []
app_config = self.get_app_config(self.app)
if self.google_oauth:
self.google.init(**self.google_oauth)
if len(self.strategies):
for S in self.strategies:
strategy = S(HttpRequests(GOOGLE_OAUTH_URL))
strategy.init(**self.google_oauth)
self.strategy_dict[strategy.__class__.__name__] = strategy

self.config.init_config(app_config, entity_models=entity_models, google_oauth=self.google_oauth)
self.entity = Entity(self.config)
self.routing = Routing(self.app, self.config, self.entity, self.google)
self.routing.init(self.app, self.config, self.entity, self.strategy_dict)
self.app.before_request(self.routing.before_middleware)
if self.config.expire_days:
self.exp = self.config.expire_days
Expand Down Expand Up @@ -328,3 +339,17 @@ def encode_token(self, entity_id) -> str:
self.config.entity_key = self.entity.get_attr_name()
table_name = self.entity.get_entity_from_ext().__tablename__
return self.auth.encode_token(self.config, entity_id, self.exp, table_name)

def get_strategy(self, name: str) -> Optional[BaseOAuth]:
"""
:param name: The name of the strategy
:return:
"""
try:
return self.strategy_dict[name]
except KeyError:
return None


class JwtRoutes(RoutingMixin, BaseJwtRoutes):
pass
172 changes: 133 additions & 39 deletions flask_jwt_router/_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,57 @@
from abc import ABC, abstractmethod
# pylint:disable=invalid-name
import logging
from flask import request, abort, g
from typing import List, Optional, Dict
from flask import request, abort, g, Flask
from werkzeug.routing import RequestRedirect
from werkzeug.exceptions import MethodNotAllowed, NotFound
from jwt.exceptions import InvalidTokenError
import jwt


from ._entity import BaseEntity
from .oauth2.google import Google
from .oauth2.google import BaseOAuth
from ._config import Config
from flask_jwt_router.oauth2._base import BaseOAuth, TestBaseOAuth

logger = logging.getLogger()


class BaseRouting(ABC):
# pylint:disable=missing-class-docstring

@abstractmethod
def before_middleware(self) -> None:
def handle_token(self) -> None:
# pylint:disable=missing-function-docstring
pass

@abstractmethod
def init(self, app, config: Config, entity: BaseEntity, strategy_dict: Dict[str, BaseOAuth] = None) -> None:
pass


class Routing(BaseRouting):
"""
:param app: Flask application instance
:param config: :class:`~flask_jwt_router._config`
:param entity: :class:`~flask_jwt_router._entity`
"""
def __init__(self, app, config: Config, entity: BaseEntity, google: Google = None):
app: Flask

config: Config

logger: logging

entity: BaseEntity

strategy_dict: Dict[str, BaseOAuth]

def init(self, app, config: Config, entity: BaseEntity, strategy_dict: Dict[str, BaseOAuth] = None) -> None:
self.app = app
self.config = config
self.logger = logger
self.entity = entity
self.google = google
self.strategy_dict = strategy_dict

def _prefix_api_name(self, w_routes=None):
"""
Expand Down Expand Up @@ -158,32 +175,123 @@ def before_middleware(self) -> None:
not_whitelist = self._allow_public_routes(white_routes)
if not_whitelist:
self.entity.clean_up()
self._handle_token()
self.handle_token()

def _handle_token(self):
def handle_token(self):
"""
Checks the headers contain a Bearer string OR params.
Checks to see that the route is white listed.
:return None:
"""
entity = None
strategy: Optional[BaseOAuth] = None
try:
# TODO update docs
resource_headers = request.headers.get("X-Auth-Resource")
oauth_headers = request.headers.get("X-Auth-Token")
if request.args.get("auth"):
token = request.args.get("auth")
elif oauth_headers is not None and self.google:
# Strategies --------------------------------------------------------- #

elif oauth_headers is not None and len(self.strategy_dict.keys()):
for s in self.strategy_dict.values():
if s.header_name == oauth_headers:
strategy = s
if not strategy:
abort(401)
bearer = oauth_headers
token = bearer.split("Bearer ")[1]
if not token:
abort(401)
try:
if self.google.test_metadata:
email, entity = self.google._update_test_metadata(token)
# Currently token refreshing is not supported, so pass the current token through
auth_results = strategy.authorize(token)
email = auth_results["email"]
self.entity.oauth_entity_key = self.config.oauth_entity
if resource_headers:
# If multiple tables are used to look up against incoming oauth users
# then assign the value from the X-Auth-Resource headers as the entity table name.
entity = self.entity.get_entity_from_token_or_tablename(
tablename=resource_headers,
email_value=email,
)
setattr(g, resource_headers, entity)
else:
# Currently token refreshing is not supported, so pass the current token through
auth_results = self.google.authorize(token)
email = auth_results["email"]
# Attach the the entity using the table name as the attribute name to Flask's
# global context object.
entity = self.entity.get_entity_from_token_or_tablename(
tablename=strategy.tablename,
email_value=email,
)
setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
setattr(g, "access_token", token)
return None
except InvalidTokenError:
return abort(401)
except AttributeError:
return abort(401)
except TypeError:
# This is raised from auth_results["email"] not present
abort(401)
else:
# Basic Auth --------------------------------------------------------- #
# Sometimes a developer may define the auth field name as Bearer or Basic
auth_header = request.headers.get("Authorization")
if not auth_header:
abort(401)
if "Bearer " in auth_header:
token = auth_header.split("Bearer ")[1]
elif "Basic " in auth_header:
token = auth_header.split("Basic ")[1]
except AttributeError:
return abort(401)
try:
decoded_token = jwt.decode(
token,
self.config.secret_key,
algorithms="HS256"
)
self.entity_key = self.config.entity_key
entity = self.entity.get_entity_from_token_or_tablename(decoded_token)
setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
return None
except ValueError:
return abort(401)
except InvalidTokenError:
return abort(401)


class _TestMixin(Routing):

strategies: List[TestBaseOAuth]

def __init__(self):
super(_TestMixin, self).__init__()

def handle_token(self):

strategy: Optional[TestBaseOAuth] = None

try:
resource_headers = request.headers.get("X-Auth-Resource")
oauth_headers = request.headers.get("X-Auth-Token")
if request.args.get("auth"):
super(_TestMixin, self).handle_token()
# Strategies --------------------------------------------------------- #
elif oauth_headers is not None and len(self.strategy_dict.keys()):
for s in self.strategy_dict.values():
if s.header_name == "X-Auth-Token":
strategy = s
if not strategy:
abort(401)
bearer = oauth_headers
token = bearer.split("Bearer ")[1]
if not token:
abort(401)
try:
if not strategy.test_metadata:
raise Exception("You didn't create your test headers with create_test_headers()")
email, entity = strategy.update_test_metadata(token)

self.entity.oauth_entity_key = self.config.oauth_entity
if not entity:
if resource_headers:
Expand All @@ -199,7 +307,7 @@ def _handle_token(self):
# Attach the the entity using the table name as the attribute name to Flask's
# global context object.
entity = self.entity.get_entity_from_token_or_tablename(
tablename=self.google.tablename,
tablename=strategy.tablename,
email_value=email,
)
setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
Expand All @@ -208,7 +316,7 @@ def _handle_token(self):
setattr(g, entity.__tablename__, entity)
setattr(g, "access_token", token)
# Clean up google test util
self.google.tear_down()
strategy.tear_down()
return None
except InvalidTokenError:
return abort(401)
Expand All @@ -218,28 +326,14 @@ def _handle_token(self):
# This is raised from auth_results["email"] not present
abort(401)
else:
# Sometimes a developer may define the auth field name as Bearer or Basic
auth_header = request.headers.get("Authorization")
if not auth_header:
abort(401)
if "Bearer " in auth_header:
token = auth_header.split("Bearer ")[1]
elif "Basic " in auth_header:
token = auth_header.split("Basic ")[1]
super(_TestMixin, self).handle_token()
except AttributeError:
return abort(401)
try:
decoded_token = jwt.decode(
token,
self.config.secret_key,
algorithms="HS256"
)
except InvalidTokenError:
return abort(401)
try:
self.entity_key = self.config.entity_key
entity = self.entity.get_entity_from_token_or_tablename(decoded_token)
setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
return None
except ValueError:
return abort(401)


class RoutingMixin:
routing = Routing()


class TestRoutingMixin:
routing = _TestMixin()
Loading

0 comments on commit 6495ccb

Please sign in to comment.