This repository was archived by the owner on Jan 30, 2021. It is now read-only.
forked from sashka/flask-googleauth
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflask_googleauth.py
339 lines (283 loc) · 13 KB
/
flask_googleauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Implementation of the OpenID authentication scheme.

No discovery is supported in order to keep the code simple.
This is a partial port of tornado.auth to be used with Flask.

Example usage for Google Federated Login:

    from flask import Flask
    from flask_googleauth import GoogleFederated

    # Setup Flask
    app = Flask(__name__)
    app.secret_key = "random secret key"

    # Setup Google Auth
    auth = GoogleFederated("mokote.com", app)

    @app.route("/")
    @auth.required
    def secret():
        return "ssssshhhhh (c) kennethreitz"
"""
import functools
import logging
import urllib
import urlparse
import blinker
import requests
from flask import Blueprint, request, session, redirect, url_for, abort, g, current_app
# Blinker signals emitted by this module. They live in a private namespace
# and are sent with the current Flask application as sender and a ``user``
# keyword argument:
#   login       -- after a successful OpenID login
#   logout      -- when the logout endpoint is hit
#   login_error -- when the OpenID response could not be verified
signals = blinker.Namespace()
login, logout, login_error = map(
    signals.signal, ("login", "logout", "login-error")
)
class ObjectDict(dict):
    """A dict whose items can also be read and written as attributes.

    ``obj.key`` is equivalent to ``obj["key"]`` and ``obj.key = value`` is
    equivalent to ``obj["key"] = value``. Reading a key that is not present
    raises AttributeError rather than KeyError.
    """

    def __getattr__(self, name):
        # Python only calls __getattr__ after regular attribute lookup has
        # failed, so real dict methods such as ``keys`` are never shadowed.
        if name in self:
            return self[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        # Every attribute assignment is stored as a dictionary item.
        self.__setitem__(name, value)
class OpenIdMixin(object):
    """
    Abstract implementation of OpenID and Attribute Exchange.

    The primary methods are authenticate_redirect(), and
    get_authenticated_user(). The former should be called to redirect the
    user to, e.g., the OpenID authentication page on the third party
    service, and the latter should be called upon return to get the user
    data from the data returned by the third party service.

    Classes using this mixin must provide an ``_OPENID_ENDPOINT`` attribute
    holding the URL of the OpenID provider endpoint.

    See GoogleAuth below for example implementations.
    """

    # NOTE: the defaults below are tuples rather than lists so that no
    # mutable object is shared between calls. ``None`` is deliberately NOT
    # used as the default because an explicit ``ask_for=None`` already means
    # "do not request any attributes".
    def authenticate_redirect(self, callback_uri=None,
                              ask_for=("name", "email", "language", "username")):
        """
        Performs a redirect to the authentication URL for this service.

        After authentication, the service will redirect back to the given
        callback URI (the current request URL when none is given).

        We request the given attributes for the authenticated user by
        default (name, email, language, and username). If you don't need
        all those attributes for your app, you can request fewer with
        the |ask_for| keyword argument.

        Returns the Flask redirect response. Raises KeyError if |ask_for|
        contains an attribute name that _openid_args() does not know.
        """
        callback_uri = callback_uri or request.url
        args = self._openid_args(callback_uri, ax_attrs=ask_for)
        # The endpoint may already carry a query string of its own.
        separator = "&" if "?" in self._OPENID_ENDPOINT else "?"
        return redirect(self._OPENID_ENDPOINT + separator + urllib.urlencode(args))

    def get_authenticated_user(self, callback):
        """Fetches the authenticated user data upon redirect.

        This method should be called by the handler that receives the
        redirect from the authenticate_redirect() or authorize_redirect()
        methods.

        |callback| is invoked with the user dictionary, or with None when
        the response could not be verified; its return value is returned.
        """
        # Verify the OpenID response via direct request to the OP: replay
        # the query arguments we received with the mode switched to
        # "check_authentication".
        args = dict(request.args.items())
        args["openid.mode"] = u"check_authentication"
        response = requests.post(self._OPENID_ENDPOINT, data=args)
        return self._on_authentication_verified(callback, response)

    def _openid_args(self, callback_uri, ax_attrs=()):
        """Builds the query arguments for an OpenID ``checkid_setup`` request.

        |callback_uri| is resolved against the current request URL and used
        as ``openid.return_to``; the realm is the root of that URL.

        |ax_attrs| is an iterable of attribute names to request through
        Attribute Exchange. Supported names are "name" (expanded to first,
        full and last name), "email", "language" and "username". Any other
        name raises KeyError.
        """
        url = urlparse.urljoin(request.url, callback_uri)
        args = {
            "openid.ns": "http://specs.openid.net/auth/2.0",
            "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.return_to": url,
            "openid.realm": urlparse.urljoin(url, "/"),
            "openid.mode": "checkid_setup",
        }
        if ax_attrs:
            args.update({
                "openid.ns.ax": "http://openid.net/srv/ax/1.0",
                "openid.ax.mode": "fetch_request",
            })
            # Work on a private set so the caller's sequence is never touched.
            ax_attrs = set(ax_attrs)
            required = []
            if "name" in ax_attrs:
                # "name" is a shorthand for the three name-related attributes.
                ax_attrs -= set(["name", "firstname", "fullname", "lastname"])
                required += ["firstname", "fullname", "lastname"]
                args.update({
                    "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
                    "openid.ax.type.fullname": "http://axschema.org/namePerson",
                    "openid.ax.type.lastname": "http://axschema.org/namePerson/last",
                })
            known_attrs = {
                "email": "http://axschema.org/contact/email",
                "language": "http://axschema.org/pref/language",
                "username": "http://axschema.org/namePerson/friendly",
            }
            # Sorted so the generated request does not depend on set ordering.
            for name in sorted(ax_attrs):
                args["openid.ax.type." + name] = known_attrs[name]
                required.append(name)
            args["openid.ax.required"] = ",".join(required)
        return args

    def _on_authentication_verified(self, callback, response):
        """Turns the provider's verification response into a user object.

        |response| is the result of the ``check_authentication`` POST. If
        it is not a 200 response containing "is_valid:true", a warning is
        logged and |callback| is called with None.

        Otherwise the Attribute Exchange values found in the current
        request's query arguments are collected into an ObjectDict that may
        contain "first_name", "last_name", "name", "email", "locale",
        "username" and "identity" (only non-empty values are stored), and
        |callback| is called with it. The callback's return value is
        returned in both cases.
        """
        ok = response.status_code == requests.codes.ok
        if not ok or "is_valid:true" not in response.content:
            logging.warning("Invalid OpenID response: %s", response.content)
            return callback(None)

        # Find the alias under which the provider registered the Attribute
        # Exchange namespace (e.g. "ax" for "openid.ns.ax").
        ns_prefix = "openid.ns."
        ax_ns = None
        for key in request.args:
            if key.startswith(ns_prefix) and request.args.get(key) == u"http://openid.net/srv/ax/1.0":
                ax_ns = key[len(ns_prefix):]
                break

        def get_ax_arg(uri):
            """Returns the AX value whose type URI is |uri|, or u"" if absent."""
            if not ax_ns:
                return u""
            prefix = "openid.%s.type." % ax_ns
            ax_name = None
            for key in request.args:
                if request.args.get(key) == uri and key.startswith(prefix):
                    part = key[len(prefix):]
                    ax_name = "openid.%s.value.%s" % (ax_ns, part)
                    break
            if not ax_name:
                return u""
            return request.args.get(ax_name, u"")

        email = get_ax_arg("http://axschema.org/contact/email")
        name = get_ax_arg("http://axschema.org/namePerson")
        first_name = get_ax_arg("http://axschema.org/namePerson/first")
        last_name = get_ax_arg("http://axschema.org/namePerson/last")
        username = get_ax_arg("http://axschema.org/namePerson/friendly")
        locale = get_ax_arg("http://axschema.org/pref/language").lower()
        identity = request.args.get("openid.claimed_id", u"")

        user = ObjectDict()
        name_parts = []
        if first_name:
            user["first_name"] = first_name
            name_parts.append(first_name)
        if last_name:
            user["last_name"] = last_name
            name_parts.append(last_name)
        # Display name preference: full name, then "first last", then the
        # local part of the email address.
        if name:
            user["name"] = name
        elif name_parts:
            user["name"] = u" ".join(name_parts)
        elif email:
            user["name"] = email.split("@")[0]
        if email:
            user["email"] = email
        if locale:
            user["locale"] = locale
        if username:
            user["username"] = username
        if identity:
            user["identity"] = identity
        return callback(user)
class GoogleAuth(OpenIdMixin):
    """
    Google OpenID authentication.

    Sign-in and sign-out links will be registered automatically.

    No application registration is necessary to use Google for authentication
    or to access Google resources on behalf of a user. To authenticate with
    Google, redirect with authenticate_redirect(). On return, parse the
    response with get_authenticated_user(). We send a dict containing the
    values for the user, including 'email', 'name', 'locale', and 'identity'.

    You can invalidate old sessions by adjusting the 'cookie_name' parameter.

    By default, authentication is not required on any endpoints. You can opt-in
    specific endpoints by using the decorator @auth.required.

    You can require auth on all endpoints, by setting the parameter 'force'
    to True. However, there's no way to bypass this setting -- every page in
    the entire app will require login.

    You can use the 'authorized_users' parameter to specify a list of users who
    are authorized to use this application. The list should contain email
    addresses or OpenID identity values for the authorized users. If
    'authorized_users' is not specified, all Google Accounts for the given
    domain will be able to use the application.

    You can also specify a list of access tokens in the 'access_tokens'
    parameter. If session-based auth fails, we check for an Authorization
    header matching this pattern: "Authorization: token <TOKEN-VALUE>".
    """

    _OPENID_ENDPOINT = "https://www.google.com/accounts/o8/ud"

    def __init__(self, app=None, url_prefix=None, name="GoogleAuth",
                 cookie_name="openid", force=False, access_tokens=None,
                 authorized_users=None):
        """
        Stores the configuration and, when |app| is given, registers the
        extension on it immediately via init_app().

        |url_prefix| and |name| are used for the blueprint created in
        init_app(). |cookie_name| is the session key under which the user
        is stored. |access_tokens| defaults to a new empty list per
        instance; a list passed by the caller is stored as-is (not copied).
        """
        self.app = app
        self.url_prefix = url_prefix
        self.name = name
        self.cookie_name = cookie_name
        self.force = force
        # Endpoints that must stay reachable without authentication.
        self.auth_not_required = []
        # A fresh list per instance: a shared ``[]`` default would leak
        # tokens added on one instance into every other instance.
        self.access_tokens = [] if access_tokens is None else access_tokens
        self.authorized_users = authorized_users
        if app:
            self.init_app(app, url_prefix, name)

    def init_app(self, app, url_prefix=None, name=None):
        """
        Registers the login/logout/auth endpoints and the before-request
        hook on |app|. |url_prefix| and |name| fall back to the values given
        to the constructor.
        """
        url_prefix = url_prefix or self.url_prefix
        name = name or self.name

        blueprint = Blueprint(name, __name__, url_prefix=url_prefix)
        blueprint.add_url_rule("/login/", "login", self._login, methods=["GET", "POST"])
        blueprint.add_url_rule("/logout/", "logout", self._logout, methods=["GET", "POST"])
        blueprint.add_url_rule("/auth/", "auth", self._auth, methods=["GET"])

        # The extension's own endpoints are exempt from forced auth.
        self.auth_not_required.append("%s.login" % blueprint.name)
        self.auth_not_required.append("%s.logout" % blueprint.name)
        self.auth_not_required.append("%s.auth" % blueprint.name)

        app.register_blueprint(blueprint)
        app.before_request(self._before_request)
        app.extensions['googleauth'] = ObjectDict(blueprint=blueprint)

    def _before_request(self):
        """
        Enforces login on every non-exempt endpoint when 'force' is set,
        then exposes the session user (or None) as ``g.user``.
        """
        if self.force and request.endpoint not in self.auth_not_required and not self._check_auth():
            blueprint = current_app.extensions['googleauth'].blueprint
            return redirect(url_for("%s.login" % blueprint.name, next=request.url))
        g.user = None
        if self.cookie_name in session:
            g.user = session[self.cookie_name]

    def _auth(self):
        """Endpoint reporting whether the current request is authorized."""
        if self._check_auth():
            return "OK", 200
        else:
            return "Unauthorized", 401

    def _login(self):
        """
        Login endpoint: starts the OpenID redirect, or handles the
        provider's response when 'openid.mode' is present in the query.
        """
        if request.args.get("openid.mode", None):
            # After OpenID response:
            return self.get_authenticated_user(self._on_auth)
        return self.authenticate_redirect()

    def _on_auth(self, user):
        """
        This is called when login with OpenID succeeded and it's not
        necessary to figure out if this is the users's first login or not.

        A falsy |user| means verification failed: the 'login-error' signal
        is sent and the request is aborted with 403. Otherwise the user is
        stored in the session, the 'login' signal is sent and the client is
        redirected.
        """
        app = current_app._get_current_object()
        if not user:
            # Google auth failed.
            login_error.send(app, user=None)
            abort(403)
        session[self.cookie_name] = user
        login.send(app, user=user)
        # NOTE(review): "next" comes straight from the query string and is
        # not validated, so it can point to another host (open redirect).
        # Confirm whether it should be restricted to same-site URLs.
        return redirect(request.args.get("next", None) or request.referrer or "/")

    def _logout(self):
        """
        Logout endpoint: drops the user from the session, sends the
        'logout' signal and redirects to 'next' (or "/").
        """
        user = session.pop(self.cookie_name, None)
        app = current_app._get_current_object()
        logout.send(app, user=user)
        return redirect(request.args.get("next", None) or "/")

    def _check_auth(self):
        """
        Returns True if the current request is authenticated, either by an
        authorized user in the session or by a known access token sent as
        "Authorization: token <TOKEN-VALUE>".
        """
        if self.cookie_name in session and self._is_authorized(session[self.cookie_name]):
            return True
        # Session-based auth failed (no session user, or the user is not
        # authorized): fall back to the access token, as documented on the
        # 'access_tokens' parameter.
        if "Authorization" in request.headers:
            v = request.headers['Authorization']
            if v.startswith("token "):
                token = v[6:]
                if token in self.access_tokens:
                    return True
        return False

    def _is_authorized(self, user):
        """Determines if the given user is a valid user for this application."""
        if self.authorized_users is None:
            # No restriction configured: every authenticated user is valid.
            return True
        # A user matches by email address or by OpenID identity.
        return any(
            key in user and user[key] in self.authorized_users
            for key in ("email", "identity")
        )

    def required(self, fn):
        """Request decorator. Forces authentication."""
        if self.force:
            # Auth is required on all pages, so this decorator can be ignored.
            # The view itself must still be returned, otherwise the decorated
            # name would be rebound to None.
            return fn

        @functools.wraps(fn)
        def decorated(*args, **kwargs):
            if not self._check_auth():
                blueprint = current_app.extensions['googleauth'].blueprint
                return redirect(url_for("%s.login" % blueprint.name, next=request.url))
            return fn(*args, **kwargs)
        return decorated
class GoogleFederated(GoogleAuth):
    """
    Super simple Google Federated Auth for a given domain.

    Behaves like GoogleAuth but points the OpenID endpoint at the given
    Google-hosted |domain|. All other parameters are passed through to
    GoogleAuth unchanged.
    """

    def __init__(self, domain, app=None, url_prefix=None, name='GoogleAuth',
                 cookie_name="openid", force=False, access_tokens=None,
                 authorized_users=None):
        # Instance attribute overriding the class-level endpoint.
        self._OPENID_ENDPOINT = "https://www.google.com/a/%s/o8/ud?be=o8" % domain
        # Build the default here so that no list object is shared between
        # instances and the parent always receives a real list.
        if access_tokens is None:
            access_tokens = []
        super(GoogleFederated, self).__init__(
            app, url_prefix, name, cookie_name, force, access_tokens, authorized_users
        )