From f19515d7b2eeac25a4b17c3e657c1f4671051a80 Mon Sep 17 00:00:00 2001 From: Roman Koshel Date: Thu, 10 Oct 2024 15:29:16 +0300 Subject: [PATCH] Improvements and fixes --- rmqaio/locales/rmqaio.pot | 38 ++++----- rmqaio/locales/ru/LC_MESSAGES/rmqaio.mo | Bin 2190 -> 2190 bytes rmqaio/locales/ru/LC_MESSAGES/rmqaio.po | 52 +++++------- rmqaio/rmqaio.py | 104 ++++++++++-------------- tests/conftest.py | 2 +- tests/test_rmqaio.py | 60 ++++++++++++-- 6 files changed, 135 insertions(+), 121 deletions(-) diff --git a/rmqaio/locales/rmqaio.pot b/rmqaio/locales/rmqaio.pot index c44d258..19a9143 100644 --- a/rmqaio/locales/rmqaio.pot +++ b/rmqaio/locales/rmqaio.pot @@ -5,7 +5,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" -"POT-Creation-Date: 2024-10-09 20:18+0300\n" +"POT-Creation-Date: 2024-10-10 15:25+0300\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" @@ -19,75 +19,75 @@ msgstr "" msgid "%s (%s %s) retry(%s) in %s second(s)" msgstr "" -#: ../rmqaio/rmqaio.py:222 +#: ../rmqaio/rmqaio.py:213 msgid "len(url) not match len(ssl_context)" msgstr "" -#: ../rmqaio/rmqaio.py:272 +#: ../rmqaio/rmqaio.py:263 msgid "%s unclosed" msgstr "" -#: ../rmqaio/rmqaio.py:298 +#: ../rmqaio/rmqaio.py:289 msgid "%s execute callback[tp=%s, name=%s, reraise=%s]" msgstr "" -#: ../rmqaio/rmqaio.py:309 +#: ../rmqaio/rmqaio.py:300 msgid "%s callback[tp=%s, name=%s, callback=%s] error" msgstr "" -#: ../rmqaio/rmqaio.py:319 +#: ../rmqaio/rmqaio.py:310 msgid "%s set callback[tp=%s, name=%s, callback=%s]" msgstr "" -#: ../rmqaio/rmqaio.py:378 +#: ../rmqaio/rmqaio.py:369 msgid "%s connection lost" msgstr "" -#: ../rmqaio/rmqaio.py:401 +#: ../rmqaio/rmqaio.py:385 msgid "%s connecting[timeout=%s]..." msgstr "" -#: ../rmqaio/rmqaio.py:427 +#: ../rmqaio/rmqaio.py:388 msgid "%s connected" msgstr "" -#: ../rmqaio/rmqaio.py:479 +#: ../rmqaio/rmqaio.py:459 msgid "%s close underlying connection" msgstr "" -#: ../rmqaio/rmqaio.py:493 +#: ../rmqaio/rmqaio.py:475 msgid "%s closed" msgstr "" -#: ../rmqaio/rmqaio.py:553 ../rmqaio/rmqaio.py:764 +#: ../rmqaio/rmqaio.py:535 ../rmqaio/rmqaio.py:746 msgid "close %s" msgstr "" -#: ../rmqaio/rmqaio.py:581 ../rmqaio/rmqaio.py:732 +#: ../rmqaio/rmqaio.py:563 ../rmqaio/rmqaio.py:714 msgid "exchange[name='%s'] channel[%s] publish[routing_key='%s'] %s" msgstr "" -#: ../rmqaio/rmqaio.py:643 ../rmqaio/rmqaio.py:836 +#: ../rmqaio/rmqaio.py:625 ../rmqaio/rmqaio.py:818 msgid "close[delete=%s] %s" msgstr "" -#: ../rmqaio/rmqaio.py:678 ../rmqaio/rmqaio.py:871 +#: ../rmqaio/rmqaio.py:660 ../rmqaio/rmqaio.py:853 msgid "declare[restore=%s, force=%s] %s" msgstr "" -#: ../rmqaio/rmqaio.py:933 +#: ../rmqaio/rmqaio.py:915 msgid "bind queue '%s' to exchange '%s' with routing_key '%s'" msgstr "" -#: ../rmqaio/rmqaio.py:968 +#: ../rmqaio/rmqaio.py:950 msgid "unbind queue '%s' from exchange '%s' for routing_key '%s'" msgstr "" -#: ../rmqaio/rmqaio.py:1031 +#: ../rmqaio/rmqaio.py:1013 msgid "consume %s" msgstr "" -#: ../rmqaio/rmqaio.py:1057 +#: ../rmqaio/rmqaio.py:1039 msgid "stop consume %s" msgstr "" diff --git a/rmqaio/locales/ru/LC_MESSAGES/rmqaio.mo b/rmqaio/locales/ru/LC_MESSAGES/rmqaio.mo index 4a0f1636d2c1813b06366e8d7e0828068a8cc252..4c410d1a5e7d40415fd9270bf5f283d9f13c44ef 100644 GIT binary patch delta 21 ccmeAZ>=WD|#lm4|pkQcfWn{Kli6w^x06Gi==Kufz delta 21 ccmeAZ>=WD|#lm4=sbFMaWoWTki6w^x06I+s>;M1& diff --git a/rmqaio/locales/ru/LC_MESSAGES/rmqaio.po b/rmqaio/locales/ru/LC_MESSAGES/rmqaio.po index 3cf33c3..522c5c3 100644 --- a/rmqaio/locales/ru/LC_MESSAGES/rmqaio.po +++ b/rmqaio/locales/ru/LC_MESSAGES/rmqaio.po @@ -5,8 +5,8 @@ msgid "" msgstr "" "Project-Id-Version: \n" -"POT-Creation-Date: 2024-10-09 20:18+0300\n" -"PO-Revision-Date: 2024-10-09 20:18+0300\n" +"POT-Creation-Date: 2024-10-10 15:25+0300\n" +"PO-Revision-Date: 2024-10-10 15:26+0300\n" "Last-Translator: \n" "Language-Team: \n" "Language: ru\n" @@ -20,86 +20,74 @@ msgstr "" msgid "%s (%s %s) retry(%s) in %s second(s)" msgstr "%s (%s %s) повтор(%s) через %s секунд(у)" -#: ../rmqaio/rmqaio.py:222 +#: ../rmqaio/rmqaio.py:213 msgid "len(url) not match len(ssl_context)" msgstr "" -#: ../rmqaio/rmqaio.py:272 +#: ../rmqaio/rmqaio.py:263 msgid "%s unclosed" msgstr "%s не закрыто" -#: ../rmqaio/rmqaio.py:298 +#: ../rmqaio/rmqaio.py:289 msgid "%s execute callback[tp=%s, name=%s, reraise=%s]" msgstr "%s выполняю callback[tp=%s, name=%s, reraise=%s]" -#: ../rmqaio/rmqaio.py:309 +#: ../rmqaio/rmqaio.py:300 msgid "%s callback[tp=%s, name=%s, callback=%s] error" msgstr "%s ошибка выполнения callback[tp=%s, name=%s, callback=%s]" -#: ../rmqaio/rmqaio.py:319 +#: ../rmqaio/rmqaio.py:310 msgid "%s set callback[tp=%s, name=%s, callback=%s]" msgstr "%s устанавливаю callback[tp=%s, name=%s, callback=%s]" -#: ../rmqaio/rmqaio.py:378 +#: ../rmqaio/rmqaio.py:369 msgid "%s connection lost" msgstr "%s соединение потеряно" -#: ../rmqaio/rmqaio.py:401 +#: ../rmqaio/rmqaio.py:385 msgid "%s connecting[timeout=%s]..." msgstr "%s подключаюсь[тайм-аут=%s]..." -#: ../rmqaio/rmqaio.py:427 +#: ../rmqaio/rmqaio.py:388 msgid "%s connected" msgstr "%s подключено" -#: ../rmqaio/rmqaio.py:479 +#: ../rmqaio/rmqaio.py:459 msgid "%s close underlying connection" msgstr "%s закрываю внутреннее соединение" -#: ../rmqaio/rmqaio.py:493 +#: ../rmqaio/rmqaio.py:475 msgid "%s closed" msgstr "%s закрыто" -#: ../rmqaio/rmqaio.py:553 ../rmqaio/rmqaio.py:764 +#: ../rmqaio/rmqaio.py:535 ../rmqaio/rmqaio.py:746 msgid "close %s" msgstr "закрываю %s" -#: ../rmqaio/rmqaio.py:581 ../rmqaio/rmqaio.py:732 +#: ../rmqaio/rmqaio.py:563 ../rmqaio/rmqaio.py:714 msgid "exchange[name='%s'] channel[%s] publish[routing_key='%s'] %s" msgstr "обменник[name='%s'] канал[%s] публикую[routing_key='%s'] %s" -#: ../rmqaio/rmqaio.py:643 ../rmqaio/rmqaio.py:836 +#: ../rmqaio/rmqaio.py:625 ../rmqaio/rmqaio.py:818 msgid "close[delete=%s] %s" msgstr "закрываю[delete=%s] %s" -#: ../rmqaio/rmqaio.py:678 ../rmqaio/rmqaio.py:871 +#: ../rmqaio/rmqaio.py:660 ../rmqaio/rmqaio.py:853 msgid "declare[restore=%s, force=%s] %s" msgstr "объявляю[restore=%s, force=%s] %s" -#: ../rmqaio/rmqaio.py:933 +#: ../rmqaio/rmqaio.py:915 msgid "bind queue '%s' to exchange '%s' with routing_key '%s'" msgstr "привязываю очередь '%s' к обменнику '%s' с ключом маршрутизации '%s'" -#: ../rmqaio/rmqaio.py:968 +#: ../rmqaio/rmqaio.py:950 msgid "unbind queue '%s' from exchange '%s' for routing_key '%s'" msgstr "отвязываю очередь '%s' от обменника '%s' для ключа маршрутизации '%s'" -#: ../rmqaio/rmqaio.py:1031 +#: ../rmqaio/rmqaio.py:1013 msgid "consume %s" msgstr "слушаю %s" -#: ../rmqaio/rmqaio.py:1057 +#: ../rmqaio/rmqaio.py:1039 msgid "stop consume %s" msgstr "перестаю слушать %s" - -#~ msgid "%s close" -#~ msgstr "%s закрываю" - -#~ msgid "%s consuming" -#~ msgstr "%s слушаю" - -#~ msgid "RabbitMQ queue type" -#~ msgstr "Тип очереди RabbitMQ" - -#~ msgid "Retry decorator." -#~ msgstr "Декоратор для осуществлеия повтора в случае возникновения ошибки." diff --git a/rmqaio/rmqaio.py b/rmqaio/rmqaio.py index b410c2d..52587f7 100644 --- a/rmqaio/rmqaio.py +++ b/rmqaio/rmqaio.py @@ -131,29 +131,18 @@ async def wrapper(*args, **kwds): class _LoopIter: - """Infinity iterator. + """Repeatable iterator. Args: data: list of items to iterate. - - Examples: - >>> loop_iter = _LoopIter([1, 2]) - >>> next(loop_iter) - 1 - >>> next(loop_iter) - 2 - >>> next(loop_ipter) - 1 - ... """ - __slots__ = ("_data", "_i", "_j", "_iter") + __slots__ = ("_data", "_i", "_j") def __init__(self, data: list): self._data = data self._i = -1 self._j = 0 - self._iter = iter(data) def __next__(self): if self._j == len(self._data): @@ -164,7 +153,8 @@ def __next__(self): return self._data[self._i] def reset(self): - self._j = 1 + self._i = max(-1, self._i - 1) + self._j = 0 class Connection: @@ -208,23 +198,24 @@ def __init__( exc_filter: Callable[[Exception], bool] | None = None, ): if not isinstance(url, (list, tuple, set)): - self.urls = [url] + urls = [url] else: - self.urls = list(url) - - self._urls_iter = _LoopIter(self.urls) + urls = list(url) - if not isinstance(ssl_context, (list, tuple, set)): + if ssl_context is None: + ssl_contexts = [None] * len(urls) + elif not isinstance(ssl_context, (list, tuple, set)): ssl_contexts = [ssl_context] else: ssl_contexts = list(ssl_context) - if ssl_context and len(self.urls) != len(ssl_contexts): + + if ssl_context and len(urls) != len(ssl_contexts): raise Exception(_("len(url) not match len(ssl_context)")) - self._ssl_contexts_iter = _LoopIter(ssl_contexts) - self.url = next(self._urls_iter) + self._iter = _LoopIter(list(zip(urls, ssl_contexts))) - self.ssl_context = next(self._ssl_contexts_iter) + self.url = urls[0] + self.ssl_context = ssl_contexts[0] self.name = name or uuid4().hex[-4:] @@ -238,7 +229,7 @@ def __init__( self._closed: Future = Future() - self._key: tuple = (name, get_event_loop(), tuple(sorted(self.urls))) + self._key: tuple = (name, get_event_loop(), tuple(sorted(urls))) if self._key not in self.__shared: self.__shared[self._key] = { @@ -382,15 +373,8 @@ async def _watcher(self): self._reconnect_task = create_task(self.open(retry_timeouts=iter(chain((0, 3), repeat(5))))) await self._execute_callbacks("on_lost") - async def _connect( - self, - retry_timeouts: Iterable[int] | None = None, - exc_filter: Callable[[Exception], bool] | None = None, - ): - if retry_timeouts is None: - retry_timeouts = self._retry_timeouts - if exc_filter is None: - exc_filter = self._exc_filter + async def _connect(self): + self.url, self.ssl_context = next(self._iter) while not self.is_closed: connect_timeout = yarl.URL(self.url).query.get("connection_timeout") if connect_timeout is not None: @@ -399,33 +383,20 @@ async def _connect( connect_timeout = CONNECT_TIMEOUT try: logger.info(_("%s connecting[timeout=%s]..."), self, connect_timeout) - async with asyncio.timeout(connect_timeout): - if self._retry_timeouts: - self._conn = await _retry( - retry_timeouts=retry_timeouts, - exc_filter=exc_filter, - )(aiormq.connect)( - self.url, - context=self.ssl_context, - ) - else: - self._conn = await aiormq.connect(self.url, context=self.ssl_context) - self._urls_iter.reset() - self._ssl_contexts_iter.reset() + self._conn = await aiormq.connect(self.url, context=self.ssl_context) + logger.info(_("%s connected"), self) + self._iter.reset() break - except (asyncio.TimeoutError, ConnectionError, aiormq.exceptions.ConnectionClosed) as e: + except (asyncio.TimeoutError, ConnectionError, aiormq.exceptions.AMQPConnectionError) as e: try: - url = next(self._urls_iter) - ssl_context = next(self._ssl_contexts_iter) + url, ssl_context = next(self._iter) except StopIteration: raise e logger.warning("%s %s %s", self, e.__class__, e) self.url = url self.ssl_context = ssl_context - logger.info(_("%s connected"), self) - async def open( self, retry_timeouts: Iterable[int] | None = None, @@ -441,7 +412,16 @@ async def open( async with self._shared["connect_lock"]: if self._conn is None or self._conn.is_closed: - self._open_task = create_task(self._connect(retry_timeouts=retry_timeouts, exc_filter=exc_filter)) + if retry_timeouts is None: + retry_timeouts = self._retry_timeouts + if exc_filter is None: + exc_filter = self._exc_filter + if retry_timeouts: + self._open_task = create_task( + _retry(retry_timeouts=retry_timeouts, exc_filter=exc_filter)(self._connect)() + ) + else: + self._open_task = create_task(self._connect()) await self._open_task if self._watcher_task is None: @@ -485,6 +465,8 @@ async def close(self): self._watcher_task = None if self._reconnect_task: + if not self._reconnect_task.done(): + self._reconnect_task.cancel() try: await self._reconnect_task except Exception: @@ -550,7 +532,7 @@ def __post_init__(self): async def close(self): """Close exchange.""" - logger.debug(_("close %s"), self) + logger.info(_("close %s"), self) try: if self.conn_factory: self.conn.remove_callbacks(cancel=True) @@ -640,7 +622,7 @@ async def close(self, delete: bool | None = None, timeout: int | None = None): if self.conn.is_closed: raise Exception("already closed") - logger.debug(_("close[delete=%s] %s"), delete, self) + logger.info(_("close[delete=%s] %s"), delete, self) try: if self.conn_factory: @@ -675,7 +657,7 @@ async def declare( if self.name == "": return - logger.debug(_("declare[restore=%s, force=%s] %s"), restore, force, self) + logger.info(_("declare[restore=%s, force=%s] %s"), restore, force, self) async def fn(): channel = await self.conn.channel() @@ -761,7 +743,7 @@ class Consumer: async def close(self): """Close consumer channel.""" - logger.debug(_("close %s"), self) + logger.info(_("close %s"), self) await self.channel.close() @@ -833,7 +815,7 @@ async def close(self, delete: bool | None = None, timeout: int | None = None): if self.conn.is_closed: raise Exception("already closed") - logger.debug(_("close[delete=%s] %s"), delete, self) + logger.info(_("close[delete=%s] %s"), delete, self) try: await self.stop_consume() @@ -868,7 +850,7 @@ async def declare( force: Force redeclare queue if it has already been declared with different parameters. """ - logger.debug(_("declare[restore=%s, force=%s] %s"), restore, force, self) + logger.info(_("declare[restore=%s, force=%s] %s"), restore, force, self) async def fn(): channel = await self.conn.channel() @@ -929,7 +911,7 @@ async def bind( restore: Restore this binding on connection issue. """ - logger.debug( + logger.info( _("bind queue '%s' to exchange '%s' with routing_key '%s'"), self.name, exchange.name, @@ -964,7 +946,7 @@ async def unbind(self, exchange: Exchange, routing_key: str, timeout: int | None timeout: Operation timeout. If `None` `self.timeout` will be used. """ - logger.debug( + logger.info( _("unbind queue '%s' from exchange '%s' for routing_key '%s'"), self.name, exchange.name, @@ -1054,7 +1036,7 @@ async def stop_consume(self, timeout: int | None = None): timeout: Operation timeout. If `None` `self.timeout` will be used. """ - logger.debug(_("stop consume %s"), self) + logger.info(_("stop consume %s"), self) self.conn.remove_callback("on_lost", f"on_lost_queue_{self.name}_consume", cancel=True) diff --git a/tests/conftest.py b/tests/conftest.py index a38f54f..b63f2f7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,7 @@ from tests import utils -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) rmqaio.LOG_SANITIZE = False diff --git a/tests/test_rmqaio.py b/tests/test_rmqaio.py index 96a93a4..de531b5 100644 --- a/tests/test_rmqaio.py +++ b/tests/test_rmqaio.py @@ -10,6 +10,40 @@ import rmqaio +def test_LoopIter(): + it = rmqaio.rmqaio._LoopIter(["a", "b"]) + for idx in range(3): + assert next(it) == "a", idx + assert next(it) == "b", idx + with pytest.raises(StopIteration): + next(it) + + it = rmqaio.rmqaio._LoopIter(["a", "b"]) + for idx in range(3): + assert next(it) == "a", idx + it.reset() + assert next(it) == "a", idx + assert next(it) == "b", idx + with pytest.raises(StopIteration): + next(it) + + assert next(it) == "a" + assert next(it) == "b" + it.reset() + assert next(it) == "b" + assert next(it) == "a" + with pytest.raises(StopIteration): + next(it) + assert next(it) == "b" + assert next(it) == "a" + with pytest.raises(StopIteration): + next(it) + assert next(it) == "b" + assert next(it) == "a" + with pytest.raises(StopIteration): + next(it) + + async def assert_has_connection(api): for _ in range(10): resp = api.get("/api/connections") @@ -106,7 +140,6 @@ async def test__init(self): conn = rmqaio.Connection("amqp://admin@example.com", name="abc") try: assert str(conn.url) == "amqp://admin@example.com" - assert conn.urls == ["amqp://admin@example.com"] assert conn.name == "abc" assert conn._key is not None assert conn._key in conn._Connection__shared @@ -219,12 +252,15 @@ async def test_connection(self, rabbitmq): api = httpx.Client(base_url=f"http://{rabbitmq['ip']}:15672", auth=("guest", "guest")) conn = rmqaio.Connection( - f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + [ + "amqp://invalid", + f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + ], name="abc", retry_timeouts=[1, 3, 5], ) assert conn - assert f"{conn}" == f"Connection[{rabbitmq['ip']}]#abc" + assert f"{conn}" == f"Connection[invalid]#abc" assert conn.is_open is False assert conn.is_closed is False @@ -253,7 +289,9 @@ async def test_exchange(self, rabbitmq): api = httpx.Client(base_url=f"http://{rabbitmq['ip']}:15672", auth=("guest", "guest")) exchange = rmqaio.Exchange( - conn_factory=lambda: rmqaio.Connection(f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}") + conn_factory=lambda: rmqaio.Connection( + [f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}"], + ) ) try: await exchange.declare() @@ -263,7 +301,10 @@ async def test_exchange(self, rabbitmq): exchange = rmqaio.Exchange( name="test", conn_factory=lambda: rmqaio.Connection( - f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + [ + "amqp://invalid", + f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + ], retry_timeouts=[1, 3, 5], ), ) @@ -294,11 +335,14 @@ async def test_queue(self, rabbitmq): @pytest.mark.asyncio async def test_lost_connection(self, rabbitmq): - rmqaio.logger.setLevel("DEBUG") api = httpx.Client(base_url=f"http://{rabbitmq['ip']}:15672", auth=("guest", "guest")) conn = rmqaio.Connection( - f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + [ + "amqp://invalid1?connection_timeout=1000", + "amqp://invalid2?connection_timeout=2000", + f"amqp://{rabbitmq['ip']}:{rabbitmq['port']}", + ], name="abc", retry_timeouts=repeat(1), ) @@ -318,7 +362,7 @@ async def test_lost_connection(self, rabbitmq): await asyncio.sleep(5) - for _ in range(30): + for _ in range(20): if conn.is_open: break await asyncio.sleep(1)