Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log exceptions in channel reader #134

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ Simple consumer
# Declaring queue
declare_ok = await channel.queue_declare('helo')
consume_ok = await channel.basic_consume(
declare_ok.queue, on_message, no_ack=True
declare_ok.queue, on_message, no_ack=True,
)
print("Consumer started")


loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -563,6 +564,8 @@ RPC server
.. code-block:: python

import asyncio
import functools
from contextlib import suppress
import aiormq
import aiormq.abc

Expand All @@ -576,46 +579,55 @@ RPC server
return fib(n-1) + fib(n-2)


async def on_message(message:aiormq.abc.DeliveredMessage):
n = int(message.body.decode())
async def on_message(num, message: aiormq.types.DeliveredMessage):
try:
n = int(message.body.decode())

print(f" [.] fib({n})")
response = str(fib(n)).encode()
print(f" [.] fib({n}) from {num} channel")

await message.channel.basic_publish(
response, routing_key=message.header.properties.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.header.properties.correlation_id
),
fib_n = await asyncio.run_in_executor(None, fib, n)
response = str(fib(n)).encode()

)
await message.channel.basic_publish(
response, routing_key=message.header.properties.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.header.properties.correlation_id
),

await message.channel.basic_ack(message.delivery.delivery_tag)
print('Request complete')
)
except Exception as exc:
print('Message exception:', exc)
finally:
await message.channel.basic_ack(message.delivery.delivery_tag)
print(f'Request from {num} channel complete')


async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")

# Creating a channel
channel = await connection.channel()
for i in range(10):
# Creating a channel
channel = await connection.channel()

# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')
# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')

# Start listening the queue with name 'hello'
await channel.basic_consume(declare_ok.queue, on_message)
# Start listening the queue with name 'hello'
# with several consumers simultaneously
on_message = functools.partial(on_message, i)
await channel.basic_consume(declare_ok.queue, on_message)
print(f'Consumer {i} started')

print(" [x] Awaiting RPC requests")
with suppress(asyncio.CancelledError):
await connection.closing

# waiting for the connections closing to release recources

loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()


RPC client
**********
Expand Down
8 changes: 6 additions & 2 deletions aiormq/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ async def _on_deliver(self, frame: spec.Basic.Deliver) -> None:
consumer = self.consumers.get(frame.consumer_tag)
if consumer is not None:
# noinspection PyAsyncCall
self.create_task(consumer(message))
try:
self.create_task(consumer(message))
except Exception:
log.exception('Unhandled consumer exception')

async def _on_get(
self, frame: Union[spec.Basic.GetOk, spec.Basic.GetEmpty],
Expand Down Expand Up @@ -438,7 +441,7 @@ async def _reader(self) -> None:
except asyncio.CancelledError:
return
except Exception as e: # pragma: nocover
log.debug("Channel reader exception %r", exc_info=e)
log.exception("Channel reader exception")
await self._cancel_tasks(e)
raise

Expand All @@ -453,6 +456,7 @@ async def _on_close(self, exc: Optional[ExceptionType] = None) -> None:
),
timeout=self.connection.connection_tune.heartbeat or None,
)

self.connection.channels.pop(self.number, None)

async def basic_get(
Expand Down
6 changes: 3 additions & 3 deletions aiormq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,16 @@ async def get_frame(self) -> ReceivedFrame:

async with self.lock:
try:
if self.reader is None:
raise ConnectionError

frame_header = await self.reader.readexactly(1)

if frame_header == b"\0x00":
raise AMQPFrameError(
await self.reader.read(),
)

if self.reader is None:
raise ConnectionError

frame_header += await self.reader.readexactly(6)

if not self.started and frame_header.startswith(b"AMQP"):
Expand Down