Skip to content

Commit

Permalink
refactor(account): Email verification login & stage
Browse files Browse the repository at this point in the history
  • Loading branch information
pennersr committed Aug 23, 2024
1 parent dc59356 commit 560cb24
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 113 deletions.
6 changes: 0 additions & 6 deletions allauth/account/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ def unstash_verified_email(self, request):
request.session["account_verified_email"] = None
return ret

def stash_user(self, request, user):
request.session["account_user"] = user

def unstash_user(self, request):
return request.session.pop("account_user", None)

def is_email_verified(self, request, email):
"""
Checks whether or not the email address is already verified
Expand Down
53 changes: 52 additions & 1 deletion allauth/account/internal/flows/email_verification.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional, Tuple

from django.contrib import messages
from django.http import HttpRequest
from django.http import HttpRequest, HttpResponse
from django.urls import reverse

from allauth.account import app_settings, signals
Expand All @@ -20,6 +22,16 @@ def verify_email_indirectly(request: HttpRequest, user, email: str) -> bool:
return True


def verify_email_and_resume(
request: HttpRequest, verification
) -> Tuple[Optional[EmailAddress], Optional[HttpResponse]]:
email_address = verification.confirm(request)
if not email_address:
return None, None
response = login_on_verification(request, verification)
return email_address, response


def verify_email(request: HttpRequest, email_address: EmailAddress) -> bool:
"""
Marks the email address as confirmed on the db
Expand Down Expand Up @@ -75,3 +87,42 @@ def get_email_verification_url(request: HttpRequest, emailconfirmation) -> str:
url = reverse("account_confirm_email", args=[emailconfirmation.key])
url = build_absolute_uri(request, url)
return url


def login_on_verification(request, verification) -> Optional[HttpResponse]:
"""Simply logging in the user may become a security issue. If you
do not take proper care (e.g. don't purge used email
confirmations), a malicious person that got hold of the link
will be able to login over and over again and the user is
unable to do anything about it. Even restoring their own mailbox
security will not help, as the links will still work. For
password reset this is different, this mechanism works only as
long as the attacker has access to the mailbox. If they no
longer has access they cannot issue a password request and
intercept it. Furthermore, all places where the links are
listed (log files, but even Google Analytics) all of a sudden
need to be secured. Purging the email confirmation once
confirmed changes the behavior -- users will not be able to
repeatedly confirm (in case they forgot that they already
clicked the mail).
All in all, we only login on verification when the user that is in the
process of signing up is present in the session to avoid all of the above.
This may not 100% work in case the user closes the browser (and the session
gets lost), but at least we're secure.
"""
from allauth.account.stages import (
EmailVerificationStage,
LoginStageController,
)

if not app_settings.LOGIN_ON_EMAIL_CONFIRMATION:
return None
if request.user.is_authenticated:
return None
stage = LoginStageController.enter(request, EmailVerificationStage.key)
if not stage or not stage.login.user:
return None
if stage.login.user.pk != verification.email_address.user_id:
return None
return stage.exit()
76 changes: 26 additions & 50 deletions allauth/account/tests/test_email_verification.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from datetime import timedelta
from unittest.mock import Mock, patch
from unittest.mock import Mock

from django.conf import settings
from django.contrib.auth import SESSION_KEY, get_user_model
from django.core.cache import cache
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils.timezone import now

Expand All @@ -22,9 +21,6 @@
EmailConfirmationHMAC,
)
from allauth.account.signals import user_logged_in
from allauth.account.utils import user_pk_to_url_str

from .test_models import UUIDUser


@pytest.mark.parametrize(
Expand All @@ -34,21 +30,31 @@
("?next=/foo", "/foo"),
],
)
def test_login_on_confirm(user_factory, client, query, expected_location):
def test_login_on_verification(
client, db, query, expected_location, password_factory, settings
):
settings.ACCOUNT_EMAIL_VERIFICATION = app_settings.EmailVerificationMethod.MANDATORY
settings.ACCOUNT_EMAIL_CONFIRMATION_HMAC = True
settings.ACCOUNT_LOGIN_ON_EMAIL_CONFIRMATION = True
user = user_factory(email_verified=False)
email = EmailAddress.objects.get_for_user(user, user.email)
password = password_factory()
resp = client.post(
reverse("account_signup"),
data={
"username": "john",
"email": "a@a.com",
"password1": password,
"password2": password,
},
)
assert resp.status_code == 302
assert resp["location"] == reverse("account_email_verification_sent")

