From d70ceedba50ec52f12f52e14b7ee427cbbcf0b45 Mon Sep 17 00:00:00 2001 From: roland <roland@catalogix.se> Date: Sun, 20 Oct 2019 15:25:13 +0200 Subject: [PATCH] The provider side of handling logout. --- CHANGELOG.md | 2 + src/oic/oic/__init__.py | 3 - src/oic/oic/provider.py | 678 ++++++++++++++----- tests/test_oauth2_consumer.py | 2 +- tests/test_oauth2_provider.py | 2 +- tests/test_oic_consumer_logout.py | 2 +- tests/test_oic_provider.py | 158 +---- tests/test_oic_provider_logout.py | 1042 +++++++++++++++++++++++++++++ tests/test_sdb.py | 2 +- tests/test_token.py | 2 +- tests/test_x_provider.py | 2 +- 11 files changed, 1547 insertions(+), 348 deletions(-) create mode 100644 tests/test_oic_provider_logout.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 49db52662..9da4d74c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on the [KeepAChangeLog] project. ### Changed - [#688] Second stage of adding logout support. +- [#700] Third stage of adding logout support, provider side ### Fixed - [#602] Fixed uncaught error on unpacking of message @@ -28,6 +29,7 @@ The format is based on the [KeepAChangeLog] project. [#683]: https://github.com/OpenIDC/pyoidc/issues/683 [#688]: https://github.com/OpenIDC/pyoidc/pull/688 [#698]: https://github.com/OpenIDC/pyoidc/pull/698 +[#700]: https://github.com/OpenIDC/pyoidc/pull/700 ## 1.0.1 [2019-06-30] diff --git a/src/oic/oic/__init__.py b/src/oic/oic/__init__.py index a066e3fa1..6fe3e532b 100644 --- a/src/oic/oic/__init__.py +++ b/src/oic/oic/__init__.py @@ -382,9 +382,6 @@ def __init__( self.kid = {"sig": {}, "enc": {}} self.requests_dir = requests_dir - # LOGOUT related - self.post_logout_redirect_uris = [] - def _get_id_token(self, **kwargs): try: return kwargs["id_token"] diff --git a/src/oic/oic/provider.py b/src/oic/oic/provider.py index 7155414a3..1616ac49e 100644 --- a/src/oic/oic/provider.py +++ b/src/oic/oic/provider.py @@ -4,11 +4,15 @@ import logging import socket import time +import uuid from functools import cmp_to_key +from http.cookies import SimpleCookie from typing import Any from typing import Dict # noqa from typing import List # noqa from typing import Optional +from typing import Tuple +from typing import Union from urllib.parse import parse_qs from urllib.parse import splitquery # type: ignore from urllib.parse import unquote @@ -34,11 +38,11 @@ from oic.exception import MessageException from oic.exception import NotForMe from oic.exception import ParameterError -from oic.exception import SubMismatch from oic.exception import UnSupported from oic.oauth2 import compact from oic.oauth2 import error_response from oic.oauth2 import redirect_authz_error +from oic.oauth2.base import PBase from oic.oauth2.exception import CapabilitiesMisMatch from oic.oauth2.exception import VerificationError from oic.oauth2.message import Message @@ -52,6 +56,7 @@ from oic.oic import Server from oic.oic import claims_match from oic.oic import scope2claims +from oic.oic.message import BACK_CHANNEL_LOGOUT_EVENT from oic.oic.message import SCOPE2CLAIMS from oic.oic.message import AccessTokenResponse from oic.oic.message import AuthorizationResponse @@ -69,6 +74,7 @@ from oic.utils.http_util import Response from oic.utils.http_util import SeeOther from oic.utils.http_util import Unauthorized +from oic.utils.jwt import JWT from oic.utils.keyio import KEYS # noqa from oic.utils.keyio import KeyBundle from oic.utils.keyio import KeyJar # noqa @@ -238,6 +244,9 @@ def __init__( template_renderer=render_template, extra_scope_dict=None, message_factory=OIDCMessageFactory, + post_logout_page="", + self_signing_alg="RS256", + logout_path="", ): # This has to be defined before calling super() @@ -294,6 +303,13 @@ def __init__( # additional attributes self.schema = schema + # Logout connected attributes + self.httpc = PBase(verify_ssl=verify_ssl, keyjar=self.keyjar) + self.post_logout_page = post_logout_page + self.signing_alg = self_signing_alg + self.logout_path = logout_path + self.logout_verify_url = "" + @property def default_capabilities(self): """Define default capabilities for implementation.""" @@ -488,195 +504,12 @@ def required_user(self, areq): return req_user - def verify_post_logout_redirect_uri(self, esreq, client_id): - """ - Verify the post_logout_redirect_uris. - - :param esreq: End session request - :param client_id: The Client ID - :return: - """ - try: - redirect_uri = esreq["post_logout_redirect_uri"] - except KeyError: - logger.debug("Missing post_logout_redirect_uri parameter") - return - - try: - accepted_urls = self.cdb[client_id]["post_logout_redirect_uris"] - if self._verify_url(redirect_uri, accepted_urls): - return redirect_uri - except Exception as exc: - msg = "An error occurred while verifying redirect URI: %s" - logger.debug(msg, str(exc)) - - return None - def is_session_revoked(self, request="", cookie=None): areq = parse_qs(request) authn, _ = self.pick_auth(areq) identity, _ts = authn.authenticated_as(cookie) return self.sdb.is_revoke_uid(identity["uid"]) - def let_user_verify_logout(self, uid, esr, cookie, redirect_uri): - if cookie: - headers = [cookie] - else: - headers = [] - - self.sdb.set_verified_logout(uid) - - if redirect_uri is not None: - redirect = redirect_uri - else: - redirect = "/" - try: - tmp_id_token_hint = esr["id_token_hint"] - except KeyError: - tmp_id_token_hint = "" - - context = { - "id_token_hint": tmp_id_token_hint, - "post_logout_redirect_uri": esr["post_logout_redirect_uri"], - "key": self.sdb.get_verify_logout(uid), - "redirect": redirect, - "action": "/" + EndSessionEndpoint("").etype, - } - return Response( - self.template_renderer("verify_logout", context), headers=headers - ) - - def _get_sids_from_cookie(self, cookie): - """Get cookie_dealer, client_id and sids from cookie.""" - if cookie is None: - return None, None, None - - cookie_dealer = CookieDealer(srv=self) - client_id = sids = None - - _cval = cookie_dealer.get_cookie_value(cookie, self.sso_cookie_name) - if _cval: - (value, _ts, typ) = _cval - if typ == "sso": - uid, client_id = value.split(DELIM) - try: - sids = session_get(self.sdb, "uid", uid) - except (KeyError, IndexError): - raise SubMismatch("Mismatch uid") - return cookie_dealer, client_id, sids - - def end_session_endpoint(self, request="", cookie=None, **kwargs): - esr = self.server.message_factory.get_request_type( - "endsession_endpoint" - )().from_urlencoded(request) - - logger.debug("End session request: %s", format(sanitize(esr.to_dict()))) - - # 2 ways of find out client ID and user. Either through a cookie - # or using the id_token_hint - try: - cookie_dealer, client_id, sids = self._get_sids_from_cookie(cookie) - except SubMismatch as error: - return error_response("invalid_request", "%s" % error) - - if "id_token_hint" in esr: - id_token_hint = IdToken().from_jwt( - esr["id_token_hint"], keyjar=self.keyjar, verify=True - ) - far_away = 86400 * 30 # 30 days - - if client_id: - args = {"client_id": client_id} - else: - args = {} - - try: - id_token_hint.verify( - iss=self.baseurl, skew=far_away, nonce_storage_time=far_away, **args - ) - except (VerificationError, NotForMe) as err: - logger.warning("Verification error on id_token_hint: {}".format(err)) - return error_response("invalid_request", "Bad Id Token hint") - - sub = id_token_hint["sub"] - - if sids: - match = False - # verify that 'sub' are bound to 'user' - for sid in sids: - if self.sdb[sid]["sub"] == sub: - match = True - break - if not match: - return error_response("invalid_request", "Wrong user") - else: - try: - sids = self.sdb.get_by_sub(sub) - except IndexError: - pass - - if not client_id: - if len(id_token_hint["aud"]) == 1: - client_id = id_token_hint["aud"][0] - else: - client_id = id_token_hint["azp"] - - if not client_id: - return error_response("invalid_request", "Could not find client ID") - if client_id not in self.cdb: - return error_response("invalid_request", "Unknown client") - - match = False - for sid in sids: - if self.sdb[sid]["client_id"] == client_id: - match = True - break - if not match: - return error_response("invalid_request", "Unmatched client") - - redirect_uri = None - if "post_logout_redirect_uri" in esr: - redirect_uri = self.verify_post_logout_redirect_uri(esr, client_id) - if not redirect_uri: - msg = "Post logout redirect URI verification failed!" - return error_response("invalid_request", msg) - else: # If only one registered use that one - if len(self.cdb[client_id]["post_logout_redirect_uris"]) == 1: - _base, _query = self.cdb[client_id]["post_logout_redirect_uris"][0] - if _query: - query_string = urlencode( - [(key, v) for key in _query for v in _query[key]] - ) - redirect_uri = "%s?%s" % (_base, query_string) - else: - redirect_uri = _base - - for sid in sids: - del self.sdb[sid] - - # Delete cookies - authn, _ = self.pick_auth(esr) - headers = [authn.delete_cookie(), self.delete_session_cookie()] - if cookie_dealer: - headers.append(cookie_dealer.delete_cookie(self.sso_cookie_name)) - - if redirect_uri is not None: - try: - _state = esr["state"] - except KeyError: - redirect_uri = str(redirect_uri) - else: - if "?" in redirect_uri: - redirect_uri += "&" - else: - redirect_uri += "?" - - redirect_uri += urlencode({"state": _state}) - - return SeeOther(redirect_uri, headers=headers) - - return Response("Successful logout", headers=headers) - def verify_endpoint(self, request="", cookie=None, **kwargs): """ Verify endpoint. @@ -852,7 +685,7 @@ def authz_part2(self, user, areq, sid, **kwargs): aresp["client_id"] = areq["client_id"] if self.events: - self.events.store("Protocol response", aresp) + self.events.store("protocol response", aresp) response = sanitize(aresp.to_dict()) logger.info("authorization response: %s", response) @@ -2018,7 +1851,7 @@ def do_key_rollover(self, jwks, kid_template): # print to the jwks file dump_jwks(self.keyjar[""], self.jwks_name) - def remove_inactive_keys(self, more_then=3600): + def remove_inactive_keys(self, more_then: int = 3600): """ Remove all keys that has been inactive 'more_then' seconds. @@ -2052,3 +1885,476 @@ def get_by_sub_and_(self, sub: str, key: str, val: Any) -> Optional[str]: except KeyError: continue return None + + # Below are LOGOUT related methods + + def verify_post_logout_redirect_uri( + self, esreq: Message, client_id: str + ) -> Optional[str]: + """ + Verify a post logout URI. + + :param esreq: End session request + :param client_id: The Client ID + :return: The post logout URI if it was OK otherwise None + """ + try: + redirect_uri = esreq["post_logout_redirect_uri"] + except KeyError: + logger.debug("Missing post_logout_redirect_uri parameter") + return None + + try: + accepted_urls = self.cdb[client_id]["post_logout_redirect_uris"] + if self._verify_url(redirect_uri, accepted_urls): + return redirect_uri + except Exception as exc: + msg = "An error occurred while verifying redirect URI: %s" + logger.debug(msg, str(exc)) + + return None + + def let_user_verify_logout( + self, + uid: str, + esr: Message, + cookie: Optional[List[Tuple[str, str]]], + redirect_uri: Optional[str], + ) -> Response: + """ + Show a page to the user, that asks whether logout should be performed. + + :param uid: User ID + :param esr: EndSessionRequest instance + :param cookie: A cookie + :param redirect_uri: URL + :return: Response instance + """ + if cookie: + headers = cookie + else: + headers = [] + + self.sdb.set_verify_logout(uid) + + if redirect_uri is not None: + redirect = redirect_uri + else: + redirect = "/" + try: + tmp_id_token_hint = esr["id_token_hint"] + except KeyError: + tmp_id_token_hint = "" + + context = { + "id_token_hint": tmp_id_token_hint, + "post_logout_redirect_uri": esr["post_logout_redirect_uri"], + "key": self.sdb.get_verify_logout(uid), + "redirect": redirect, + "action": "/" + EndSessionEndpoint("").etype, + } + return Response( + self.template_renderer("verify_logout", context), headers=headers + ) + + def _get_uid_from_cookie( + self, cookie: Optional[Union[str, SimpleCookie]] + ) -> Tuple[Optional[CookieDealer], Optional[str], Optional[str]]: + """ + Get cookie_dealer, client_id and uid from cookie. + + :param cookie: Received cookie + :return: Tuple containing CookieDealer instance, client ID and User ID + """ + if cookie is None: + return None, None, None + + cookie_dealer = CookieDealer(srv=self) + client_id = uid = None + + _cval = cookie_dealer.get_cookie_value(cookie, self.sso_cookie_name) + if _cval: + (value, _ts, typ) = _cval + if typ == "sso": + uid, client_id = value.split(DELIM) + + return cookie_dealer, client_id, uid + + def do_back_channel_logout( + self, cinfo: dict, sub: str, sid: str + ) -> Optional[Tuple[str, str]]: + """ + Prepare information to be used to do a back-channel logout. + + :param cinfo: Client information + :param sub: Subject identifier + :param sid: The Issuer ID + :return: Tuple with logout URI and signed logout token + """ + try: + back_channel_logout_uri = cinfo["backchannel_logout_uri"] + except KeyError: + return None + + # always include sub and sid so I don't check for + # backchannel_logout_session_required + + payload = { + "sub": sub, + "sid": sid, + "events": {BACK_CHANNEL_LOGOUT_EVENT: {}}, + "jti": uuid.uuid4().hex, + } + + try: + alg = cinfo["id_token_signed_response_alg"] + except KeyError: + alg = self.capabilities["id_token_signing_alg_values_supported"][0] + + _jws = JWT(self.keyjar, iss=self.name, lifetime=86400, sign_alg=alg) + sjwt = _jws.pack(aud=cinfo["client_id"], **payload) + + return back_channel_logout_uri, sjwt + + def clean_sessions(self, usids: List[str]): + """ + Remove Session IDs from the session DB. + + :param usids: List of session IDs + """ + _sdb = self.sdb + # Clean out all sessions + for sid in usids: + del _sdb[sid] + + def logout_info_for_all_clients( + self, uid: Optional[str] = "", sid: Optional[str] = "" + ) -> Dict: + """ + Collect information necessary to logout one user from all clients he/she has been using. + + One of uid and sid MUST be provided. If uid is provided sid is ignored. + NO changes are made to the session DB. + No logout is actually performed + :param uid: User ID + :param sid: Session ID + :return: Dictionary with logout information + """ + if not uid: + if not sid: + raise ParameterError("One of uid and sid MUST be provided") + else: + uid = self.sdb.get_uid_by_sid(sid) + + # Find all the session IDs this user has gotten + usids = session_get(self.sdb, "uid", uid) + # Find all RPs this user has logged it from + _client_sid = {} + for usid in usids: + _client_sid[self.sdb[usid]["client_id"]] = usid + + # Front-/Backchannel logout ? + _cdb = self.cdb + _iss = self.name + bc_logouts = {} + fc_iframes = {} + for _cid, _csid in _client_sid.items(): + if "backchannel_logout_uri" in _cdb[_cid]: + _sub = self.sdb[_csid]["sub"] + bc_logouts[_cid] = self.do_back_channel_logout(_cdb[_cid], _sub, _csid) + if "frontchannel_logout_uri" in _cdb[_cid]: + # Construct an IFrame + fc_iframes[_cid] = self.do_front_channel_logout_iframe( + _cdb[_cid], _iss, _csid + ) + + return {"back_channel": bc_logouts, "front_channel": fc_iframes} + + def logout_info_for_one_client(self, session_id: str, client_id: str) -> Dict: + """ + Collect information necessary to log out from client. + + Note that if a client has both back channel and front channel logout registered both + will be handled. + :param session_id: Session ID + :param client_id: Client ID + :return: Dictionary with back_channel and front_channel logout info. + """ + logout_spec = { + "back_channel": {}, # back-channel logout information + "front_channel": {}, # front-channel logout information + } # type: Dict[str, Dict[str, Union[None, str, Tuple[str,str]]]] + + if "backchannel_logout_uri" in self.cdb[client_id]: + _subject_id = self.sdb[session_id]["sub"] + logout_spec["back_channel"] = { + client_id: self.do_back_channel_logout( + self.cdb[client_id], _subject_id, session_id + ) + } + elif "frontchannel_logout_uri" in self.cdb[client_id]: + # Construct an IFrame + _iframe = self.do_front_channel_logout_iframe( + self.cdb[client_id], self.name, session_id + ) + logout_spec["front_channel"] = {client_id: _iframe} + + return logout_spec + + def end_session_endpoint( + self, + request: str = "", + cookie: Optional[Union[str, SimpleCookie]] = None, + **kwargs + ) -> Response: + """ + Handle a RP initiated Logout request. + + :param request: The logout request + :param cookie: + :param kwargs: + :return: Returns a dictionary with one key 'sjwt' and the value + being a signed JWT token with session information. + """ + _req = self.server.message_factory.get_request_type("endsession_endpoint") + esr = _req().from_urlencoded(request) + + logger.debug("End session request: %s", sanitize(esr.to_dict())) + + if self.events: + self.events.store("protocol request", esr) + + # 2 ways of find out client ID and user. Either through a cookie + # or using the id_token_hint. If I get information from both make sure they match + _, client_id, uid = self._get_uid_from_cookie(cookie) + + sid = "" + + if "id_token_hint" in esr: + id_token_hint = IdToken().from_jwt( + esr["id_token_hint"], keyjar=self.keyjar, verify=True + ) + far_away = 86400 * 30 # 30 days + + if client_id: + args = {"client_id": client_id} + else: + args = {} + + try: + id_token_hint.verify( + iss=self.baseurl, skew=far_away, nonce_storage_time=far_away, **args + ) + except (VerificationError, NotForMe) as err: + logger.warning("Verification error on id_token_hint: %s", err) + return error_response("invalid_request", "Bad Id Token hint") + + sub = id_token_hint["sub"] + + if uid is not None: + # verify that 'sub' are bound to 'uid' + if self.sdb.get_uid_by_sub(sub) != uid: + return error_response("invalid_request", "Wrong user") + else: + uid = self.sdb.get_uid_by_sub(sub) + + if client_id is None: + if len(id_token_hint["aud"]) == 1: + client_id = id_token_hint["aud"][0] + else: + client_id = id_token_hint["azp"] + + sids = session_get(self.sdb, "sub", sub) + + matching_client_id = False + for sid in sids: + if self.sdb[sid]["client_id"] == client_id: + matching_client_id = True + break + + if not matching_client_id: + return error_response( + "invalid_request", "Could not find a matching client ID" + ) + + if not client_id: + return error_response("invalid_request", "Could not find client ID") + if client_id not in self.cdb: + return error_response("invalid_request", "Unknown client") + + redirect_uri = None + if "post_logout_redirect_uri" in esr: + redirect_uri = self.verify_post_logout_redirect_uri(esr, client_id) + if not redirect_uri: + msg = "Post logout redirect URI verification failed!" + return error_response("invalid_request", msg) + else: # If only one registered use that one + try: + _ruri = self.cdb[client_id]["post_logout_redirect_uris"] + except KeyError: + msg = "Missing post_logout_redirect_uri" + return error_response("invalid_request", msg) + + if len(_ruri) == 1: + _base, _query = _ruri[0] + if _query: + query_string = urlencode( + [(key, v) for key in _query for v in _query[key]] + ) + redirect_uri = "%s?%s" % (_base, query_string) + else: + redirect_uri = _base + else: + return error_response( + "invalid_request", + descr="Missing post_logout_redirect_uri and more then one post_logout_redirect_uris", + ) + + # redirect user to OP logout verification page + payload = { + "uid": uid, + "client_id": client_id, + "redirect_uri": redirect_uri, + "sid": sid, + } + if "state" in esr: + payload["state"] = esr["state"] + + if self.events: + self.events.store("object args", "{}".format(payload)) + + # From me to me + _jws = JWT( + self.keyjar, iss=self.name, lifetime=86400, sign_alg=self.signing_alg + ) + sjwt = _jws.pack(aud=[self.name], **payload) + + location = "{}?{}".format(self.logout_verify_url, urlencode({"sjwt": sjwt})) + return SeeOther(location) + + def unpack_signed_jwt(self, sjwt: str): + """Will unpack a signed JWT.""" + verifier = JWT(self.keyjar) + try: + return verifier.unpack(sjwt) + except Exception as err: + raise ValueError(err) + + def do_verified_logout( + self, sid: str, client_id: str, alla: bool = False, **kwargs + ) -> Dict: + """ + Perform back channel logout and prepares the information needed for front channel logout. + + :param sid: Session ID + :param client_id: Client ID + :param alla: Whether logout should be attempted from all clients or just one specific client. + :param kwargs: + :return: + """ + if alla: + uid = self.sdb.get_uid_by_sid(sid) + logout_spec = self.logout_info_for_all_clients(uid) + # Find all the session IDs this user has gotten + sids = session_get(self.sdb, "uid", uid) + else: + logout_spec = self.logout_info_for_one_client( + session_id=sid, client_id=client_id + ) + sids = [sid] + + if self.events: + self.events.store("object args", "{}".format(logout_spec)) + + if not logout_spec["back_channel"] and not logout_spec["front_channel"]: + return {} + + # take care of Back channel logout first + if logout_spec["back_channel"]: + failed = [] + for _cid, spec in logout_spec["back_channel"].items(): + _url, sjwt = spec + logger.info("logging out from {} at {}".format(_cid, _url)) + + try: + res = self.httpc.http_request( + _url, "POST", data="logout_token={}".format(sjwt) + ) + except Exception as err: + # Can't be more specific because I don't know which http client are used + logger.error("failed to logout from {}".format(_cid)) + if self.events: + self.events.store("exception", "{}: {}".format(_cid, str(err))) + failed.append(_cid) + continue + + if res.status_code < 300: + logger.info("Logged out from {}".format(_cid)) + else: + _errstr = "failed to logout from {}".format(_cid) + if self.events: + self.events.store("fault", _errstr) + logger.error(_errstr) + failed.append(_cid) + # If no back-channel logout worked and there is no front-channel logout + # regard this as a failure. + if len(failed) == len(logout_spec["back_channel"]): + if not logout_spec["front_channel"]: + return {} + + # kill cookies + kaka1 = self.write_session_cookie("removed") + kaka2 = self.cookie_func( + "", typ="sso", cookie_name=self.sso_cookie_name, kill=True + ) + res = {"cookie": [kaka1, kaka2]} + + if logout_spec["front_channel"]: + for _cid in logout_spec["front_channel"].keys(): + logger.info("Adding logout iframe for {}".format(_cid)) + res["iframe"] = list(logout_spec["front_channel"].values()) + + # Clean out all sessions + self.clean_sessions(sids) + + return res + + @staticmethod + def do_front_channel_logout_iframe( + client_info: Dict, issuer: str, session_id: str + ) -> Optional[str]: + """ + Construct a front channel logout IFrame. + + :param client_info: Client info + :param issuer: Issuer ID + :param session_id: Session ID + :return: HTML IFrame string + """ + try: + frontchannel_logout_uri = client_info["frontchannel_logout_uri"] + except KeyError: + return None + + try: + flsr = client_info["frontchannel_logout_session_required"] + except KeyError: + flsr = False + + if flsr: + _query = {"iss": issuer, "sid": session_id} + if "?" in frontchannel_logout_uri: + p = urlparse(frontchannel_logout_uri) + _args = {k: v[0] for k, v in parse_qs(p.query).items()} + _args.update(_query) + _query = _args + _np = p._replace(query="") + frontchannel_logout_uri = _np.geturl() + + _iframe = '<iframe src="{}?{}">'.format( + frontchannel_logout_uri, urlencode(_query) + ) + else: + _iframe = '<iframe src="{}">'.format(frontchannel_logout_uri) + + return _iframe diff --git a/tests/test_oauth2_consumer.py b/tests/test_oauth2_consumer.py index f63b2cbe8..e609f9855 100644 --- a/tests/test_oauth2_consumer.py +++ b/tests/test_oauth2_consumer.py @@ -20,7 +20,7 @@ from oic.oauth2.message import TokenErrorResponse from oic.utils import time_util from oic.utils.http_util import make_cookie -from oic.utils.sdb import DictSessionBackend +from oic.utils.session_backend import DictSessionBackend __author__ = "rohe0002" diff --git a/tests/test_oauth2_provider.py b/tests/test_oauth2_provider.py index f3ee7d7cc..beee2f09a 100644 --- a/tests/test_oauth2_provider.py +++ b/tests/test_oauth2_provider.py @@ -26,7 +26,7 @@ from oic.utils.authz import Implicit from oic.utils.http_util import Response from oic.utils.sdb import AuthnEvent -from oic.utils.sdb import DictSessionBackend +from oic.utils.session_backend import DictSessionBackend CLIENT_CONFIG = {"client_id": "client1", "config": {"issuer": "https://example.com/as"}} diff --git a/tests/test_oic_consumer_logout.py b/tests/test_oic_consumer_logout.py index fadea653c..fdac47433 100644 --- a/tests/test_oic_consumer_logout.py +++ b/tests/test_oic_consumer_logout.py @@ -22,8 +22,8 @@ from oic.utils.keyio import KeyBundle from oic.utils.keyio import KeyJar from oic.utils.keyio import keybundle_from_local_file -from oic.utils.sdb import DictSessionBackend from oic.utils.sdb import session_update +from oic.utils.session_backend import DictSessionBackend from oic.utils.userinfo import UserInfo # -- CLIENT INFO ---- diff --git a/tests/test_oic_provider.py b/tests/test_oic_provider.py index 1a4644a6d..39282fbf8 100644 --- a/tests/test_oic_provider.py +++ b/tests/test_oic_provider.py @@ -6,11 +6,9 @@ from time import time from typing import Any # noqa from typing import Dict # noqa -from typing import cast from unittest.mock import Mock from unittest.mock import patch from urllib.parse import parse_qs -from urllib.parse import urlencode from urllib.parse import urlparse import pytest @@ -54,13 +52,12 @@ from oic.utils.authz import AuthzHandling from oic.utils.http_util import CookieDealer from oic.utils.http_util import Response -from oic.utils.http_util import SeeOther from oic.utils.keyio import KeyBundle from oic.utils.keyio import KeyJar from oic.utils.keyio import ec_init from oic.utils.keyio import keybundle_from_local_file from oic.utils.sdb import AuthnEvent -from oic.utils.sdb import DictSessionBackend +from oic.utils.session_backend import DictSessionBackend from oic.utils.time_util import epoch_in_a_while from oic.utils.userinfo import UserInfo @@ -1348,7 +1345,10 @@ def test_verify_sector_identifier_no_scheme(self): assert len(logcap.records) == 2 # First log record is from server... assert isinstance(logcap.records[1].msg, MissingSchema) - error = "Invalid URL 'example.com': No schema supplied. Perhaps you meant http://example.com?" + error = ( + "Invalid URL 'example.com': No schema supplied. Perhaps you meant " + "http://example.com?" + ) assert str(logcap.records[1].msg) == error def test_verify_sector_identifier_nonreachable(self): @@ -1670,154 +1670,6 @@ def _code_auth2(self): ) return self.provider.authorization_endpoint(request=location.split("?")[1]) - def test_end_session_endpoint_with_cookie(self): - self._code_auth() - cookie = self._create_cookie("username", "number5") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, SeeOther) - assert "state=abcde" in resp.message - assert self.provider.sdb.get_by_uid("username") == [] - self._assert_cookies_expired(resp.headers) - - def test_end_session_endpoint_with_wrong_cookie(self): - self._code_auth() - cookie = self._create_cookie("username", "number5", c_type="session") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, Response) - _err = ErrorResponse().from_json(resp.message) - assert _err["error"] == "invalid_request" - - def test_end_session_endpoint_with_cookie_wrong_user(self): - self._code_auth() - cookie = self._create_cookie("diggins", "number5") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, Response) - _err = ErrorResponse().from_json(resp.message) - assert _err["error"] == "invalid_request" - - def test_end_session_endpoint_with_cookie_wrong_client(self): - self._code_auth() - cookie = self._create_cookie("username", "a1b2c3") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, Response) - _err = ErrorResponse().from_json(resp.message) - assert _err["error"] == "invalid_request" - - def test_end_session_endpoint_with_cookie_dual_login(self): - self._code_auth() - self._code_auth2() - cookie = self._create_cookie("username", "client0") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, SeeOther) - assert "state=abcde" in resp.message - assert self.provider.sdb.get_by_uid("username") == [] - self._assert_cookies_expired(resp.headers) - - def test_end_session_endpoint_with_cookie_dual_login_wrong_client(self): - self._code_auth() - self._code_auth2() - cookie = self._create_cookie("username", "a1b2c3") - - resp = self.provider.end_session_endpoint( - urlencode({"state": "abcde"}), cookie=cookie - ) - - assert isinstance(resp, Response) - _err = ErrorResponse().from_json(resp.message) - assert _err["error"] == "invalid_request" - - def test_end_session_endpoint_with_id_token_hint_only(self): - id_token = self._auth_with_id_token() - assert self.provider.sdb.get_by_sub( - id_token["sub"] - ) # verify we got valid session - - id_token_hint = id_token.to_jwt(algorithm="none") - - resp = self.provider.end_session_endpoint( - urlencode({"id_token_hint": id_token_hint}) - ) - - assert isinstance(resp, SeeOther) - - assert not self.provider.sdb.get_by_sub( - id_token["sub"] - ) # verify session has been removed - self._assert_cookies_expired(resp.headers) - - def test_end_session_endpoint_with_id_token_hint_and_cookie(self): - id_token = self._auth_with_id_token() - assert self.provider.sdb.get_by_sub( - id_token["sub"] - ) # verify we got valid session - - id_token_hint = id_token.to_jwt(algorithm="none") - cookie = self._create_cookie("username", "number5") - - resp = self.provider.end_session_endpoint( - urlencode({"id_token_hint": id_token_hint}), cookie=cookie - ) - - assert isinstance(resp, SeeOther) - - assert not self.provider.sdb.get_by_sub( - id_token["sub"] - ) # verify session has been removed - self._assert_cookies_expired(resp.headers) - - def test_end_session_endpoint_with_post_logout_redirect_uri(self): - self._code_auth() - # verify we got valid session - cookie = self._create_cookie("username", "number5") - - client_id = cast(str, CLIENT_CONFIG["client_id"]) # type: str - post_logout_redirect_uri = CDB[client_id]["post_logout_redirect_uris"][0][0] - resp = self.provider.end_session_endpoint( - urlencode( - {"post_logout_redirect_uri": post_logout_redirect_uri, "state": "abcde"} - ), - cookie=cookie, - ) - assert isinstance(resp, SeeOther) - assert self.provider.sdb.get_by_uid("username") == [] - self._assert_cookies_expired(resp.headers) - - def test_end_session_endpoint_with_wrong_post_logout_redirect_uri(self): - self._code_auth() - # verify we got valid session - cookie = self._create_cookie("username", "number5") - - post_logout_redirect_uri = "https://www.example.com/logout" - resp = self.provider.end_session_endpoint( - urlencode( - {"post_logout_redirect_uri": post_logout_redirect_uri, "state": "abcde"} - ), - cookie=cookie, - ) - assert isinstance(resp, Response) - _err = ErrorResponse().from_json(resp.message) - assert _err["error"] == "invalid_request" - def test_session_state_in_auth_req_for_session_support(self, session_db_factory): provider = Provider( SERVER_INFO["issuer"], diff --git a/tests/test_oic_provider_logout.py b/tests/test_oic_provider_logout.py new file mode 100644 index 000000000..2f272983e --- /dev/null +++ b/tests/test_oic_provider_logout.py @@ -0,0 +1,1042 @@ +import copy +import os +import re +from http.cookies import SimpleCookie +from time import time +from typing import Any # noqa +from typing import Dict # noqa +from urllib.parse import parse_qs +from urllib.parse import urlencode +from urllib.parse import urlparse + +import pytest +import requests +import responses + +from oic import rndstr +from oic.exception import ParameterError +from oic.oauth2.message import ErrorResponse +from oic.oic import DEF_SIGN_ALG +from oic.oic.consumer import Consumer +from oic.oic.message import AuthorizationResponse +from oic.oic.message import EndSessionRequest +from oic.oic.provider import Provider +from oic.utils.authn.authn_context import AuthnBroker +from oic.utils.authn.client import verify_client +from oic.utils.authn.user import UserAuthnMethod +from oic.utils.authz import AuthzHandling +from oic.utils.http_util import CookieDealer +from oic.utils.http_util import Response +from oic.utils.http_util import SeeOther +from oic.utils.keyio import KeyBundle +from oic.utils.keyio import KeyJar +from oic.utils.keyio import keybundle_from_local_file +from oic.utils.sdb import DictSessionBackend +from oic.utils.sdb import session_get +from oic.utils.userinfo import UserInfo + +__author__ = "roland hedberg" + +CONSUMER_CONFIG = { + "authz_page": "/authz", + "scope": ["openid"], + "response_type": ["code"], + "user_info": {"name": None, "email": None, "nickname": None}, + "request_method": "param", +} + +SERVER_INFO = { + "version": "3.0", + "issuer": "https://connect-op.heroku.com", + "authorization_endpoint": "http://localhost:8088/authorization", + "token_endpoint": "http://localhost:8088/token", + "flows_supported": ["code", "token", "code token"], +} + +CLIENT_CONFIG = {"client_id": "number5", "config": {"issuer": SERVER_INFO["issuer"]}} + +CLIENT_CONFIG_2 = {"client_id": "client0", "config": {"issuer": SERVER_INFO["issuer"]}} + +CLIENT_SECRET = "abcdefghijklmnop" +CLIENT_ID = "client_1" + +KC_SYM = KeyBundle( + [ + {"kty": "oct", "key": CLIENT_SECRET, "use": "ver"}, + {"kty": "oct", "key": CLIENT_SECRET, "use": "sig"}, + ] +) +KC_SYM2 = KeyBundle( + [ + {"kty": "oct", "key": "drickyoughurt", "use": "sig"}, + {"kty": "oct", "key": "drickyoughurt", "use": "ver"}, + ] +) + +BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/keys")) +KC_RSA = keybundle_from_local_file( + os.path.join(BASE_PATH, "rsa.key"), "RSA", ["ver", "sig"] +) + +KEYJAR = KeyJar() +KEYJAR[CLIENT_ID] = [KC_SYM, KC_RSA] +KEYJAR["number5"] = [KC_SYM2, KC_RSA] +KEYJAR[""] = KC_RSA +KEYJAR["https://connect-op.heroku.com"] = KC_RSA + +USERDB = { + "user": { + "name": "Hans Granberg", + "nickname": "Hasse", + "email": "hans@example.org", + "verified": False, + "sub": "user", + }, + "username": { + "name": "Linda Lindgren", + "nickname": "Linda", + "email": "linda@example.com", + "verified": True, + "sub": "username", + "extra_claim": "extra_claim_value", + }, +} + +URLMAP = {CLIENT_ID: ["https://example.com/authz"]} + + +class DummyEventStore(object): + def __init__(self): + self.db = {} # type: Dict[str, str] + + def store(self, typ, val): + self.db[typ] = val + + +def _eq(l1, l2): + return set(l1) == set(l2) + + +class DummyAuthn(UserAuthnMethod): + def __init__(self, srv, user): + UserAuthnMethod.__init__(self, srv) + self.user = user + + def authenticated_as(self, cookie=None, **kwargs): + if cookie == "FAIL": + return None, 0 + else: + return {"uid": self.user}, time() + + +AUTHN_BROKER = AuthnBroker() +AUTHN_BROKER.add("UNDEFINED", DummyAuthn(None, "username")) + +# dealing with authorization +AUTHZ = AuthzHandling() +SYMKEY = rndstr(16) # symmetric key used to encrypt cookie info +USERINFO = UserInfo(USERDB) + + +class TestProvider(object): + CDB = { + "number5": { + "password": "hemligt", + "client_secret": "drickyoughurt", + "redirect_uris": [("http://localhost:8087/authz", None)], + "post_logout_redirect_uris": [("https://example.com/post_logout", None)], + "client_salt": "salted", + "response_types": [ + "code", + "token", + "code id_token", + "none", + "code token", + "id_token", + ], + }, + "a1b2c3": { + "redirect_uris": [("http://localhost:8088/authz", None)], + "client_salt": "salted", + "client_secret": "very_secret", + "response_types": ["code", "token", "code id_token"], + }, + "client0": { + "redirect_uris": [("http://www.example.org/authz", None)], + "client_secret": "very_secret", + "post_logout_redirect_uris": [ + ("https://www.example.org/post_logout", None) + ], + "client_salt": "salted", + "response_types": ["code", "token", "code id_token"], + }, + CLIENT_ID: { + "client_secret": CLIENT_SECRET, + "redirect_uris": [("http://localhost:8087/authz", None)], + "client_salt": "salted", + "token_endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token"], + }, + } # type: Dict[str, Dict[str, Any]] + + @pytest.fixture(autouse=True) + def create_provider(self, session_db_factory): + self.provider = Provider( + SERVER_INFO["issuer"], + session_db_factory(SERVER_INFO["issuer"]), + self.CDB, + AUTHN_BROKER, + USERINFO, + AUTHZ, + verify_client, + SYMKEY, + urlmap=URLMAP, + keyjar=KEYJAR, + ) + self.provider.baseurl = self.provider.name + self.provider.logout_verify_url = "https://127.0.0.1/logout_verify.html" + + self.cons = Consumer( + DictSessionBackend(), + CONSUMER_CONFIG.copy(), + CLIENT_CONFIG, + server_info=SERVER_INFO, + ) + self.cons.behaviour = { + "request_object_signing_alg": DEF_SIGN_ALG["openid_request_object"] + } + self.cons.keyjar[""] = KC_RSA + self.cons.keyjar.import_jwks( + self.provider.keyjar.export_jwks(), self.cons.issuer + ) + + self.cons2 = Consumer( + {}, CONSUMER_CONFIG.copy(), CLIENT_CONFIG_2, server_info=SERVER_INFO + ) + self.cons2.behaviour = { + "request_object_signing_alg": DEF_SIGN_ALG["openid_request_object"] + } + self.cons2.keyjar[""] = KC_RSA + + def _code_auth(self): + state, location = self.cons.begin( + "openid", "code", path="http://localhost:8087" + ) + return self.provider.authorization_endpoint(request=location.split("?")[1]) + + def _code_auth2(self): + state, location = self.cons2.begin( + "openid", "code", path="http://www.example.org" + ) + return self.provider.authorization_endpoint(request=location.split("?")[1]) + + def _auth_with_id_token(self): + state, location = self.cons.begin( + "openid", "id_token", path="http://localhost:8087" + ) + resp = self.provider.authorization_endpoint(request=location.split("?")[1]) + aresp = self.cons.parse_response( + AuthorizationResponse, resp.message, sformat="urlencoded" + ) + return aresp["id_token"] + + def _create_cookie(self, user, client_id, c_type="sso"): + cd = CookieDealer(self.provider) + set_cookie = cd.create_cookie( + "{}][{}".format(user, client_id), c_type, self.provider.sso_cookie_name + ) + cookies_string = set_cookie[1] + all_cookies = SimpleCookie() # type: SimpleCookie + + try: + cookies_string = cookies_string.decode() + except (AttributeError, UnicodeDecodeError): + pass + + all_cookies.load(cookies_string) + + return all_cookies + + def test_missing_post_logout_redirect_uri(self): + esr = EndSessionRequest(state="foo") + assert self.provider.verify_post_logout_redirect_uri(esr, CLIENT_ID) is None + + def test_wrong_post_logout_redirect_uri(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://example.com/plru" + ] + esr = EndSessionRequest( + state="foo", post_logout_redirect_uri="https://localhost:8087/plru" + ) + assert self.provider.verify_post_logout_redirect_uri(esr, CLIENT_ID) is None + + def test_no_post_logout_redirect_uri(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://example.com/plru", + "https://example.com/plru2", + ] + esr = EndSessionRequest(state="foo") + + assert self.provider.verify_post_logout_redirect_uri(esr, CLIENT_ID) is None + + def test_let_user_verify_logout(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://localhost:8087/plru" + ] + esr = EndSessionRequest( + state="foo", post_logout_redirect_uri="https://localhost:8087/plru" + ) + res = self.provider.let_user_verify_logout("user", esr, None, None) + assert isinstance(res, Response) + assert res.headers == [("Content-type", "text/html")] + assert res.status_code == 200 + + def test_let_user_verify_logout_with_cookie(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://localhost:8087/plru" + ] + esr = EndSessionRequest( + state="foo", post_logout_redirect_uri="https://localhost:8087/plru" + ) + res = self.provider.let_user_verify_logout( + "user", esr, [("Set-Cookie", "kaka")], None + ) + assert isinstance(res, Response) + assert set(res.headers) == { + ("Content-type", "text/html"), + ("Set-Cookie", "kaka"), + } + assert res.status_code == 200 + + def test_let_user_verify_logout_with_redirect(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://localhost:8087/plru" + ] + esr = EndSessionRequest( + state="foo", post_logout_redirect_uri="https://localhost:8087/plru" + ) + res = self.provider.let_user_verify_logout( + "user", esr, None, "https://example.com/redirect" + ) + assert isinstance(res, Response) + assert set(res.headers) == {("Content-type", "text/html")} + assert res.status_code == 200 + # make sure the redirect was propagated + txt = '<input type="hidden" name="{}" value="{}"/>'.format( + "post_logout_redirect_uri", "https://localhost:8087/plru" + ) + assert txt in res.message + + def test_let_user_verify_logout_with_id_token_hint(self): + self.provider.cdb[CLIENT_ID]["post_logout_redirect_uris"] = [ + "https://localhost:8087/plru" + ] + + esr = EndSessionRequest( + state="foo", + post_logout_redirect_uri="https://localhost:8087/plru", + id_token_hint="J.W.S", + ) + res = self.provider.let_user_verify_logout("user", esr, None, None) + assert isinstance(res, Response) + assert set(res.headers) == {("Content-type", "text/html")} + assert res.status_code == 200 + # make sure the id_token_hint was propagated + txt = '<input type="hidden" name="{}" value="{}"/>'.format( + "id_token_hint", "J.W.S" + ) + assert txt in res.message + + def test_end_session_endpoint_with_cookie(self): + self.provider.events = DummyEventStore() + + self._code_auth() + cookie = self._create_cookie("username", "number5") + + resp = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + jwt_info = self.provider.unpack_signed_jwt(qs["sjwt"][0]) + + assert jwt_info["state"] == "abcde" + assert jwt_info["uid"] == "username" + assert jwt_info["client_id"] == "number5" + assert jwt_info["redirect_uri"] == "https://example.com/post_logout" + + def test_end_session_endpoint_with_wrong_cookie(self): + # Need cookie and ID Token to figure this out + id_token = self._auth_with_id_token() + assert session_get( + self.provider.sdb, "sub", id_token["sub"] + ) # verify we got valid session + + id_token_hint = id_token.to_jwt(algorithm="none") + cookie = self._create_cookie("diggins", "number5") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}), cookie=cookie + ) + + assert isinstance(resp, Response) + _err = ErrorResponse().from_json(resp.message) + assert _err["error"] == "invalid_request" + assert _err["error_description"] == "Wrong user" + + def test_end_session_endpoint_with_cookie_wrong_user(self): + # Need cookie and ID Token to figure this out + id_token = self._auth_with_id_token() + assert session_get(self.provider.sdb, "sub", id_token["sub"]) + + id_token_hint = id_token.to_jwt(algorithm="none") + cookie = self._create_cookie("diggins", "number5") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}), cookie=cookie + ) + + assert isinstance(resp, Response) + _err = ErrorResponse().from_json(resp.message) + assert _err["error"] == "invalid_request" + assert _err["error_description"] == "Wrong user" + + def test_end_session_endpoint_with_cookie_wrong_client(self): + # Need cookie and ID Token to figure this out + id_token = self._auth_with_id_token() + assert session_get(self.provider.sdb, "sub", id_token["sub"]) + + id_token_hint = id_token.to_jwt(algorithm="none") + # Wrong client_id + cookie = self._create_cookie("username", "a1b2c3") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}), cookie=cookie + ) + + assert isinstance(resp, Response) + _err = ErrorResponse().from_json(resp.message) + assert _err["error"] == "invalid_request" + + def test_end_session_endpoint_with_cookie_dual_login(self): + self._code_auth() + self._code_auth2() + cookie = self._create_cookie("username", "client0") + + resp = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + jwt_info = self.provider.unpack_signed_jwt(qs["sjwt"][0]) + + assert jwt_info["state"] == "abcde" + assert jwt_info["uid"] == "username" + assert jwt_info["client_id"] == "client0" + assert jwt_info["redirect_uri"] == "https://www.example.org/post_logout" + + def test_end_session_endpoint_with_cookie_dual_login_wrong_client(self): + self._code_auth() + self._code_auth2() + cookie = self._create_cookie("username", "a1b2c3") + + resp = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + + assert isinstance(resp, Response) + _err = ErrorResponse().from_json(resp.message) + assert _err["error"] == "invalid_request" + + def test_end_session_endpoint_with_id_token_hint_only(self): + id_token = self._auth_with_id_token() + assert session_get(self.provider.sdb, "sub", id_token["sub"]) + + id_token_hint = id_token.to_jwt(algorithm="none") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}) + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + jwt_info = self.provider.unpack_signed_jwt(qs["sjwt"][0]) + + assert jwt_info["uid"] == "username" + assert jwt_info["client_id"] == "number5" + assert jwt_info["redirect_uri"] == "https://example.com/post_logout" + + def test_end_session_endpoint_with_id_token_hint_and_cookie(self): + id_token = self._auth_with_id_token() + assert session_get(self.provider.sdb, "sub", id_token["sub"]) + + id_token_hint = id_token.to_jwt(algorithm="none") + cookie = self._create_cookie("username", "number5") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}), cookie=cookie + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + jwt_info = self.provider.unpack_signed_jwt(qs["sjwt"][0]) + + assert jwt_info["uid"] == "username" + assert jwt_info["client_id"] == "number5" + assert jwt_info["redirect_uri"] == "https://example.com/post_logout" + + def test_end_session_endpoint_with_post_logout_redirect_uri(self): + self._code_auth() + cookie = self._create_cookie("username", "number5") + + post_logout_redirect_uri = self.CDB[str(CLIENT_CONFIG["client_id"])][ + "post_logout_redirect_uris" + ][0][0] + resp = self.provider.end_session_endpoint( + urlencode( + {"post_logout_redirect_uri": post_logout_redirect_uri, "state": "abcde"} + ), + cookie=cookie, + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + jwt_info = self.provider.unpack_signed_jwt(qs["sjwt"][0]) + + assert jwt_info["state"] == "abcde" + assert jwt_info["uid"] == "username" + assert jwt_info["client_id"] == "number5" + assert jwt_info["redirect_uri"] == "https://example.com/post_logout" + + def test_end_session_endpoint_bogus_sjwt(self): + self._code_auth() + cookie = self._create_cookie("username", "number5") + + post_logout_redirect_uri = self.CDB[str(CLIENT_CONFIG["client_id"])][ + "post_logout_redirect_uris" + ][0][0] + resp = self.provider.end_session_endpoint( + urlencode( + {"post_logout_redirect_uri": post_logout_redirect_uri, "state": "abcde"} + ), + cookie=cookie, + ) + + # returns a SeeOther instance + p = urlparse(resp.message) + qs = parse_qs(p.query) + + _sjwt = qs["sjwt"][0] + _sjwt = ".".join(_sjwt.split(".")[:2]) + "." # Not signed + with pytest.raises(ValueError): + self.provider.unpack_signed_jwt(_sjwt) + + def test_end_session_endpoint_with_wrong_post_logout_redirect_uri(self): + self._code_auth() + cookie = self._create_cookie("username", "number5") + + post_logout_redirect_uri = "https://www.example.com/logout" + resp = self.provider.end_session_endpoint( + urlencode( + {"post_logout_redirect_uri": post_logout_redirect_uri, "state": "abcde"} + ), + cookie=cookie, + ) + + assert isinstance(resp, Response) + _err = ErrorResponse().from_json(resp.message) + assert _err["error"] == "invalid_request" + + def test_end_session_endpoint_with_registered_post_logout_redirect_uri_with_query_part( + self + ): + self._code_auth() + cookie = self._create_cookie("username", "number5") + + self.provider.cdb["number5"]["post_logout_redirect_uris"] = [ + ("https://www.example.com/logout", {"foo": ["bar"]}) + ] + + # No post_logout_redirect_uri in request + resp = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + + assert isinstance(resp, Response) + _qp = parse_qs(resp.message.split("?")[1]) + _jwt = self.provider.unpack_signed_jwt(_qp["sjwt"][0]) + assert _jwt["redirect_uri"] == "https://www.example.com/logout?foo=bar" + + def test_back_channel_logout_no_uri(self): + self._code_auth() + + res = self.provider.do_back_channel_logout( + self.provider.cdb[CLIENT_ID], "username", "sid" + ) + assert res is None + + def test_back_channel_logout(self): + self._code_auth() + + _cdb = copy.copy(self.provider.cdb[CLIENT_ID]) + _cdb["backchannel_logout_uri"] = "https://example.com/bc_logout" + _cdb["client_id"] = CLIENT_ID + res = self.provider.do_back_channel_logout(_cdb, "username", "_sid_") + assert isinstance(res, tuple) + assert res[0] == "https://example.com/bc_logout" + _jwt = self.provider.unpack_signed_jwt(res[1]) + assert _jwt + assert _jwt["iss"] == SERVER_INFO["issuer"] + assert _jwt["aud"] == [CLIENT_ID] + assert _jwt["sub"] == "username" + assert _jwt["sid"] == "_sid_" + + def test_front_channel_logout(self): + self._code_auth() + + _cdb = copy.copy(self.provider.cdb[CLIENT_ID]) + _cdb["frontchannel_logout_uri"] = "https://example.com/fc_logout" + _cdb["client_id"] = CLIENT_ID + res = self.provider.do_front_channel_logout_iframe( + _cdb, str(SERVER_INFO["issuer"]), "_sid_" + ) + assert res == '<iframe src="https://example.com/fc_logout">' + + def test_front_channel_logout_session_required(self): + self._code_auth() + + _cdb = copy.copy(self.provider.cdb[CLIENT_ID]) + _cdb["frontchannel_logout_uri"] = "https://example.com/fc_logout" + _cdb["frontchannel_logout_session_required"] = True + _cdb["client_id"] = CLIENT_ID + res = self.provider.do_front_channel_logout_iframe( + _cdb, str(SERVER_INFO["issuer"]), "_sid_" + ) + m = re.match(r'<iframe src="([^"]+)">', str(res)) + assert m + _q = parse_qs(str(m.group(1)).split("?")[1]) + assert set(_q.keys()) == {"iss", "sid"} + + def test_front_channel_logout_session_required_uri_query(self): + self._code_auth() + + _cdb = copy.copy(self.provider.cdb[CLIENT_ID]) + _cdb["frontchannel_logout_uri"] = "https://example.com/fc_logout?foo=bar" + _cdb["frontchannel_logout_session_required"] = True + _cdb["client_id"] = CLIENT_ID + res = self.provider.do_front_channel_logout_iframe( + _cdb, str(SERVER_INFO["issuer"]), "_sid_" + ) + m = re.match(r'<iframe src="([^"]+)">', str(res)) + assert m + _q = parse_qs(str(m.group(1)).split("?")[1]) + assert set(_q.keys()) == {"foo", "iss", "sid"} + + def test_front_channel_logout_missing_url(self): + self._code_auth() + + _cdb = copy.copy(self.provider.cdb[CLIENT_ID]) + _cdb["client_id"] = CLIENT_ID + res = self.provider.do_front_channel_logout_iframe( + _cdb, str(SERVER_INFO["issuer"]), "_sid_" + ) + assert res is None + + def test_logout_from_client_bc(self): + self._code_auth() + self.provider.cdb[CLIENT_ID][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb[CLIENT_ID]["client_id"] = CLIENT_ID + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + res = self.provider.logout_info_for_one_client(_sid, CLIENT_ID) + assert set(res.keys()) == {"back_channel", "front_channel"} + assert res["back_channel"] != {} + assert res["front_channel"] == {} + assert set(res["back_channel"].keys()) == {CLIENT_ID} + _spec = res["back_channel"][CLIENT_ID] + assert _spec[0] == "https://example.com/bc_logout" + _jwt = self.provider.unpack_signed_jwt(_spec[1]) + assert _jwt + assert _jwt["iss"] == SERVER_INFO["issuer"] + assert _jwt["aud"] == [CLIENT_ID] + assert _jwt["sid"] == _sid + + def test_logout_from_client_fc(self): + self._code_auth() + del self.provider.cdb[CLIENT_ID]["backchannel_logout_uri"] + self.provider.cdb[CLIENT_ID][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb[CLIENT_ID]["client_id"] = CLIENT_ID + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + res = self.provider.logout_info_for_one_client(_sid, CLIENT_ID) + assert set(res.keys()) == {"front_channel", "back_channel"} + assert res["back_channel"] == {} + assert set(res["front_channel"].keys()) == {CLIENT_ID} + _spec = res["front_channel"][CLIENT_ID] + assert _spec == '<iframe src="https://example.com/fc_logout">' + + def test_logout_from_client(self): + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + res = self.provider.logout_info_for_all_clients(sid=_sid) + assert res + assert set(res.keys()) == {"back_channel", "front_channel"} + assert set(res["front_channel"].keys()) == {"number5"} + _spec = res["front_channel"]["number5"] + assert _spec == '<iframe src="https://example.com/fc_logout">' + assert set(res["back_channel"].keys()) == {"client0"} + _spec = res["back_channel"]["client0"] + assert _spec[0] == "https://example.com/bc_logout" + _jwt = self.provider.unpack_signed_jwt(_spec[1]) + assert _jwt + assert _jwt["iss"] == SERVER_INFO["issuer"] + assert _jwt["aud"] == ["client0"] + + def test_logout_spec_all(self): + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + logout_spec_all = self.provider.logout_info_for_all_clients(sid=_sid) + + assert set(logout_spec_all.keys()) == {"back_channel", "front_channel"} + assert set(logout_spec_all["back_channel"].keys()) == {"client0"} + assert set(logout_spec_all["front_channel"].keys()) == {"number5"} + + def test_do_verified_logout_all(self): + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + with responses.RequestsMock() as rsps: + rsps.add(rsps.POST, "https://example.com/bc_logout", status=200) + res = self.provider.do_verified_logout(_sid, CLIENT_ID, alla=True) + + assert set(res.keys()) == {"iframe", "cookie"} + + def test_do_verified_logout_just_the_one(self): + self.provider.events = DummyEventStore() + + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + # There is no back channel logout, hence there should be no HTTP POST + exception = requests.ConnectionError() + with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps: + rsps.add(responses.POST, "https://example.com/bc_logout", body=exception) + res = self.provider.do_verified_logout(_sid, CLIENT_ID, alla=False) + + assert set(res.keys()) == {"iframe", "cookie"} + + def test_do_verified_logout_the_other(self): + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + # This only does back channel logout + with responses.RequestsMock() as rsps: + rsps.add(rsps.POST, "https://example.com/bc_logout", status=200) + res = self.provider.do_verified_logout(_sid, "client0", alla=False) + + assert set(res.keys()) == {"cookie"} + + def test_do_verified_logout_the_other_back_channel_failed(self): + self._code_auth() + self._code_auth2() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = CLIENT_ID + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + # Does back channel logout and it will fail + with responses.RequestsMock() as rsps: + rsps.add(rsps.POST, "https://example.com/bc_logout", status=400) + res = self.provider.do_verified_logout(_sid, "client0", alla=False) + + assert list(res.keys()) == [] + + def test_end_session_endpoint_no_post_logout_redirect_uri(self): + self._code_auth() + cookie = self._create_cookie("username", "number5") + + self.provider.cdb["number5"]["post_logout_redirect_uris"] = [ + ("https://example.com/plru", ""), + ("https://example.com/plru2", ""), + ] + + res = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + assert isinstance(res, Response) + assert res.status_code == 400 + + def test_logout_info_for_all_clients_no_params(self): + with pytest.raises(ParameterError): + self.provider.logout_info_for_all_clients() + + def test_do_back_channel_logout_no_backchannel(self): + self._code_auth() + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + _sub = self.provider.sdb[_sid]["sub"] + # + if "backchannel_logout_uri" in self.provider.cdb["number5"]: + del self.provider.cdb["number5"]["backchannel_logout_uri"] + + res = self.provider.do_back_channel_logout( + self.provider.cdb["number5"], _sub, _sid + ) + assert res is None + + def test_id_token_hint_multiple_aud(self): + id_token = self._auth_with_id_token() + assert session_get( + self.provider.sdb, "sub", id_token["sub"] + ) # verify we got valid session + + self.provider.cdb["number5"]["post_logout_redirect_uris"] = [ + ("https://example.com/plru", "") + ] + + # add another aud and an azp. + id_token["azp"] = id_token["aud"][0] + id_token["aud"].append("foobar") + id_token_hint = id_token.to_jwt(algorithm="none") + + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}) + ) + + assert isinstance(resp, SeeOther) + + def test_id_token_hint_aud_does_not_match_client_id(self): + id_token = self._auth_with_id_token() + assert session_get( + self.provider.sdb, "sub", id_token["sub"] + ) # verify we got valid session + + # add another aud and an azp. + id_token_hint = id_token.to_jwt(algorithm="none") + + # Mess with the session DB + _sid = list(self.provider.sdb._db.storage.keys())[0] + self.provider.sdb[_sid]["client_id"] = "something else" + resp = self.provider.end_session_endpoint( + urlencode({"id_token_hint": id_token_hint}) + ) + + assert isinstance(resp, Response) + assert resp.status_code == 400 + + def test_no_back_or_front_channel_logout(self): + self._code_auth() + + # Mess with client DB + for c in ["backchannel_logout_uri", "frontchannel_logout_uri"]: + if c in self.provider.cdb["number5"]: + del self.provider.cdb["number5"][c] + + resp = self.provider.do_verified_logout( + sid=list(self.provider.sdb._db.storage.keys())[0], client_id="number5" + ) + + assert resp == {} + + def test_back_channel_logout_fails(self): + self._code_auth() + + # client0 + self.provider.cdb["client0"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["client0"]["client_id"] = "client0" + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + # There is no back channel logout, hence there should be no HTTP POST + with responses.RequestsMock(): + res = self.provider.do_verified_logout(_sid, "client0", alla=False) + + assert res == {} + + def test_logout_info_for_one_client_no_logout_info(self): + self._code_auth() + + # Mess with client DB + for c in ["backchannel_logout_uri", "frontchannel_logout_uri"]: + if c in self.provider.cdb["number5"]: + del self.provider.cdb["number5"][c] + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + resp = self.provider.logout_info_for_one_client(_sid, "number5") + + assert resp == {"back_channel": {}, "front_channel": {}} + + def test_unknown_client(self): + self._code_auth() + cookie = self._create_cookie("username", "unknown") + + resp = self.provider.end_session_endpoint( + urlencode({"state": "abcde"}), cookie=cookie + ) + + assert isinstance(resp, Response) + assert resp.status_code == 400 + + def test_no_cookie_no_id_token_hint(self): + self._code_auth() + + resp = self.provider.end_session_endpoint(urlencode({"state": "abcde"})) + + assert isinstance(resp, Response) + assert resp.status_code == 400 + + def test_back_channel_logout_failed_front_channel_logout_exists(self): + self._code_auth() + + # client0 + self.provider.cdb["number5"][ + "backchannel_logout_uri" + ] = "https://example.com/bc_logout" + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = "number5" + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + + # Does back channel logout and it will fail + with responses.RequestsMock() as rsps: + rsps.add(rsps.POST, "https://example.com/bc_logout", status=400) + res = self.provider.do_verified_logout(_sid, "client0", alla=True) + + assert set(res.keys()) == {"cookie", "iframe"} + + def test_logout_from_clients_one_without_logout_info(self): + self._code_auth() + self._code_auth2() + + # Mess with client DB + # neither back channel nor front channel + for c in ["backchannel_logout_uri", "frontchannel_logout_uri"]: + if c in self.provider.cdb["client0"]: + del self.provider.cdb["client0"][c] + + self.provider.cdb["client0"]["client_id"] = "client0" + + # both back channel and front channel + self.provider.cdb["number5"][ + "frontchannel_logout_uri" + ] = "https://example.com/fc_logout" + self.provider.cdb["number5"]["client_id"] = "number5" + + # Get a session ID, anyone will do. + # I know the session backend DB is a DictSessionBackend so I can use that + _sid = list(self.provider.sdb._db.storage.keys())[0] + res = self.provider.logout_info_for_all_clients(sid=_sid) + assert set(res.keys()) == {"back_channel", "front_channel"} + assert set(res["back_channel"].keys()) == {"number5"} + assert set(res["front_channel"].keys()) == {"number5"} diff --git a/tests/test_sdb.py b/tests/test_sdb.py index dda86dc18..697b8c31b 100644 --- a/tests/test_sdb.py +++ b/tests/test_sdb.py @@ -17,11 +17,11 @@ from oic.utils.sdb import Crypt from oic.utils.sdb import DefaultToken from oic.utils.sdb import DictRefreshDB -from oic.utils.sdb import DictSessionBackend from oic.utils.sdb import ExpiredToken from oic.utils.sdb import SessionDB from oic.utils.sdb import WrongTokenType from oic.utils.sdb import create_session_db +from oic.utils.session_backend import DictSessionBackend from oic.utils.time_util import utc_time_sans_frac __author__ = "rohe0002" diff --git a/tests/test_token.py b/tests/test_token.py index 6e6206ae3..ddcca3075 100644 --- a/tests/test_token.py +++ b/tests/test_token.py @@ -11,9 +11,9 @@ from oic.utils.sdb import AccessCodeUsed from oic.utils.sdb import AuthnEvent from oic.utils.sdb import DefaultToken -from oic.utils.sdb import DictSessionBackend from oic.utils.sdb import ExpiredToken from oic.utils.sdb import SessionDB +from oic.utils.session_backend import DictSessionBackend __author__ = "roland" diff --git a/tests/test_x_provider.py b/tests/test_x_provider.py index 3de635867..80f54706a 100644 --- a/tests/test_x_provider.py +++ b/tests/test_x_provider.py @@ -25,10 +25,10 @@ from oic.utils.keyio import KeyBundle from oic.utils.keyio import KeyJar from oic.utils.sdb import DefaultToken -from oic.utils.sdb import DictSessionBackend from oic.utils.sdb import SessionDB from oic.utils.sdb import lv_pack from oic.utils.sdb import lv_unpack +from oic.utils.session_backend import DictSessionBackend CLIENT_CONFIG = {"client_id": "client1", "config": {"issuer": "https://example.com/as"}}