
No way to know if a consumer has been cancelled server-side #187

Open
abompard opened this issue Sep 25, 2023 · 2 comments
Labels
enhancement New feature or request need investigation

Comments

@abompard

Hi! I'm using aio-pika and I'd like my consumer to stop and restart if it has been cancelled server-side, for example if the queue has been deleted on the server.
If I understand correctly, this is not currently possible because aiormq will silently drop the consumer when that happens:

async def _on_cancel_frame(

Is there another way to do it? Could aiormq cancel the task/future that the consumer is linked to?
Thanks.
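One way to picture the request above: instead of silently dropping the consumer, the library could keep a future per consumer tag and fail it when the broker cancels that consumer. The sketch below models only that idea in plain asyncio, with no broker involved. `ConsumerRegistry`, `ConsumerCancelled` and `on_server_cancel` are hypothetical names, not part of aiormq's API, and hooking `on_server_cancel` into aiormq's `_on_cancel_frame` is an assumption about where such a notification could live.

```python
import asyncio
from typing import Dict


class ConsumerCancelled(Exception):
    """Hypothetical: raised into a consumer's future on a server-side cancel."""


class ConsumerRegistry:
    """Hypothetical: one pending future per consumer tag."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def register(self, consumer_tag: str) -> asyncio.Future:
        # The future stays pending for as long as the consumer is alive.
        future = asyncio.get_running_loop().create_future()
        self._futures[consumer_tag] = future
        return future

    def on_server_cancel(self, consumer_tag: str) -> None:
        # Instead of just forgetting the consumer, fail its future so
        # whoever awaits it learns that the broker cancelled it.
        future = self._futures.pop(consumer_tag, None)
        if future is not None and not future.done():
            future.set_exception(ConsumerCancelled(consumer_tag))


async def main() -> str:
    registry = ConsumerRegistry()
    consumer_future = registry.register("ctag-1")
    # Simulate the broker cancelling the consumer on the next loop iteration.
    asyncio.get_running_loop().call_soon(registry.on_server_cancel, "ctag-1")
    try:
        await consumer_future
    except ConsumerCancelled as exc:
        return f"restart consumer {exc}"
    return "still running"


print(asyncio.run(main()))  # prints "restart consumer ctag-1"
```

An application awaiting such a future could then restart the consumer or exit, which is the behaviour asked for here.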

mosquito added the enhancement (New feature or request) and need investigation labels Sep 25, 2023
@vlmihnevich

vlmihnevich commented Oct 26, 2023

@mosquito Hi, it seems we have run into the same problem in our applications. Sometimes our three-node RabbitMQ cluster sends a CancelOK message, which indicates that a RabbitMQ replica node has moved to another machine in Kubernetes. As a result, we lose consumers and our application stops consuming from its queues. To recover, we have to restart the application manually.

Here is a simple test that emulates the problem:

import asyncio
import os
import uuid

import aiormq
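import pytest
import pytest_asyncio
from aiormq import Connection
from aiormq.abc import DeliveredMessage


@pytest.fixture()
def amqp_url():
    return os.environ.get('BROKER_CONNECTION_STRING')


# Async fixtures need pytest_asyncio.fixture when pytest-asyncio runs in strict mode
@pytest_asyncio.fixture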
async def amqp_connection(amqp_url):
    connection = Connection(amqp_url)
    async with connection:
        yield connection


@pytest.mark.asyncio
async def test_cancel_ok_frame_removes_consumer_without_error(
    amqp_connection: aiormq.Connection,
) -> None:
    exchange_name = 'test_exchange'
    queue_name = f'cancel_ok_test.{uuid.uuid4()}'

    channel = await amqp_connection.channel()

    print(f'declare exchange {exchange_name}')
    await channel.exchange_declare(
        exchange_name, exchange_type='topic', durable=True
    )

    print(f'declare queue {queue_name}')
    await channel.queue_declare(queue_name, durable=True)
    await channel.queue_bind(queue_name, exchange_name)

    async def consumer_callback(
        msg: DeliveredMessage
    ) -> None:
        print(f'consumer_callback {msg.body!r}')
        await channel.basic_ack(msg.delivery_tag)

    consumer_tag = uuid.uuid4().hex

    await channel.basic_consume(
        queue_name,
        consumer_tag=consumer_tag,
        consumer_callback=consumer_callback
    )

    async def emulate_cancel_frame() -> None:
        """
        Something like this happens from time to time in our RabbitMQ cluster.
        It looks like a cluster node migrates to another machine and we get a
        CancelOk frame, but we can't catch it because aiormq provides no callback.
        """
        await asyncio.sleep(3)
        print('emulate cancel from rabbitmq node')
        # In our application test, deleting the queue in RabbitMQ had the same result
        await channel.basic_cancel(consumer_tag)

    asyncio.create_task(emulate_cancel_frame())

    cancel_token = asyncio.get_running_loop().create_future()

    # In the application we use the same cancel_token and while loop.
    # A Future is always truthy, so this loop never exits on its own;
    # using `consumer_tag in channel.consumers` as the condition does work.
    while cancel_token:
        await channel.basic_publish(
            b'hello', exchange=exchange_name
        )
        print('wait for cancel')
        await asyncio.sleep(1)

        # We need to know whether the consumer was cancelled so we can call
        # cancel_token.cancel(); that stops the application and k8s restarts it.
        assert not amqp_connection.is_closed
        # Not sure whether the channel gets closed in our case, but in this test it stays open
        assert not channel.is_closed
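The comment on the `while` line points at a loop condition that does terminate. A minimal sketch of that variant, runnable without a broker: a plain dict stands in for the channel's `consumers` mapping (the same mapping the workaround below reads), `publish_while_consuming` is a hypothetical helper, and the `call_later` pop emulates aiormq dropping the consumer after a cancel.

```python
import asyncio
from typing import Any, Dict


async def publish_while_consuming(consumers: Dict[str, Any], consumer_tag: str,
                                  interval: float = 0.01) -> int:
    rounds = 0
    # Keep going only while the channel still has our consumer registered.
    while consumer_tag in consumers:
        rounds += 1  # the real test calls channel.basic_publish() here
        await asyncio.sleep(interval)
    return rounds


async def main() -> int:
    consumers: Dict[str, Any] = {"ctag-1": object()}
    # Emulate aiormq removing the consumer after a cancel frame.
    asyncio.get_running_loop().call_later(0.05, consumers.pop, "ctag-1")
    return await publish_while_consuming(consumers, "ctag-1")


rounds = asyncio.run(main())
print(f"loop exited after {rounds} rounds")
```

This still polls a mapping internal to the channel, so it detects the loss only at the next check rather than when the frame arrives.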

In production, we have implemented a workaround that periodically checks for lost consumers. If one is detected, we close the connection, which in turn causes the pod to restart.

    async def _watch_consumers_task(
        self,
        consumer_tag: str,
        channel: aio_pika.Channel,
    ) -> None:
        """
        Check if our consumer is still scheduled for receiving messages.
        Work around https://github.com/mosquito/aiormq/issues/116
        """

        class ConsumerNotFoundError(Exception):
            ...

        async def wait_for_cancel() -> None:
            channel_ = await channel.get_underlay_channel()
            while True:
                await asyncio.sleep(self._watch_consumers_interval)
                if consumer_tag not in channel_.consumers:
                    logger.warning(
                        'Consumer tag not found in `channel.consumers`. '
                        'Probably was cancelled due to queue deletion.',
                        consumer_tag=consumer_tag,
                    )
                    raise ConsumerNotFoundError('consumer not found')

        logger.debug('watching consumer')

        try:
            await wait_for_cancel()
        except ConsumerNotFoundError:
            logger.warning('consumer not found', consumer_tag=consumer_tag)
            await self._connector.close()
        except Exception as e:
            logger.error(
                'unexpected error',
                consumer_tag=consumer_tag,
                error=str(e),
            )
            await self._connector.close()

See #116
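A variant of the same watchdog, sketched under assumptions: rather than closing the connection, it races a polling task against the application's main task and cancels the latter when the consumer tag disappears, leaving the restart to whatever supervises the process. `wait_for_consumer_loss` and `supervise` are hypothetical names, a plain dict stands in for aiormq's `channel.consumers`, and `asyncio.sleep(3600)` stands in for the real consumer loop.

```python
import asyncio
from typing import Any, Dict


async def wait_for_consumer_loss(consumers: Dict[str, Any], consumer_tag: str,
                                 interval: float) -> None:
    """Return once `consumer_tag` is no longer in the consumers mapping."""
    while consumer_tag in consumers:
        await asyncio.sleep(interval)


async def supervise(consumers: Dict[str, Any], consumer_tag: str,
                    app: "asyncio.Task[None]", interval: float = 0.01) -> str:
    watchdog = asyncio.create_task(
        wait_for_consumer_loss(consumers, consumer_tag, interval)
    )
    # Whichever task finishes first decides the outcome.
    done, _ = await asyncio.wait({watchdog, app},
                                 return_when=asyncio.FIRST_COMPLETED)
    if watchdog in done:
        # Consumer vanished: cancel the application task so the process
        # supervisor (e.g. Kubernetes) can restart it.
        app.cancel()
        await asyncio.gather(app, return_exceptions=True)
        return f"consumer {consumer_tag} lost"
    # The application ended on its own; stop polling.
    watchdog.cancel()
    await asyncio.gather(watchdog, return_exceptions=True)
    return "application finished"


async def main() -> str:
    consumers: Dict[str, Any] = {"ctag-1": object()}
    app = asyncio.create_task(asyncio.sleep(3600))  # stands in for the consumer loop
    # Emulate aiormq dropping the consumer after a server-side cancel.
    asyncio.get_running_loop().call_later(0.05, consumers.pop, "ctag-1")
    return await supervise(consumers, "ctag-1", app)


result = asyncio.run(main())
print(result)  # prints "consumer ctag-1 lost"
```

Closing the connection, as the workaround above does, has a similar effect; which one fits depends on how the process is supervised.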

@13hakta

13hakta commented Jan 6, 2024

I have the same problem with a failing RabbitMQ. On the latest versions, on Windows Server editions, it periodically fails while retrieving the available memory volume. After such a failure it closes the channel and the consumer stops receiving messages. With the current API I can't detect whether the channel is still open without interacting with it, which a consumer shouldn't have to do; it should only listen.
