Skip to content

Commit

Permalink
Change how we use the send pending email task (#4175)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoacierno authored Nov 24, 2024
1 parent 90be065 commit fa14b10
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 125 deletions.
6 changes: 3 additions & 3 deletions backend/notifications/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def send_email(
if not recipient and not recipient_email:
raise ValueError("Either recipient or recipient_email must be provided")

from notifications.tasks import send_pending_emails
from notifications.tasks import send_pending_email

recipient_email = recipient_email or recipient.email

Expand All @@ -207,7 +207,7 @@ def send_email(
or settings.DEFAULT_FROM_EMAIL
)

SentEmail.objects.create(
sent_email = SentEmail.objects.create(
email_template=self,
conference=self.conference,
from_email=from_email,
Expand All @@ -223,7 +223,7 @@ def send_email(
bcc_addresses=self.bcc_addresses,
)

transaction.on_commit(lambda: send_pending_emails.delay())
transaction.on_commit(lambda: send_pending_email.delay(sent_email.id))

@property
def is_custom(self):
Expand Down
80 changes: 42 additions & 38 deletions backend/notifications/tasks.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,63 @@
from django.db import transaction

import logging
from uuid import uuid4
from pycon.celery_utils import OnlyOneAtTimeTask
from notifications.models import SentEmail
from django.db import transaction
from pycon.celery import app
from django.core.mail import EmailMultiAlternatives
from django.core.mail import get_connection

logger = logging.getLogger(__name__)


@app.task(base=OnlyOneAtTimeTask)
def send_pending_emails():
pending_emails = (
SentEmail.objects.pending().order_by("created").values_list("id", flat=True)
def send_pending_email_failed(self, exc, task_id, args, kwargs, einfo):
sent_email_id = args[0]

sent_email = SentEmail.objects.get(id=sent_email_id)
sent_email.mark_as_failed()
logger.error(
"Failed to send email sent_email_id=%s",
sent_email.id,
)
total_pending_emails = pending_emails.count()

if total_pending_emails == 0:
return

logger.info("Found %s pending emails", pending_emails.count())
@app.task(
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
max_retries=5,
default_retry_delay=2,
on_failure=send_pending_email_failed,
)
@transaction.atomic()
def send_pending_email(self, sent_email_id: int):
logger.info(
"Sending sent_email=%s (attempt=%s of %s)",
sent_email_id,
self.request.retries,
self.max_retries,
)

email_backend_connection = get_connection()
sent_email = (
SentEmail.objects.select_for_update(skip_locked=True)
.pending()
.filter(id=sent_email_id)
.first()
)

for email_id in pending_emails.iterator():
with transaction.atomic():
sent_email = (
SentEmail.objects.select_for_update(skip_locked=True)
.filter(
id=email_id,
)
.first()
)
if not sent_email:
return

if not sent_email or not sent_email.is_pending:
return
email_backend_connection = get_connection()

try:
message_id = send_email(sent_email, email_backend_connection)
sent_email.mark_as_sent(message_id)
message_id = send_email(sent_email, email_backend_connection)
sent_email.mark_as_sent(message_id)

logger.info(
"Email sent_email_id=%s sent with message_id=%s",
sent_email.id,
message_id,
)
except Exception as e:
sent_email.mark_as_failed()
logger.exception(
"Failed to send email sent_email_id=%s error=%s",
email_id,
e,
exc_info=e,
)
logger.info(
"Email sent_email_id=%s sent with message_id=%s",
sent_email.id,
message_id,
)


def send_email(sent_email, email_backend_connection):
Expand Down
8 changes: 4 additions & 4 deletions backend/notifications/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def test_send_email_template_to_recipient_email(
reply_to="replyto@example.com",
)

mock_send_pending_emails = mocker.patch(
"notifications.tasks.send_pending_emails.delay"
mock_send_pending_email = mocker.patch(
"notifications.tasks.send_pending_email.delay"
)

with django_capture_on_commit_callbacks(execute=True):
Expand All @@ -64,12 +64,12 @@ def test_send_email_template_to_recipient_email(
},
)

mock_send_pending_emails.assert_called_once()

sent_email = SentEmail.objects.get(
email_template=email_template,
)

mock_send_pending_email.assert_called_once_with(sent_email.id)

assert sent_email.recipient is None
assert sent_email.recipient_email == "example@example.com"

Expand Down
105 changes: 31 additions & 74 deletions backend/notifications/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import smtplib
from unittest.mock import patch
from uuid import uuid4
import time_machine
from django.core import mail
from notifications.tasks import send_pending_emails
from notifications.tasks import send_pending_email, send_pending_email_failed
from notifications.models import SentEmail
from notifications.tests.factories import SentEmailFactory


def test_send_pending_emails_does_nothing_with_no_pending_emails():
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
def test_send_pending_email_does_nothing_with_non_pending_email():
sent_email = SentEmailFactory(
status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z"
)

send_pending_emails()
send_pending_email(sent_email.id)

assert len(mail.outbox) == 0


def test_send_pending_emails_task_sends_data_correctly():
def test_send_pending_email_task_sends_data_correctly():
pending_email_1 = SentEmailFactory(
status=SentEmail.Status.pending,
reply_to="reply@example.com",
Expand All @@ -29,7 +30,8 @@ def test_send_pending_emails_task_sends_data_correctly():
subject="subject",
)

send_pending_emails()
with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_email(pending_email_1.id)

assert len(mail.outbox) == 1

Expand All @@ -42,8 +44,16 @@ def test_send_pending_emails_task_sends_data_correctly():
assert mail.outbox[0].alternatives == [("html body", "text/html")]
assert mail.outbox[0].subject == "subject"

pending_email_1.refresh_from_db()

assert len(mail.outbox) == 1

assert pending_email_1.status == SentEmail.Status.sent
assert pending_email_1.message_id.startswith("local-")
assert pending_email_1.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"


def test_send_pending_emails_task_doesnt_double_send():
def test_send_pending_email_task_doesnt_double_send():
pending_email_1 = SentEmailFactory(status=SentEmail.Status.pending)
original_qs = SentEmail.objects.select_for_update(skip_locked=True).filter(
id=pending_email_1.id
Expand All @@ -57,81 +67,28 @@ def side_effect(*args, **kwargs):
"notifications.tasks.SentEmail.objects.select_for_update",
side_effect=side_effect,
):
send_pending_emails()
send_pending_email(pending_email_1.id)

pending_email_1.refresh_from_db()

assert len(mail.outbox) == 0


def test_send_pending_emails_task():
pending_email_1 = SentEmailFactory(status=SentEmail.Status.pending)
pending_email_2 = SentEmailFactory(status=SentEmail.Status.pending)
sent_email_1 = SentEmailFactory(
status=SentEmail.Status.sent, message_id="abc-abc", sent_at="2020-01-01 12:00Z"
)

with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_emails()

pending_email_1.refresh_from_db()
pending_email_2.refresh_from_db()
sent_email_1.refresh_from_db()

assert len(mail.outbox) == 2

assert mail.outbox[0].to == [pending_email_1.recipient_email]
assert mail.outbox[1].to == [pending_email_2.recipient_email]

assert pending_email_1.status == SentEmail.Status.sent
assert pending_email_1.message_id.startswith("local-")
assert pending_email_1.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"

assert pending_email_2.status == SentEmail.Status.sent
assert pending_email_2.message_id.startswith("local-")
assert pending_email_2.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"

assert sent_email_1.status == SentEmail.Status.sent
assert sent_email_1.message_id == "abc-abc"
assert sent_email_1.sent_at.isoformat() == "2020-01-01T12:00:00+00:00"


def test_send_pending_emails_handles_failures(mocker):
def test_send_pending_email_failure():
pending_email_1 = SentEmailFactory(
status=SentEmail.Status.pending, created="2020-01-01 12:00Z"
)
pending_email_2 = SentEmailFactory(
status=SentEmail.Status.pending, created="2020-01-02 12:00Z"
)

original_method = SentEmail.mark_as_sent

def _side_effect(*args, **kwargs):
if _side_effect.counter == 0:
_side_effect.counter = 1
raise ValueError("test")

return original_method(pending_email_2, *args, **kwargs)

_side_effect.counter = 0

mocker.patch("notifications.tasks.SentEmail.mark_as_sent", side_effect=_side_effect)

with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_emails()
send_pending_email_failed(
None,
smtplib.SMTPException("test"),
uuid4().hex,
(pending_email_1.id,),
{},
None,
)

pending_email_1.refresh_from_db()
pending_email_2.refresh_from_db()

assert len(mail.outbox) == 2

assert mail.outbox[0].to == [pending_email_1.recipient_email]
assert mail.outbox[1].to == [pending_email_2.recipient_email]

assert len(mail.outbox) == 0
assert pending_email_1.status == SentEmail.Status.failed
assert pending_email_1.message_id == ""
assert pending_email_1.sent_at is None

assert pending_email_2.status == SentEmail.Status.sent
assert pending_email_2.message_id.startswith("local-")
assert pending_email_2.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"
6 changes: 0 additions & 6 deletions backend/pycon/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def setup_periodic_tasks(sender, **kwargs):
from schedule.tasks import process_schedule_items_videos_to_upload
from files_upload.tasks import delete_unused_files
from pycon.tasks import check_for_idle_heavy_processing_workers
from notifications.tasks import send_pending_emails

add = sender.add_periodic_task

Expand All @@ -48,8 +47,3 @@ def setup_periodic_tasks(sender, **kwargs):
check_for_idle_heavy_processing_workers,
name="Check for idle heavy processing workers",
)
add(
timedelta(minutes=1),
send_pending_emails,
name="Send pending emails",
)

0 comments on commit fa14b10

Please sign in to comment.