email = EmailAddress.objects.get(email="a@a.com")
key = EmailConfirmationHMAC(email).key

receiver_mock = Mock() # we've logged if signal was called
user_logged_in.connect(receiver_mock)

# fake post-signup account_user stash
session = client.session
session["account_user"] = user_pk_to_url_str(user)
session.save()

resp = client.post(reverse("account_confirm_email", args=[key]) + query)
assert resp["location"] == expected_location
email = EmailAddress.objects.get(pk=email.pk)
Expand All @@ -58,42 +64,13 @@ def test_login_on_confirm(user_factory, client, query, expected_location):
sender=get_user_model(),
request=resp.wsgi_request,
response=resp,
user=user,
user=email.user,
signal=user_logged_in,
)

user_logged_in.disconnect(receiver_mock)


@patch("allauth.account.views.perform_login")
@patch("allauth.account.utils.get_user_model", return_value=UUIDUser)
def test_login_on_confirm_uuid_user(
mocked_gum, mock_perform_login, settings, client, db
):
settings.ACCOUNT_EMAIL_CONFIRMATION_HMAC = True
settings.ACCOUNT_LOGIN_ON_EMAIL_CONFIRMATION = True

user = UUIDUser(is_active=True, email="john@example.com", username="john")

# fake post-signup account_user stash
session = client.session
session["account_user"] = user_pk_to_url_str(user)
session.save()

# fake email and email confirmation to avoid swappable model hell
email = Mock(verified=False, user=user)
key = "mockkey"
confirmation = Mock(autospec=EmailConfirmationHMAC, key=key)
confirmation.email_address = email
confirmation.from_key.return_value = confirmation
mock_perform_login.return_value = HttpResponseRedirect(redirect_to="/")

with patch("allauth.account.models.EmailConfirmationHMAC", confirmation):
client.post(reverse("account_confirm_email", args=[key]))

assert mock_perform_login.called


def test_email_verification_failed(settings, user_factory, client):
settings.ACCOUNT_EMAIL_CONFIRMATION_HMAC = False
user_factory(email_verified=True, email="foo@bar.org")
Expand Down Expand Up @@ -216,7 +193,7 @@ def test_optional_email_verification(settings, client, db, mailoutbox):
assert len(mailoutbox) == 1


def test_email_confirmation_hmac(settings, client, user_factory, mailoutbox, rf):
def test_email_verification_hmac(settings, client, user_factory, mailoutbox, rf):
settings.ACCOUNT_EMAIL_CONFIRMATION_HMAC = True
user = user_factory(email_verified=False)
email = EmailAddress.objects.get_for_user(user, user.email)
Expand All @@ -229,7 +206,7 @@ def test_email_confirmation_hmac(settings, client, user_factory, mailoutbox, rf)
assert email.verified


