-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
130 lines (94 loc) · 3.74 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
from base64 import b64encode
from http import HTTPStatus
from django.conf.urls import url
from django.http import HttpResponse, HttpResponseRedirect, HttpResponseBadRequest, HttpResponseForbidden
from django.template.loader import render_to_string
from functools import wraps
from client import OIDCClient
DEBUG = True
SECRET_KEY = b64encode(os.urandom(32)).decode()
ROOT_URLCONF = __name__
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [
'./templates/'
],
},
]
INSTALLED_APPS = [
'django.contrib.sessions',
]
MIDDLEWARE = [
'django.contrib.sessions.middleware.SessionMiddleware',
]
SESSION_ENGINE = 'django.contrib.sessions.backends.file'
client = OIDCClient(os.getenv('ISSUER_BASE_URL'),
os.getenv('CLIENT_ID'),
os.getenv('CLIENT_SECRET'),
os.getenv('REDIRECT_URI'))
def index(request):
html = render_to_string('index.html')
return HttpResponse(html)
def login(request):
nonce = b64encode(os.urandom(32)).decode()
state = b64encode(os.urandom(32)).decode()
request.session['nonce'] = nonce
request.session['state'] = state
auth_url = client.auth_url(state, scope=['openid', 'profile', 'email'], nonce = nonce)
return HttpResponseRedirect(auth_url)
def handle_callback(request):
code = request.GET.get('code', '')
if code == '':
return HttpResponseBadRequest('bad or missing code')
state = request.GET.get('state', '')
if request.session.get('state', '') == '' or request.session['state'] != state:
return HttpResponseBadRequest('bad state')
request.session.pop('state')
tokens = client.exchange_token(code)
saved_nonce = request.session['nonce']
id_token = client.decode(tokens.id_token, nonce=saved_nonce)
logout_redirect_url = os.getenv('REDIRECT_URI').replace('/callback', '')
logout_url = client.end_session_endpoint + '?id_token_hint=' + tokens.id_token + '&post_logout_redirect_uri=' + logout_redirect_url
html = render_to_string('post_callback.html', {'name': id_token['name'], 'access_token': tokens.access_token, 'logout_url': logout_url})
return HttpResponse(html)
class HttpResponseUnauthorized(HttpResponse):
status_code = HTTPStatus.UNAUTHORIZED
def token_required(f):
@wraps(f)
def decorated_function(request, *args, **kwargs):
print(request.META)
tok = request.META.get('HTTP_AUTHORIZATION', '')
if 'Bearer ' not in tok:
return HttpResponseUnauthorized('missing authorization header')
token = tok.replace('Bearer ', '')
nonoce = request.session.get('nonce', '')
try:
access_token = client.decode(token, nonce = nonoce)
request.access_token = access_token
except Exception as e:
return HttpResponseUnauthorized(str(e))
return f(request, *args, **kwargs)
return decorated_function
def scopes_required(scopes):
def wrapd_function(f):
@wraps(f)
def decorated_function(request, *args, **kwargs):
if request.access_token == None:
return HttpResponseUnauthorized('missing token')
if len(set(request.access_token.get('scp', [])).intersection(scopes)) == 0:
return HttpResponseForbidden('missing scopes')
return f(request, *args, **kwargs)
return decorated_function
return wrapd_function
@token_required
@scopes_required(['profile'])
def protected(request):
return HttpResponse('You are authorized')
urlpatterns = [
url(r'^$', index, name='index'),
url(r'^login$', login, name='login'),
url(r'^callback$', handle_callback, name='handle_callback'),
url(r'^protected$', protected, name='protected'),
]