From 45ca1902bcc0e8f85c706d58c1ff0d9aef2982b8 Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Wed, 8 Jan 2025 15:14:59 +0300 Subject: [PATCH 1/7] Implemented RQ integration --- src/dishka/integrations/rq.py | 77 ++++++++++++ tests/integrations/rq/__init__.py | 0 tests/integrations/rq/test_rq.py | 194 ++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+) create mode 100644 src/dishka/integrations/rq.py create mode 100644 tests/integrations/rq/__init__.py create mode 100644 tests/integrations/rq/test_rq.py diff --git a/src/dishka/integrations/rq.py b/src/dishka/integrations/rq.py new file mode 100644 index 00000000..c3ca5181 --- /dev/null +++ b/src/dishka/integrations/rq.py @@ -0,0 +1,77 @@ +from collections.abc import Callable +from inspect import signature +from typing import Any, get_type_hints + +from rq import Queue, Worker +from rq.job import Job + +from dishka import Container +from dishka.integrations.base import default_parse_dependency + + +class DishkaWorker(Worker): + """Custom RQ Worker class with Dishka DI support.""" + + def __init__( + self, + *args, + container: Container, + **kwargs, + ) -> None: + """Sets up class and container.""" + super().__init__(*args, **kwargs) + self.dishka_container = container + + def perform_job(self, job: Job, queue: Queue) -> bool: + """Performs job call""" + request_container = self.dishka_container().__enter__() + self.inject_deps(job, request_container) + job_result = super().perform_job(job, queue) + request_container.close() + return job_result + + def inject_deps(self, job: Job, container: Container) -> None: + """Injects dependencies into using the Dishka container. + + Args: + job: The RQ job to inject dependencies into. + """ + if job.func: + dependencies = self._build_dependencies(job.func) + updated_kwargs = self._build_kwargs(dependencies, container) + if isinstance(job.kwargs, dict): + job.kwargs.update(updated_kwargs) + + def teardown(self) -> None: + """Closes DI container on worker shutdown.""" + self.dishka_container.close() + super().teardown() + + @classmethod + def _build_dependencies( + cls, callable_: Callable[..., Any], + ) -> dict[str, Any]: + """Builds dependencies for the given callable.""" + dependencies = {} + + for name, parameter in signature(callable_).parameters.items(): + dep = default_parse_dependency( + parameter, + get_type_hints(callable_, include_extras=True).get(name, Any), + ) + if dep is None: + continue + dependencies[name] = dep + + return dependencies + + def _build_kwargs( + self, + dependencies: dict, + request_container: Container, + ) -> dict[str, Any]: + """Buld kwargs dict for RQ job run.""" + return { + name: request_container.get(dep.type_hint, component=dep.component) + for name, dep in dependencies.items() + } diff --git a/tests/integrations/rq/__init__.py b/tests/integrations/rq/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integrations/rq/test_rq.py b/tests/integrations/rq/test_rq.py new file mode 100644 index 00000000..5747c3d0 --- /dev/null +++ b/tests/integrations/rq/test_rq.py @@ -0,0 +1,194 @@ +from unittest.mock import Mock, patch + +import pytest +from fakeredis import FakeStrictRedis +from rq import Queue, Worker +from rq.job import Job + +from dishka import FromDishka +from dishka.container import Container +from dishka.integrations.rq import DishkaWorker +from ..common import ( + APP_DEP_VALUE, + REQUEST_DEP_VALUE, + AppDep, + AppMock, + AppProvider, + RequestDep, +) + +# Supress CLIENT SETNAME warning from Worker. FakeRedis does not support it. +pytestmark = pytest.mark.filterwarnings("ignore:CLIENT SETNAME") + + +def app_job(a: FromDishka[AppDep], app_mock: FromDishka[AppMock]): + pass # pragma: no coverage + + +def request_job( + a: FromDishka[AppDep], + r: FromDishka[RequestDep], + mock: FromDishka[Mock], +): + pass # pragma: no coverage + + +def job_without_deps(): + pass # pragma: no coverage + + +@pytest.fixture +def fake_redis_conn(): + return FakeStrictRedis() + + +@pytest.fixture +def worker( + container: Container, + fake_redis_conn: FakeStrictRedis, +): + return DishkaWorker( + ["test_queue"], + container=container, + connection=fake_redis_conn, + ) + + +@pytest.fixture +def queue(fake_redis_conn: FakeStrictRedis): + return Queue(name="test_queue", connection=fake_redis_conn) + + +def test_worker_initialization( + container: Container, + fake_redis_conn: FakeStrictRedis, +): + worker = DishkaWorker( + ["test_queue"], + container=container, + connection=fake_redis_conn, + ) + assert worker.dishka_container == container + + +def test_inject_app_deps( + worker: DishkaWorker, + container: Container, + fake_redis_conn: FakeStrictRedis, + app_provider: AppProvider, +): + # Create a mock job with the example_job function + job = Job.create( + func=app_job, + kwargs={}, + connection=fake_redis_conn, + ) + + # Inject dependencies + request_container = container().__enter__() + worker.inject_deps(job, request_container) + + # Verify that the dependencies were injected correctly + assert job.kwargs["a"] == APP_DEP_VALUE + assert job.kwargs["app_mock"] == app_provider.app_mock + + +def test_inject_request_deps( + worker: DishkaWorker, + container: Container, + fake_redis_conn: FakeStrictRedis, + app_provider: AppProvider, +): + # Create a mock job with the example_job function + job = Job.create(func=request_job, kwargs={}, connection=fake_redis_conn) + + # Inject dependencies + request_container = container().__enter__() + worker.inject_deps(job, request_container) + + # Verify that the dependencies were injected correctly + assert job.kwargs["a"] == APP_DEP_VALUE + assert job.kwargs["r"] == REQUEST_DEP_VALUE + assert job.kwargs["mock"] == app_provider.mock + + +def test_inject_deps_with_existing_kwargs( + container: Container, + worker: DishkaWorker, + fake_redis_conn: FakeStrictRedis, +): + existing_kwargs = {"extra_param": "value"} + job = Job.create( + func=app_job, + kwargs=existing_kwargs.copy(), + connection=fake_redis_conn, + ) + + # Inject dependencies + request_container = container().__enter__() + worker.inject_deps(job, request_container) + + # Verify that existing kwargs are preserved and new ones are added + assert job.kwargs["extra_param"] == "value" + assert job.kwargs["a"] == APP_DEP_VALUE + + +def test_inject_deps_without_dependencies( + container: Container, + worker: DishkaWorker, + fake_redis_conn: FakeStrictRedis, +): + # Create a job without dependencies + job = Job.create( + func=job_without_deps, + kwargs={}, + connection=fake_redis_conn, + ) + + # Inject dependencies + request_container = container().__enter__() + worker.inject_deps(job, request_container) + + # Verify that kwargs remain empty + assert job.kwargs == {} + + +def test_perform_app_job( + worker: DishkaWorker, + fake_redis_conn: FakeStrictRedis, + app_provider: AppProvider, +): + mock_call = Mock(name="mock_perform_job", return_value=True) + mock_queue = Mock(spec=Queue) + + with patch.object(Worker, "perform_job", mock_call) as mock_perform_job: + job = Job.create(func=app_job, connection=fake_redis_conn) + + worker.perform_job(job, mock_queue) + + mock_perform_job.assert_called_once_with(job, mock_queue) + app_provider.app_released.assert_not_called() + + worker.teardown() + app_provider.app_released.assert_called() + + +def test_perform_request_job( + worker: DishkaWorker, + fake_redis_conn: FakeStrictRedis, + app_provider: AppProvider, +): + mock_call = Mock(name="mock_perform_job", return_value=True) + mock_queue = Mock(spec=Queue) + + with patch.object(Worker, "perform_job", mock_call) as mock_perform_job: + job = Job.create(func=request_job, connection=fake_redis_conn) + + worker.perform_job(job, mock_queue) + + mock_perform_job.assert_called_once_with(job, mock_queue) + app_provider.app_released.assert_not_called() + app_provider.request_released.assert_called() + + worker.teardown() + app_provider.app_released.assert_called() From 5945e533db94d2426dc3ef4ebbb8aa568f6b3dde Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Wed, 8 Jan 2025 15:23:08 +0300 Subject: [PATCH 2/7] Add RQ integration to nox test suite --- noxfile.py | 30 ++++++++++++++++++++++++------ requirements/rq-200.txt | 3 +++ requirements/rq-latest.txt | 3 +++ 3 files changed, 30 insertions(+), 6 deletions(-) create mode 100644 requirements/rq-200.txt create mode 100644 requirements/rq-latest.txt diff --git a/noxfile.py b/noxfile.py index d2d1b7ba..4fb6d1db 100644 --- a/noxfile.py +++ b/noxfile.py @@ -7,7 +7,13 @@ nox.options.default_venv_backend = "uv" nox.options.reuse_existing_virtualenvs = True -CMD = ("pytest", "--cov=dishka", "--cov-append", "--cov-report=term-missing", "-v") +CMD = ( + "pytest", + "--cov=dishka", + "--cov-append", + "--cov-report=term-missing", + "-v", +) INSTALL_CMD = ("pytest", "pytest-cov", "-e", ".") @@ -18,7 +24,9 @@ class IntegrationEnv: constraint: Callable[[], bool] = lambda: True def get_req(self) -> str: - return f"requirements/{self.library.replace('_', '-')}-{self.version}.txt" + return ( + f"requirements/{self.library.replace('_', '-')}-{self.version}.txt" + ) def get_tests(self) -> str: return f"tests/integrations/{self.library}" @@ -50,6 +58,8 @@ def get_tests(self) -> str: IntegrationEnv("grpcio", "latest"), IntegrationEnv("litestar", "230"), IntegrationEnv("litestar", "latest"), + IntegrationEnv("rq", "200"), + IntegrationEnv("rq", "latest"), IntegrationEnv("sanic", "23121"), IntegrationEnv("sanic", "latest"), IntegrationEnv("starlette", "0270"), @@ -62,13 +72,19 @@ def get_tests(self) -> str: for env in INTEGRATIONS: + @nox.session( name=f"{env.library}_{env.version}", - tags=[env.library, "latest" if env.version == "latest" else "non-latest"], + tags=[ + env.library, + "latest" if env.version == "latest" else "non-latest", + ], ) def session(session: nox.Session, env=env) -> None: if not env.constraint(): - session.skip("Skip tests on python 3.13 due to compatibility issues") + session.skip( + "Skip tests on python 3.13 due to compatibility issues" + ) session.install(*INSTALL_CMD, "-r", env.get_req()) session.run(*CMD, env.get_tests()) @@ -77,7 +93,8 @@ def session(session: nox.Session, env=env) -> None: def unit(session: nox.Session) -> None: session.install( *INSTALL_CMD, - "-r", "requirements/test.txt", + "-r", + "requirements/test.txt", ) session.run(*CMD, "tests/unit") @@ -86,6 +103,7 @@ def unit(session: nox.Session) -> None: def real_world(session: nox.Session) -> None: session.install( *INSTALL_CMD, - "-r", "examples/real_world/requirements_test.txt", + "-r", + "examples/real_world/requirements_test.txt", ) session.run(*CMD, "examples/real_world/tests/") diff --git a/requirements/rq-200.txt b/requirements/rq-200.txt new file mode 100644 index 00000000..6b7f3af3 --- /dev/null +++ b/requirements/rq-200.txt @@ -0,0 +1,3 @@ +-r test.txt +rq==2.0.0 +fakeredis==2.26.2 diff --git a/requirements/rq-latest.txt b/requirements/rq-latest.txt new file mode 100644 index 00000000..6b7f3af3 --- /dev/null +++ b/requirements/rq-latest.txt @@ -0,0 +1,3 @@ +-r test.txt +rq==2.0.0 +fakeredis==2.26.2 From 95771de536576e9c464e05810f5358ce0f436b3a Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Wed, 8 Jan 2025 15:51:22 +0300 Subject: [PATCH 3/7] Add example for RQ integration --- examples/integrations/rq/__init__.py | 0 examples/integrations/rq/enqueue.py | 19 +++++++++++++++++++ examples/integrations/rq/run_worker.py | 24 ++++++++++++++++++++++++ examples/integrations/rq/tasks.py | 5 +++++ 4 files changed, 48 insertions(+) create mode 100644 examples/integrations/rq/__init__.py create mode 100644 examples/integrations/rq/enqueue.py create mode 100644 examples/integrations/rq/run_worker.py create mode 100644 examples/integrations/rq/tasks.py diff --git a/examples/integrations/rq/__init__.py b/examples/integrations/rq/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/integrations/rq/enqueue.py b/examples/integrations/rq/enqueue.py new file mode 100644 index 00000000..6e08db1f --- /dev/null +++ b/examples/integrations/rq/enqueue.py @@ -0,0 +1,19 @@ +from time import sleep + +from redis import Redis +from rq import Queue +from tasks import hello_world + +if __name__ == "__main__": + connection = Redis() + queue = Queue( + name="default", + connection=connection, + ) + job = queue.enqueue(hello_world) + + res = job.result + while not res: + res = job.result + sleep(1) + print(res) diff --git a/examples/integrations/rq/run_worker.py b/examples/integrations/rq/run_worker.py new file mode 100644 index 00000000..757c76c3 --- /dev/null +++ b/examples/integrations/rq/run_worker.py @@ -0,0 +1,24 @@ +from redis import Redis + +from dishka import Provider, Scope, make_container, provide +from dishka.integrations.rq import DishkaWorker + + +class StrProvider(Provider): + @provide(scope=Scope.REQUEST) + def hello(self) -> str: + return "Hello" + + +def setup_worker() -> DishkaWorker: + provider = StrProvider() + container = make_container(provider) + queues = ["default"] + conn = Redis() + worker = DishkaWorker(container=container, queues=queues, connection=conn) + return worker + + +if __name__ == "__main__": + worker = setup_worker() + worker.work(with_scheduler=True) diff --git a/examples/integrations/rq/tasks.py b/examples/integrations/rq/tasks.py new file mode 100644 index 00000000..dc651998 --- /dev/null +++ b/examples/integrations/rq/tasks.py @@ -0,0 +1,5 @@ +from dishka import FromDishka + + +def hello_world(hello: FromDishka[str]): + return f"{hello} world!" From dfdbf25f909a6667fbaac80b7df6ce92b5071ded Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Wed, 8 Jan 2025 16:17:19 +0300 Subject: [PATCH 4/7] Add documentation for RQ integration --- docs/integrations/index.rst | 4 +-- docs/integrations/rq.rst | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 docs/integrations/rq.rst diff --git a/docs/integrations/index.rst b/docs/integrations/index.rst index 4d28eebe..4c19407b 100644 --- a/docs/integrations/index.rst +++ b/docs/integrations/index.rst @@ -21,6 +21,7 @@ You can create custom integrations for your framework of choice. flask grpcio litestar + rq sanic starlette taskiq @@ -52,7 +53,7 @@ You can create custom integrations for your framework of choice. * - :ref:`Flask` - - - + - :ref:`RQ` - * - :ref:`Litestar` - @@ -112,4 +113,3 @@ For FastAPI it will look like: app = FastAPI() container = make_async_container(your_provider, FastapiProvider()) setup_dishka(container, app) - diff --git a/docs/integrations/rq.rst b/docs/integrations/rq.rst new file mode 100644 index 00000000..2364c599 --- /dev/null +++ b/docs/integrations/rq.rst @@ -0,0 +1,50 @@ +.. _rq: + +rq +=========================================== + +Though it is not required, you can use dishka-rq integration. It features: + +* automatic REQUEST and SESSION scope management using Worker subclass +* automatic injection of dependencies into job function. + +How to use +**************** + +1. Create provider and container as usual. + +.. code-block:: python + + class StrProvider(Provider): + @provide(scope=Scope.REQUEST) + def hello(self) -> str: + return "Hello" + + provider = StrProvider() + container = make_container(provider) + +2. Import. + +.. code-block:: python + + from dishka import FromDishka + +3. Mark those of your job functions parameters which are to be injected with ``FromDishka[]`` + +.. code-block:: python + + def hello_world(hello: FromDishka[str]): + return f"{hello} world!" + +4. Run you worker using your container and DishkaWorker subclass. + +.. code-block:: python + + conn = Redis() + queues = ["default"] + worker = DishkaWorker(container=container, queues=queues, connection=conn) + worker.work(with_scheduler=True) + +.. code-block:: shell + + python run_worker.py From e1761dba20d062898659ceefcb5ac376203b1296 Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Wed, 8 Jan 2025 16:36:33 +0300 Subject: [PATCH 5/7] Restores original formatting of noxfile --- noxfile.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/noxfile.py b/noxfile.py index 4fb6d1db..710802bd 100644 --- a/noxfile.py +++ b/noxfile.py @@ -7,13 +7,7 @@ nox.options.default_venv_backend = "uv" nox.options.reuse_existing_virtualenvs = True -CMD = ( - "pytest", - "--cov=dishka", - "--cov-append", - "--cov-report=term-missing", - "-v", -) +CMD = ("pytest", "--cov=dishka", "--cov-append", "--cov-report=term-missing", "-v") INSTALL_CMD = ("pytest", "pytest-cov", "-e", ".") @@ -24,9 +18,7 @@ class IntegrationEnv: constraint: Callable[[], bool] = lambda: True def get_req(self) -> str: - return ( - f"requirements/{self.library.replace('_', '-')}-{self.version}.txt" - ) + return f"requirements/{self.library.replace('_', '-')}-{self.version}.txt" def get_tests(self) -> str: return f"tests/integrations/{self.library}" @@ -72,19 +64,13 @@ def get_tests(self) -> str: for env in INTEGRATIONS: - @nox.session( name=f"{env.library}_{env.version}", - tags=[ - env.library, - "latest" if env.version == "latest" else "non-latest", - ], + tags=[env.library, "latest" if env.version == "latest" else "non-latest"], ) def session(session: nox.Session, env=env) -> None: if not env.constraint(): - session.skip( - "Skip tests on python 3.13 due to compatibility issues" - ) + session.skip("Skip tests on python 3.13 due to compatibility issues") session.install(*INSTALL_CMD, "-r", env.get_req()) session.run(*CMD, env.get_tests()) @@ -93,8 +79,7 @@ def session(session: nox.Session, env=env) -> None: def unit(session: nox.Session) -> None: session.install( *INSTALL_CMD, - "-r", - "requirements/test.txt", + "-r", "requirements/test.txt", ) session.run(*CMD, "tests/unit") @@ -103,7 +88,6 @@ def unit(session: nox.Session) -> None: def real_world(session: nox.Session) -> None: session.install( *INSTALL_CMD, - "-r", - "examples/real_world/requirements_test.txt", + "-r", "examples/real_world/requirements_test.txt", ) - session.run(*CMD, "examples/real_world/tests/") + session.run(*CMD, "examples/real_world/tests/") \ No newline at end of file From 8c33ca4bf1a41780b52570b256b860963de845e1 Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Thu, 9 Jan 2025 20:30:34 +0300 Subject: [PATCH 6/7] Remove patching Worker from unit tests --- tests/integrations/rq/test_rq.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/integrations/rq/test_rq.py b/tests/integrations/rq/test_rq.py index 5747c3d0..035d9651 100644 --- a/tests/integrations/rq/test_rq.py +++ b/tests/integrations/rq/test_rq.py @@ -22,7 +22,7 @@ def app_job(a: FromDishka[AppDep], app_mock: FromDishka[AppMock]): - pass # pragma: no coverage + app_mock(a) def request_job( @@ -158,15 +158,12 @@ def test_perform_app_job( fake_redis_conn: FakeStrictRedis, app_provider: AppProvider, ): - mock_call = Mock(name="mock_perform_job", return_value=True) - mock_queue = Mock(spec=Queue) + queue = Queue(connection=fake_redis_conn) + job = queue.enqueue(app_job) - with patch.object(Worker, "perform_job", mock_call) as mock_perform_job: - job = Job.create(func=app_job, connection=fake_redis_conn) - - worker.perform_job(job, mock_queue) + worker.perform_job(job, queue) - mock_perform_job.assert_called_once_with(job, mock_queue) + app_provider.app_mock.assert_called_once() app_provider.app_released.assert_not_called() worker.teardown() From ed39c57431c926f6c601325a1d5e9a0ba51432df Mon Sep 17 00:00:00 2001 From: Paul Repin Date: Fri, 17 Jan 2025 13:41:21 +0300 Subject: [PATCH 7/7] Update tests for request-level job --- tests/integrations/rq/test_rq.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/tests/integrations/rq/test_rq.py b/tests/integrations/rq/test_rq.py index 035d9651..0e1ee69c 100644 --- a/tests/integrations/rq/test_rq.py +++ b/tests/integrations/rq/test_rq.py @@ -1,8 +1,8 @@ -from unittest.mock import Mock, patch +from unittest.mock import Mock import pytest from fakeredis import FakeStrictRedis -from rq import Queue, Worker +from rq import Queue from rq.job import Job from dishka import FromDishka @@ -30,7 +30,7 @@ def request_job( r: FromDishka[RequestDep], mock: FromDishka[Mock], ): - pass # pragma: no coverage + mock(r) def job_without_deps(): @@ -175,17 +175,14 @@ def test_perform_request_job( fake_redis_conn: FakeStrictRedis, app_provider: AppProvider, ): - mock_call = Mock(name="mock_perform_job", return_value=True) - mock_queue = Mock(spec=Queue) - - with patch.object(Worker, "perform_job", mock_call) as mock_perform_job: - job = Job.create(func=request_job, connection=fake_redis_conn) + queue = Queue(connection=fake_redis_conn) + job = queue.enqueue(request_job) - worker.perform_job(job, mock_queue) + worker.perform_job(job, queue) - mock_perform_job.assert_called_once_with(job, mock_queue) - app_provider.app_released.assert_not_called() - app_provider.request_released.assert_called() + app_provider.mock.assert_called_once() + app_provider.app_released.assert_not_called() + app_provider.request_released.assert_called() worker.teardown() app_provider.app_released.assert_called()