-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmailer.py
84 lines (63 loc) · 2.79 KB
/
mailer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import email.message
import email.policy
import logging
import mimetypes
import smtplib
import subprocess
class Mailer:
    """Send notification e-mails through an SMTP-over-SSL server.

    Supports three modes: no authentication, classic username/password
    login, and XOAUTH2 where the bearer token is produced by running an
    external shell command.
    """

    # Authentication methods message() knows how to perform. A falsy
    # method (None / "") means "do not authenticate".
    _KNOWN_AUTH = ("login", "oauth")

    def __init__(self, host, port=None, authmethod=None, user=None, pwd=None, oauthcmd=None):
        """Store the SMTP settings and pick an authentication method.

        Args:
            host: SMTP server host name.
            port: SMTP server port; ``None`` lets smtplib use its default.
            authmethod: ``"login"``, ``"oauth"`` or ``None``. When ``None``
                the method is derived from the credentials: a password
                selects ``"login"``, an OAuth command selects ``"oauth"``,
                neither leaves authentication disabled.
            user: Account name; also used as the sender address.
            pwd: Password for the ``"login"`` method.
            oauthcmd: Shell command printing an OAuth bearer token on stdout.

        Raises:
            ValueError: if ``authmethod`` is ``None`` and both ``pwd`` and
                ``oauthcmd`` are given (the choice would be ambiguous).
        """
        if authmethod is None:
            if pwd and oauthcmd:
                raise ValueError("SMTP password and oauthcmd provided")
            # The presence of a *credential* decides the method. A bare
            # user name is not enough: it is also the From address and
            # may be supplied without any authentication at all.
            if pwd:
                authmethod = "login"
            elif oauthcmd:
                authmethod = "oauth"
        self._host = host
        self._port = port
        self._auth = authmethod
        self._user = user
        self._pass = pwd
        # Keep the command verbatim: it may be None, and shell commands
        # and paths are case-sensitive.
        self._oauthcmd = oauthcmd

    def _oauthcb(self, x=None):
        """Authentication callback handed to ``smtplib.SMTP.auth``.

        Args:
            x: Server challenge. ``None`` on the initial call; anything
                else is answered with an empty string.

        Returns:
            The XOAUTH2 initial response (not base64-encoded; smtplib takes
            care of the encoding), or ``""`` in reply to a challenge.

        Raises:
            subprocess.CalledProcessError: if the token command exits with
                a non-zero status.
        """
        if x is not None:
            return ""
        logging.debug("Running OAuth token generation command: %s", self._oauthcmd)
        # The command comes from the application's own configuration and
        # is deliberately run through the shell; never feed it untrusted
        # input.
        raw = subprocess.check_output(self._oauthcmd, shell=True)
        token = raw.decode().strip()
        # Never write the bearer token itself to the log.
        logging.debug("Got token of %d characters", len(token))
        # "\1" is the Ctrl-A separator required by the XOAUTH2 format.
        return "user=%s\1auth=Bearer %s\1\1" % (self._user, token)

    def _build_mail(self, to, subj, msg, attachments):
        """Assemble the e-mail (headers, inline body and attachments)."""
        policy = email.policy.EmailPolicy(raise_on_defect=True, linesep="\r\n", utf8=True)
        mail = email.message.EmailMessage(policy=policy)
        mail['Subject'] = "[BOT Paul Emploi] %s" % subj
        mail['From'] = "Bot Paul-Emploi <%s>" % self._user
        mail['To'] = "Chômeur <%s>" % to
        mail.set_content(msg, disposition='inline')
        for name, content in attachments:
            mime, encoding = mimetypes.guess_type(name)
            # Unknown or compressed files are sent as opaque binary data.
            if mime is None or encoding is not None:
                mime = "application/octet-stream"
            maintype, subtype = mime.split("/")
            mail.add_attachment(content, maintype=maintype, subtype=subtype, filename=name)
        return mail

    def _authenticate(self, smtp):
        """Authenticate on an open SMTP connection with the configured method."""
        if not self._auth:
            logging.info("No SMTP authentication method provided")
        elif self._auth == "login":
            logging.debug("Login to SMTP server with username: %s", self._user)
            smtp.login(self._user, self._pass)
        elif self._auth == "oauth":
            logging.debug("OAuth to SMTP server with username: %s", self._user)
            # auth() needs the server's capabilities to be known first.
            smtp.ehlo_or_helo_if_needed()
            smtp.auth("XOAUTH2", self._oauthcb)
        else:
            raise ValueError("Unknown SMTP authentication method %s" % self._auth)

    def message(self, to, subj, msg, attachments=None):
        """Build an e-mail and send it through the configured server.

        Args:
            to: Recipient address.
            subj: Subject; prefixed with ``[BOT Paul Emploi]``.
            msg: Text body, sent inline.
            attachments: Optional iterable of ``(filename, content)``
                pairs. ``content`` is presumably ``bytes`` -- confirm
                against callers.

        Raises:
            ValueError: if the configured authentication method is unknown.
            smtplib.SMTPException: on SMTP-level failures.
            OSError: if the server cannot be reached.
        """
        # Reject a bad configuration before touching the network.
        if self._auth and self._auth not in self._KNOWN_AUTH:
            raise ValueError("Unknown SMTP authentication method %s" % self._auth)

        mail = self._build_mail(to, subj, msg, attachments or ())

        logging.debug("Connecting to SMTP server %s:%r", self._host, self._port)
        # The context manager sends QUIT and closes the socket even when
        # authentication or sending fails.
        with smtplib.SMTP_SSL(self._host, port=self._port or 0) as smtp:
            self._authenticate(smtp)
            logging.debug("Sending message of %d bytes", len(mail.as_bytes()))
            smtp.send_message(mail)

    def error(self, to, *args, **kwargs):
        """Send a message with the fixed subject ``"Error"``.

        Remaining arguments are forwarded unchanged to :meth:`message`.
        """
        self.message(to, "Error", *args, **kwargs)