-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks_control.py
41 lines (34 loc) · 1.44 KB
/
tasks_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Defines the tasks and jobs triggered for the control aspects of the application."""
from smtplib import SMTPAuthenticationError, SMTPServerDisconnected
from typing import Any
from flask_mail import Message
from app import app, celery, mail
@celery.task
def send_mail(
subject: str,
recipients: list[str],
text_body: str,
sender: str = None,
attachments: Any = None,
) -> None:
"""Sends a mail asynchronously with a celery job.
Args:
subject: mail subjects
recipients: list of email of recipients
text_body: the body content of the sent mail
sender: defines a sender if this one is different from app.config['MAIL_DEFAULT_SENDER']
attachments: file to be sent as attachment in mail"""
with app.app_context():
# requires to recall app context as celery job is not
# necessarily aware and we need flask-mail module and context
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body + "\nKind regards,\nSecret Race Strolling Team"
if attachments:
for attachment in attachments:
msg.attach(*attachment)
try:
mail.send(msg)
except SMTPServerDisconnected:
print("Please, verify your connection parameters for mail support", flush=True)
except SMTPAuthenticationError:
print("Please, verify your connection parameters for mail support", flush=True)