def test_email_confirmation_hmac_timeout(
def test_email_verification_hmac_timeout(
settings, user_factory, client, mailoutbox, rf
):
settings.ACCOUNT_EMAIL_CONFIRMATION_HMAC = True
Expand All @@ -245,10 +222,10 @@ def test_email_confirmation_hmac_timeout(
assert not email.verified


def test_confirm_email_with_another_user_logged_in(
def test_verify_email_with_another_user_logged_in(
settings, user_factory, client, mailoutbox
):
"""Test the email confirmation view. If User B clicks on an email
"""Test the email verification view. If User B clicks on an email
verification link while logged in as User A, ensure User A gets
logged out."""
settings.ACCOUNT_AUTHENTICATION_METHOD = app_settings.AuthenticationMethod.EMAIL
Expand All @@ -258,7 +235,6 @@ def test_confirm_email_with_another_user_logged_in(
assert len(mailoutbox) == 1
assert mailoutbox[0].to == [user.email]
client.logout()

body = mailoutbox[0].body
assert body.find("http://") > 0

Expand All @@ -281,7 +257,7 @@ def test_confirm_email_with_another_user_logged_in(
assertRedirects(resp, settings.LOGIN_URL, fetch_redirect_response=False)


def test_confirm_email_with_same_user_logged_in(
def test_verify_email_with_same_user_logged_in(
settings, user_factory, client, mailoutbox
):
"""If the user clicks on an email verification link while logged in, ensure
Expand All @@ -308,7 +284,7 @@ def test_confirm_email_with_same_user_logged_in(
assert user == resp.wsgi_request.user


def test_confirm_logs_out_user(auth_client, settings, user, user_factory):
def test_verify_logs_out_user(auth_client, settings, user, user_factory):
"""
When a user is signed in, and you follow an email confirmation link of
another user within the same browser/session, be sure to sign out the signed
Expand Down
2 changes: 0 additions & 2 deletions allauth/account/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,6 @@ def send_email_confirmation(request, user, signup=False, email=None):
"account/messages/email_confirmation_sent.txt",
{"email": email, "login": not signup, "signup": signup},
)
if signup:
adapter.stash_user(request, user_pk_to_url_str(user))
return sent


Expand Down
60 changes: 6 additions & 54 deletions allauth/account/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
perform_login,
send_email_confirmation,
sync_user_email_addresses,
url_str_to_user_pk,
user_display,
)
from allauth.core import ratelimit
Expand Down Expand Up @@ -242,23 +241,16 @@ def logout_other_user(self, confirmation):
self.logout()

def post(self, *args, **kwargs):
self.object = confirmation = self.get_object()
email_address = confirmation.confirm(self.request)
self.object = verification = self.get_object()
email_address, response = flows.email_verification.verify_email_and_resume(
self.request, verification
)
if response:
return response
if not email_address:
return self.respond(False)

self.logout_other_user(self.object)

if app_settings.LOGIN_ON_EMAIL_CONFIRMATION:
resp = self.login_on_confirm(confirmation)
if resp is not None:
return resp
# Don't -- allauth doesn't touch is_active so that sys admin can
# use it to block users et al
#
# user = confirmation.email_address.user
# user.is_active = True
# user.save()
return self.respond(True)

def respond(self, success):
Expand All @@ -268,46 +260,6 @@ def respond(self, success):
return self.render_to_response(ctx)
return redirect(redirect_url)

def login_on_confirm(self, confirmation):
"""
Simply logging in the user may become a security issue. If you
do not take proper care (e.g. don't purge used email
confirmations), a malicious person that got hold of the link
will be able to login over and over again and the user is
unable to do anything about it. Even restoring their own mailbox
security will not help, as the links will still work. For
password reset this is different, this mechanism works only as
long as the attacker has access to the mailbox. If they no
longer has access they cannot issue a password request and
intercept it. Furthermore, all places where the links are
listed (log files, but even Google Analytics) all of a sudden
need to be secured. Purging the email confirmation once
confirmed changes the behavior -- users will not be able to
repeatedly confirm (in case they forgot that they already
clicked the mail).
All in all, opted for storing the user that is in the process
of signing up in the session to avoid all of the above. This
may not 100% work in case the user closes the browser (and the
session gets lost), but at least we're secure.
"""
user_pk = None
user_pk_str = get_adapter(self.request).unstash_user(self.request)
if user_pk_str:
user_pk = url_str_to_user_pk(user_pk_str)
user = confirmation.email_address.user
if user_pk == user.pk and self.request.user.is_anonymous:
return perform_login(
self.request,
user,
email_verification=app_settings.EmailVerificationMethod.NONE,
# passed as callable, as this method
# depends on the authenticated state
redirect_url=self.get_redirect_url,
)

return None

def get_object(self, queryset=None):
key = self.kwargs["key"]
model = get_emailconfirmation_model()
Expand Down

0 comments on commit 560cb24

Please sign in to comment.