diff --git a/CHANGELOG.md b/CHANGELOG.md index 49db52662..9da4d74c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on the [KeepAChangeLog] project. ### Changed - [#688] Second stage of adding logout support. +- [#700] Third stage of adding logout support, provider side ### Fixed - [#602] Fixed uncaught error on unpacking of message @@ -28,6 +29,7 @@ The format is based on the [KeepAChangeLog] project. [#683]: https://github.com/OpenIDC/pyoidc/issues/683 [#688]: https://github.com/OpenIDC/pyoidc/pull/688 [#698]: https://github.com/OpenIDC/pyoidc/pull/698 +[#700]: https://github.com/OpenIDC/pyoidc/pull/700 ## 1.0.1 [2019-06-30] diff --git a/src/oic/oic/__init__.py b/src/oic/oic/__init__.py index a066e3fa1..6fe3e532b 100644 --- a/src/oic/oic/__init__.py +++ b/src/oic/oic/__init__.py @@ -382,9 +382,6 @@ def __init__( self.kid = {"sig": {}, "enc": {}} self.requests_dir = requests_dir - # LOGOUT related - self.post_logout_redirect_uris = [] - def _get_id_token(self, **kwargs): try: return kwargs["id_token"] diff --git a/src/oic/oic/provider.py b/src/oic/oic/provider.py index 7155414a3..1616ac49e 100644 --- a/src/oic/oic/provider.py +++ b/src/oic/oic/provider.py @@ -4,11 +4,15 @@ import logging import socket import time +import uuid from functools import cmp_to_key +from http.cookies import SimpleCookie from typing import Any from typing import Dict # noqa from typing import List # noqa from typing import Optional +from typing import Tuple +from typing import Union from urllib.parse import parse_qs from urllib.parse import splitquery # type: ignore from urllib.parse import unquote @@ -34,11 +38,11 @@ from oic.exception import MessageException from oic.exception import NotForMe from oic.exception import ParameterError -from oic.exception import SubMismatch from oic.exception import UnSupported from oic.oauth2 import compact from oic.oauth2 import error_response from oic.oauth2 import redirect_authz_error +from oic.oauth2.base import PBase from oic.oauth2.exception import CapabilitiesMisMatch from oic.oauth2.exception import VerificationError from oic.oauth2.message import Message @@ -52,6 +56,7 @@ from oic.oic import Server from oic.oic import claims_match from oic.oic import scope2claims +from oic.oic.message import BACK_CHANNEL_LOGOUT_EVENT from oic.oic.message import SCOPE2CLAIMS from oic.oic.message import AccessTokenResponse from oic.oic.message import AuthorizationResponse @@ -69,6 +74,7 @@ from oic.utils.http_util import Response from oic.utils.http_util import SeeOther from oic.utils.http_util import Unauthorized +from oic.utils.jwt import JWT from oic.utils.keyio import KEYS # noqa from oic.utils.keyio import KeyBundle from oic.utils.keyio import KeyJar # noqa @@ -238,6 +244,9 @@ def __init__( template_renderer=render_template, extra_scope_dict=None, message_factory=OIDCMessageFactory, + post_logout_page="", + self_signing_alg="RS256", + logout_path="", ): # This has to be defined before calling super() @@ -294,6 +303,13 @@ def __init__( # additional attributes self.schema = schema + # Logout connected attributes + self.httpc = PBase(verify_ssl=verify_ssl, keyjar=self.keyjar) + self.post_logout_page = post_logout_page + self.signing_alg = self_signing_alg + self.logout_path = logout_path + self.logout_verify_url = "" + @property def default_capabilities(self): """Define default capabilities for implementation.""" @@ -488,195 +504,12 @@ def required_user(self, areq): return req_user - def verify_post_logout_redirect_uri(self, esreq, client_id): - """ - Verify the post_logout_redirect_uris. - - :param esreq: End session request - :param client_id: The Client ID - :return: - """ - try: - redirect_uri = esreq["post_logout_redirect_uri"] - except KeyError: - logger.debug("Missing post_logout_redirect_uri parameter") - return - - try: - accepted_urls = self.cdb[client_id]["post_logout_redirect_uris"] - if self._verify_url(redirect_uri, accepted_urls): - return redirect_uri - except Exception as exc: - msg = "An error occurred while verifying redirect URI: %s" - logger.debug(msg, str(exc)) - - return None - def is_session_revoked(self, request="", cookie=None): areq = parse_qs(request) authn, _ = self.pick_auth(areq) identity, _ts = authn.authenticated_as(cookie) return self.sdb.is_revoke_uid(identity["uid"]) - def let_user_verify_logout(self, uid, esr, cookie, redirect_uri): - if cookie: - headers = [cookie] - else: - headers = [] - - self.sdb.set_verified_logout(uid) - - if redirect_uri is not None: - redirect = redirect_uri - else: - redirect = "/" - try: - tmp_id_token_hint = esr["id_token_hint"] - except KeyError: - tmp_id_token_hint = "" - - context = { - "id_token_hint": tmp_id_token_hint, - "post_logout_redirect_uri": esr["post_logout_redirect_uri"], - "key": self.sdb.get_verify_logout(uid), - "redirect": redirect, - "action": "/" + EndSessionEndpoint("").etype, - } - return Response( - self.template_renderer("verify_logout", context), headers=headers - ) - - def _get_sids_from_cookie(self, cookie): - """Get cookie_dealer, client_id and sids from cookie.""" - if cookie is None: - return None, None, None - - cookie_dealer = CookieDealer(srv=self) - client_id = sids = None - - _cval = cookie_dealer.get_cookie_value(cookie, self.sso_cookie_name) - if _cval: - (value, _ts, typ) = _cval - if typ == "sso": - uid, client_id = value.split(DELIM) - try: - sids = session_get(self.sdb, "uid", uid) - except (KeyError, IndexError): - raise SubMismatch("Mismatch uid") - return cookie_dealer, client_id, sids - - def end_session_endpoint(self, request="", cookie=None, **kwargs): - esr = self.server.message_factory.get_request_type( - "endsession_endpoint" - )().from_urlencoded(request) - - logger.debug("End session request: %s", format(sanitize(esr.to_dict()))) - - # 2 ways of find out client ID and user. Either through a cookie - # or using the id_token_hint - try: - cookie_dealer, client_id, sids = self._get_sids_from_cookie(cookie) - except SubMismatch as error: - return error_response("invalid_request", "%s" % error) - - if "id_token_hint" in esr: - id_token_hint = IdToken().from_jwt( - esr["id_token_hint"], keyjar=self.keyjar, verify=True - ) - far_away = 86400 * 30 # 30 days - - if client_id: - args = {"client_id": client_id} - else: - args = {} - - try: - id_token_hint.verify( - iss=self.baseurl, skew=far_away, nonce_storage_time=far_away, **args - ) - except (VerificationError, NotForMe) as err: - logger.warning("Verification error on id_token_hint: {}".format(err)) - return error_response("invalid_request", "Bad Id Token hint") - - sub = id_token_hint["sub"] - - if sids: - match = False - # verify that 'sub' are bound to 'user' - for sid in sids: - if self.sdb[sid]["sub"] == sub: - match = True - break - if not match: - return error_response("invalid_request", "Wrong user") - else: - try: - sids = self.sdb.get_by_sub(sub) - except IndexError: - pass - - if not client_id: - if len(id_token_hint["aud"]) == 1: - client_id = id_token_hint["aud"][0] - else: - client_id = id_token_hint["azp"] - - if not client_id: - return error_response("invalid_request", "Could not find client ID") - if client_id not in self.cdb: - return error_response("invalid_request", "Unknown client") - - match = False - for sid in sids: - if self.sdb[sid]["client_id"] == client_id: - match = True - break - if not match: - return error_response("invalid_request", "Unmatched client") - - redirect_uri = None - if "post_logout_redirect_uri" in esr: - redirect_uri = self.verify_post_logout_redirect_uri(esr, client_id) - if not redirect_uri: - msg = "Post logout redirect URI verification failed!" - return error_response("invalid_request", msg) - else: # If only one registered use that one - if len(self.cdb[client_id]["post_logout_redirect_uris"]) == 1: - _base, _query = self.cdb[client_id]["post_logout_redirect_uris"][0] - if _query: - query_string = urlencode( - [(key, v) for key in _query for v in _query[key]] - ) - redirect_uri = "%s?%s" % (_base, query_string) - else: - redirect_uri = _base - - for sid in sids: - del self.sdb[sid] - - # Delete cookies - authn, _ = self.pick_auth(esr) - headers = [authn.delete_cookie(), self.delete_session_cookie()] - if cookie_dealer: - headers.append(cookie_dealer.delete_cookie(self.sso_cookie_name)) - - if redirect_uri is not None: - try: - _state = esr["state"] - except KeyError: - redirect_uri = str(redirect_uri) - else: - if "?" in redirect_uri: - redirect_uri += "&" - else: - redirect_uri += "?" - - redirect_uri += urlencode({"state": _state}) - - return SeeOther(redirect_uri, headers=headers) - - return Response("Successful logout", headers=headers) - def verify_endpoint(self, request="", cookie=None, **kwargs): """ Verify endpoint. @@ -852,7 +685,7 @@ def authz_part2(self, user, areq, sid, **kwargs): aresp["client_id"] = areq["client_id"] if self.events: - self.events.store("Protocol response", aresp) + self.events.store("protocol response", aresp) response = sanitize(aresp.to_dict()) logger.info("authorization response: %s", response) @@ -2018,7 +1851,7 @@ def do_key_rollover(self, jwks, kid_template): # print to the jwks file dump_jwks(self.keyjar[""], self.jwks_name) - def remove_inactive_keys(self, more_then=3600): + def remove_inactive_keys(self, more_then: int = 3600): """ Remove all keys that has been inactive 'more_then' seconds. @@ -2052,3 +1885,476 @@ def get_by_sub_and_(self, sub: str, key: str, val: Any) -> Optional[str]: except KeyError: continue return None + + # Below are LOGOUT related methods + + def verify_post_logout_redirect_uri( + self, esreq: Message, client_id: str + ) -> Optional[str]: + """ + Verify a post logout URI. + + :param esreq: End session request + :param client_id: The Client ID + :return: The post logout URI if it was OK otherwise None + """ + try: + redirect_uri = esreq["post_logout_redirect_uri"] + except KeyError: + logger.debug("Missing post_logout_redirect_uri parameter") + return None + + try: + accepted_urls = self.cdb[client_id]["post_logout_redirect_uris"] + if self._verify_url(redirect_uri, accepted_urls): + return redirect_uri + except Exception as exc: + msg = "An error occurred while verifying redirect URI: %s" + logger.debug(msg, str(exc)) + + return None + + def let_user_verify_logout( + self, + uid: str, + esr: Message, + cookie: Optional[List[Tuple[str, str]]], + redirect_uri: Optional[str], + ) -> Response: + """ + Show a page to the user, that asks whether logout should be performed. + + :param uid: User ID + :param esr: EndSessionRequest instance + :param cookie: A cookie + :param redirect_uri: URL + :return: Response instance + """ + if cookie: + headers = cookie + else: + headers = [] + + self.sdb.set_verify_logout(uid) + + if redirect_uri is not None: + redirect = redirect_uri + else: + redirect = "/" + try: + tmp_id_token_hint = esr["id_token_hint"] + except KeyError: + tmp_id_token_hint = "" + + context = { + "id_token_hint": tmp_id_token_hint, + "post_logout_redirect_uri": esr["post_logout_redirect_uri"], + "key": self.sdb.get_verify_logout(uid), + "redirect": redirect, + "action": "/" + EndSessionEndpoint("").etype, + } + return Response( + self.template_renderer("verify_logout", context), headers=headers + ) + + def _get_uid_from_cookie( + self, cookie: Optional[Union[str, SimpleCookie]] + ) -> Tuple[Optional[CookieDealer], Optional[str], Optional[str]]: + """ + Get cookie_dealer, client_id and uid from cookie. + + :param cookie: Received cookie + :return: Tuple containing CookieDealer instance, client ID and User ID + """ + if cookie is None: + return None, None, None + + cookie_dealer = CookieDealer(srv=self) + client_id = uid = None + + _cval = cookie_dealer.get_cookie_value(cookie, self.sso_cookie_name) + if _cval: + (value, _ts, typ) = _cval + if typ == "sso": + uid, client_id = value.split(DELIM) + + return cookie_dealer, client_id, uid + + def do_back_channel_logout( + self, cinfo: dict, sub: str, sid: str + ) -> Optional[Tuple[str, str]]: + """ + Prepare information to be used to do a back-channel logout. + + :param cinfo: Client information + :param sub: Subject identifier + :param sid: The Issuer ID + :return: Tuple with logout URI and signed logout token + """ + try: + back_channel_logout_uri = cinfo["backchannel_logout_uri"] + except KeyError: + return None + + # always include sub and sid so I don't check for + # backchannel_logout_session_required + + payload = { + "sub": sub, + "sid": sid, + "events": {BACK_CHANNEL_LOGOUT_EVENT: {}}, + "jti": uuid.uuid4().hex, + } + + try: + alg = cinfo["id_token_signed_response_alg"] + except KeyError: + alg = self.capabilities["id_token_signing_alg_values_supported"][0] + + _jws = JWT(self.keyjar, iss=self.name, lifetime=86400, sign_alg=alg) + sjwt = _jws.pack(aud=cinfo["client_id"], **payload) + + return back_channel_logout_uri, sjwt + + def clean_sessions(self, usids: List[str]): + """ + Remove Session IDs from the session DB. + + :param usids: List of session IDs + """ + _sdb = self.sdb + # Clean out all sessions + for sid in usids: + del _sdb[sid] + + def logout_info_for_all_clients( + self, uid: Optional[str] = "", sid: Optional[str] = "" + ) -> Dict: + """ + Collect information necessary to logout one user from all clients he/she has been using. + + One of uid and sid MUST be provided. If uid is provided sid is ignored. + NO changes are made to the session DB. + No logout is actually performed + :param uid: User ID + :param sid: Session ID + :return: Dictionary with logout information + """ + if not uid: + if not sid: + raise ParameterError("One of uid and sid MUST be provided") + else: + uid = self.sdb.get_uid_by_sid(sid) + + # Find all the session IDs this user has gotten + usids = session_get(self.sdb, "uid", uid) + # Find all RPs this user has logged it from + _client_sid = {} + for usid in usids: + _client_sid[self.sdb[usid]["client_id"]] = usid + + # Front-/Backchannel logout ? + _cdb = self.cdb + _iss = self.name + bc_logouts = {} + fc_iframes = {} + for _cid, _csid in _client_sid.items(): + if "backchannel_logout_uri" in _cdb[_cid]: + _sub = self.sdb[_csid]["sub"] + bc_logouts[_cid] = self.do_back_channel_logout(_cdb[_cid], _sub, _csid) + if "frontchannel_logout_uri" in _cdb[_cid]: + # Construct an IFrame + fc_iframes[_cid] = self.do_front_channel_logout_iframe( + _cdb[_cid], _iss, _csid + ) + + return {"back_channel": bc_logouts, "front_channel": fc_iframes} + + def logout_info_for_one_client(self, session_id: str, client_id: str) -> Dict: + """ + Collect information necessary to log out from client. + + Note that if a client has both back channel and front channel logout registered both + will be handled. + :param session_id: Session ID + :param client_id: Client ID + :return: Dictionary with back_channel and front_channel logout info. + """ + logout_spec = { + "back_channel": {}, # back-channel logout information + "front_channel": {}, # front-channel logout information + } # type: Dict[str, Dict[str, Union[None, str, Tuple[str,str]]]] + + if "backchannel_logout_uri" in self.cdb[client_id]: + _subject_id = self.sdb[session_id]["sub"] + logout_spec["back_channel"] = { + client_id: self.do_back_channel_logout( + self.cdb[client_id], _subject_id, session_id + ) + } + elif "frontchannel_logout_uri" in self.cdb[client_id]: + # Construct an IFrame + _iframe = self.do_front_channel_logout_iframe( + self.cdb[client_id], self.name, session_id + ) + logout_spec["front_channel"] = {client_id: _iframe} + + return logout_spec + + def end_session_endpoint( + self, + request: str = "", + cookie: Optional[Union[str, SimpleCookie]] = None, + **kwargs + ) -> Response: + """ + Handle a RP initiated Logout request. + + :param request: The logout request + :param cookie: + :param kwargs: + :return: Returns a dictionary with one key 'sjwt' and the value + being a signed JWT token with session information. + """ + _req = self.server.message_factory.get_request_type("endsession_endpoint") + esr = _req().from_urlencoded(request) + + logger.debug("End session request: %s", sanitize(esr.to_dict())) + + if self.events: + self.events.store("protocol request", esr) + + # 2 ways of find out client ID and user. Either through a cookie + # or using the id_token_hint. If I get information from both make sure they match + _, client_id, uid = self._get_uid_from_cookie(cookie) + + sid = "" + + if "id_token_hint" in esr: + id_token_hint = IdToken().from_jwt( + esr["id_token_hint"], keyjar=self.keyjar, verify=True + ) + far_away = 86400 * 30 # 30 days + + if client_id: + args = {"client_id": client_id} + else: + args = {} + + try: + id_token_hint.verify( + iss=self.baseurl, skew=far_away, nonce_storage_time=far_away, **args + ) + except (VerificationError, NotForMe) as err: + logger.warning("Verification error on id_token_hint: %s", err) + return error_response("invalid_request", "Bad Id Token hint") + + sub = id_token_hint["sub"] + + if uid is not None: + # verify that 'sub' are bound to 'uid' + if self.sdb.get_uid_by_sub(sub) != uid: + return error_response("invalid_request", "Wrong user") + else: + uid = self.sdb.get_uid_by_sub(sub) + + if client_id is None: + if len(id_token_hint["aud"]) == 1: + client_id = id_token_hint["aud"][0] + else: + client_id = id_token_hint["azp"] + + sids = session_get(self.sdb, "sub", sub) + + matching_client_id = False + for sid in sids: + if self.sdb[sid]["client_id"] == client_id: + matching_client_id = True + break + + if not matching_client_id: + return error_response( + "invalid_request", "Could not find a matching client ID" + ) + + if not client_id: + return error_response("invalid_request", "Could not find client ID") + if client_id not in self.cdb: + return error_response("invalid_request", "Unknown client") + + redirect_uri = None + if "post_logout_redirect_uri" in esr: + redirect_uri = self.verify_post_logout_redirect_uri(esr, client_id) + if not redirect_uri: + msg = "Post logout redirect URI verification failed!" + return error_response("invalid_request", msg) + else: # If only one registered use that one + try: + _ruri = self.cdb[client_id]["post_logout_redirect_uris"] + except KeyError: + msg = "Missing post_logout_redirect_uri" + return error_response("invalid_request", msg) + + if len(_ruri) == 1: + _base, _query = _ruri[0] + if _query: + query_string = urlencode( + [(key, v) for key in _query for v in _query[key]] + ) + redirect_uri = "%s?%s" % (_base, query_string) + else: + redirect_uri = _base + else: + return error_response( + "invalid_request", + descr="Missing post_logout_redirect_uri and more then one post_logout_redirect_uris", + ) + + # redirect user to OP logout verification page + payload = { + "uid": uid, + "client_id": client_id, + "redirect_uri": redirect_uri, + "sid": sid, + } + if "state" in esr: + payload["state"] = esr["state"] + + if self.events: + self.events.store("object args", "{}".format(payload)) + + # From me to me + _jws = JWT( + self.keyjar, iss=self.name, lifetime=86400, sign_alg=self.signing_alg + ) + sjwt = _jws.pack(aud=[self.name], **payload) + + location = "{}?{}".format(self.logout_verify_url, urlencode({"sjwt": sjwt})) + return SeeOther(location) + + def unpack_signed_jwt(self, sjwt: str): + """Will unpack a signed JWT.""" + verifier = JWT(self.keyjar) + try: + return verifier.unpack(sjwt) + except Exception as err: + raise ValueError(err) + + def do_verified_logout( + self, sid: str, client_id: str, alla: bool = False, **kwargs + ) -> Dict: + """ + Perform back channel logout and prepares the information needed for front channel logout. + + :param sid: Session ID + :param client_id: Client ID + :param alla: Whether logout should be attempted from all clients or just one specific client. + :param kwargs: + :return: + """ + if alla: + uid = self.sdb.get_uid_by_sid(sid) + logout_spec = self.logout_info_for_all_clients(uid) + # Find all the session IDs this user has gotten + sids = session_get(self.sdb, "uid", uid) + else: + logout_spec = self.logout_info_for_one_client( + session_id=sid, client_id=client_id + ) + sids = [sid] + + if self.events: + self.events.store("object args", "{}".format(logout_spec)) + + if not logout_spec["back_channel"] and not logout_spec["front_channel"]: + return {} + + # take care of Back channel logout first + if logout_spec["back_channel"]: + failed = [] + for _cid, spec in logout_spec["back_channel"].items(): + _url, sjwt = spec + logger.info("logging out from {} at {}".format(_cid, _url)) + + try: + res = self.httpc.http_request( + _url, "POST", data="logout_token={}".format(sjwt) + ) + except Exception as err: + # Can't be more specific because I don't know which http client are used + logger.error("failed to logout from {}".format(_cid)) + if self.events: + self.events.store("exception", "{}: {}".format(_cid, str(err))) + failed.append(_cid) + continue + + if res.status_code < 300: + logger.info("Logged out from {}".format(_cid)) + else: + _errstr = "failed to logout from {}".format(_cid) + if self.events: + self.events.store("fault", _errstr) + logger.error(_errstr) + failed.append(_cid) + # If no back-channel logout worked and there is no front-channel logout + # regard this as a failure. + if len(failed) == len(logout_spec["back_channel"]): + if not logout_spec["front_channel"]: + return {} + + # kill cookies + kaka1 = self.write_session_cookie("removed") + kaka2 = self.cookie_func( + "", typ="sso", cookie_name=self.sso_cookie_name, kill=True + ) + res = {"cookie": [kaka1, kaka2]} + + if logout_spec["front_channel"]: + for _cid in logout_spec["front_channel"].keys(): + logger.info("Adding logout iframe for {}".format(_cid)) + res["iframe"] = list(logout_spec["front_channel"].values()) + + # Clean out all sessions + self.clean_sessions(sids) + + return res + + @staticmethod + def do_front_channel_logout_iframe( + client_info: Dict, issuer: str, session_id: str + ) -> Optional[str]: + """ + Construct a front channel logout IFrame. + + :param client_info: Client info + :param issuer: Issuer ID + :param session_id: Session ID + :return: HTML IFrame string + """ + try: + frontchannel_logout_uri = client_info["frontchannel_logout_uri"] + except KeyError: + return None + + try: + flsr = client_info["frontchannel_logout_session_required"] + except KeyError: + flsr = False + + if flsr: + _query = {"iss": issuer, "sid": session_id} + if "?" in frontchannel_logout_uri: + p = urlparse(frontchannel_logout_uri) + _args = {k: v[0] for k, v in parse_qs(p.query).items()} + _args.update(_query) + _query = _args + _np = p._replace(query="") + frontchannel_logout_uri = _np.geturl() + + _iframe = '