Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic.Cancel sent from the broker is unactionable on user end #116

Open
hpointu opened this issue Sep 3, 2021 · 1 comment
Open

Basic.Cancel sent from the broker is unactionable on user end #116

hpointu opened this issue Sep 3, 2021 · 1 comment

Comments

@hpointu
Copy link

hpointu commented Sep 3, 2021

When the server sends a Basic.Cancel notification, aiormq simply removes the consumer tag from its consumer list.

This leaves the end-user of the lib with no way of knowing it'll not receive messages any more, although the server explicitly said so.

Shouldn't we expose this notification to the end user? That way the end user can react to this and stop listening.

@martin-schulze-e2m
Copy link

martin-schulze-e2m commented Aug 28, 2023

Just fell into this trap. My code follows pretty much aio-pika's asynchronous consumer example. When it gets a Basic.Cancel frame for its consume queue, it stops receiving messages without any warning. Ideally, I'd like to restart the service but have no way of telling when this happens.

Maybe we can simply forward the "message" to the consume callback by changing

spec.Basic.Cancel: (False, self._on_cancel_frame),
to should_add_to_rpc=True? However, that will likely be a breaking change.

I could also see an optional, specific on_cancelled callback that can be provided to basic_consume.

Update

For now I work around that with async polling on the channel's consumer list:

    async with connection:
        channel: aio_pika.abc.AbstractChannel = await connection.channel()
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(config.rmq.queue)
        consumer_tag = await queue.consume(process_message, timeout=5)
        async def wait_for_cancel(polling_interval=5):
            """
            Check every polling_interval seconds if our consumer is still scheduled for receiving messages.

            Work around https://github.com/mosquito/aiormq/issues/116
            """
            if isinstance(channel.channel, aiormq.Channel):
                while True:
                    await asyncio.sleep(polling_interval)
                    if consumer_tag not in channel.channel.consumers:
                        logger.warning(
                            f"Consumer tag {consumer_tag} not found in channel.consumers."
                            f" Probably was cancelled due to queue deletion."
                        )
                        break
            else:
                raise ValueError(
                    f"Can only work with aiormq.Channel channels, not {channel.channel.__class__}!"
                )

        try:
            # wait until cancelled from outside
            await wait_for_cancel()
        finally:
            logger.info("queue.consume() stopping")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants