Skip to content
This repository was archived by the owner on Mar 27, 2025. It is now read-only.

Commit bde9d85

Browse files
committed
wip queue
1 parent 56b36ae commit bde9d85

File tree

2 files changed

+27
-20
lines changed

2 files changed

+27
-20
lines changed

salmon/queue.py

+8
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,11 @@ def oversize(self, key):
188188
return os.path.getsize(file_name) > self.pop_limit, file_name
189189
else:
190190
return False, None
191+
192+
193+
class QueueWithMetadata(Queue):
194+
"""Just like Queue, except it stores envolope data"""
195+
def push(self, message, metadata):
196+
def pop(self):
197+
key, message = super().pop()
198+
return key, message, metadata

salmon/server.py

+19-20
Original file line numberDiff line numberDiff line change
@@ -292,40 +292,37 @@ def process_message(self, Peer, From, To, Data, **kwargs):
292292
return _deliver(self, Peer, From, To, Data, **kwargs)
293293

294294

295-
class SMTPOnlyOneRcpt(SMTP):
296-
async def smtp_RCPT(self, arg):
297-
if self.envelope.rcpt_tos:
298-
await self.push(SMTP_MULTIPLE_RCPTS_ERROR)
299-
else:
300-
await super().smtp_RCPT(arg)
301-
302-
303295
class SMTPHandler:
304296
def __init__(self, executor=None):
305297
self.executor = executor
306298

307299
async def handle_DATA(self, server, session, envelope):
308-
status = await server.loop.run_in_executor(self.executor, partial(
309-
_deliver,
310-
self,
311-
session.peer,
312-
envelope.mail_from,
313-
envelope.rcpt_tos[0],
314-
envelope.content,
315-
))
316-
return status or "250 Ok"
300+
try:
301+
await server.loop.run_in_executor(self.executor, partial())
302+
server.push_to_queue(session, envelope)
303+
status = "250 Ok"
304+
except Exception:
305+
# TODO log exception
306+
status = "550 Server error"
307+
return status
317308

318309

319310
class AsyncSMTPReceiver(Controller):
320311
"""Receives emails and hands it to the Router for further processing."""
321-
def __init__(self, handler=None, **kwargs):
312+
def __init__(self, handler=None, queue=None, **kwargs):
322313
if handler is None:
323314
handler = SMTPHandler()
315+
if queue is None:
316+
queue = QueueReceiver(IN_QUEUE)
317+
self.queue = queue
324318
super().__init__(handler, **kwargs)
325319

326320
def factory(self):
327-
# TODO implement a queue
328-
return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)
321+
return SMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)
322+
323+
def stop(self):
324+
super().stop()
325+
self.queue.stop()
329326

330327

331328
class LMTPHandler:
@@ -430,7 +427,9 @@ def start(self, one_shot=False):
430427

431428
logging.debug("Pulled message with key: %r off", key)
432429
self.workers.apply_async(self.process_message, args=(msg,))
430+
self.stop()
433431

432+
def stop(self):
434433
self.workers.close()
435434
self.workers.join()
436435

0 commit comments

Comments
 (0)