RQ integration #333

Open · wants to merge 7 commits into develop

Changes from 5 commits
4 changes: 2 additions & 2 deletions docs/integrations/index.rst
@@ -21,6 +21,7 @@ You can create custom integrations for your framework of choice.
flask
grpcio
litestar
rq
sanic
starlette
taskiq
@@ -52,7 +53,7 @@ You can create custom integrations for your framework of choice.

* - :ref:`Flask`
-
-
- :ref:`RQ`
-
* - :ref:`Litestar`
-
@@ -112,4 +113,3 @@ For FastAPI it will look like:
app = FastAPI()
container = make_async_container(your_provider, FastapiProvider())
setup_dishka(container, app)

50 changes: 50 additions & 0 deletions docs/integrations/rq.rst
@@ -0,0 +1,50 @@
.. _rq:

rq
===========================================

Though it is not required, you can use the dishka rq integration. It features:

* automatic REQUEST and SESSION scope management using a ``Worker`` subclass
* automatic injection of dependencies into job functions.

How to use
****************

1. Create provider and container as usual.

.. code-block:: python

    class StrProvider(Provider):
        @provide(scope=Scope.REQUEST)
        def hello(self) -> str:
            return "Hello"

    provider = StrProvider()
    container = make_container(provider)

2. Import ``FromDishka``.

.. code-block:: python

    from dishka import FromDishka

3. Mark the parameters of your job functions that should be injected with ``FromDishka[]``.

.. code-block:: python

    def hello_world(hello: FromDishka[str]):
        return f"{hello} world!"

4. Run your worker using your container and the ``DishkaWorker`` subclass.

.. code-block:: python

    conn = Redis()
    queues = ["default"]
    worker = DishkaWorker(container=container, queues=queues, connection=conn)
    worker.work(with_scheduler=True)

.. code-block:: shell

    python run_worker.py
19 changes: 19 additions & 0 deletions examples/integrations/rq/enqueue.py
@@ -0,0 +1,19 @@
from time import sleep

from redis import Redis
from rq import Queue
from tasks import hello_world

if __name__ == "__main__":
    connection = Redis()
    queue = Queue(
        name="default",
        connection=connection,
    )
    job = queue.enqueue(hello_world)

    res = job.result
    while res is None:
        sleep(1)
        res = job.result
    print(res)
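The polling loop above can be generalized into a small stdlib-only helper (a sketch; `get_result` is a hypothetical stand-in for reading `job.result`) that sleeps between checks and gives up after a timeout instead of waiting forever:

```python
from time import monotonic, sleep

def wait_for(get_result, timeout=10.0, interval=0.5):
    """Poll get_result() until it returns a non-None value or the timeout expires."""
    deadline = monotonic() + timeout
    while monotonic() < deadline:
        result = get_result()
        if result is not None:
            return result
        sleep(interval)
    raise TimeoutError("no result within timeout")

# usage with a fake "job" whose result appears on the third poll
calls = iter([None, None, "Hello world!"])
print(wait_for(lambda: next(calls), timeout=5, interval=0.01))
```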
24 changes: 24 additions & 0 deletions examples/integrations/rq/run_worker.py
@@ -0,0 +1,24 @@
from redis import Redis

from dishka import Provider, Scope, make_container, provide
from dishka.integrations.rq import DishkaWorker


class StrProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def hello(self) -> str:
        return "Hello"


def setup_worker() -> DishkaWorker:
    provider = StrProvider()
    container = make_container(provider)
    queues = ["default"]
    conn = Redis()
    worker = DishkaWorker(container=container, queues=queues, connection=conn)
Member:
Is this the only way to set up the integration? How likely is it that users have their own custom worker class?

Author:
It is possible, but I'm not sure how common the use of custom workers is. The official documentation suggests some cases, such as running on Heroku or running a worker in the same thread as the main app (in case someone needs to run a Worker within a test suite).

Another option besides subclassing Worker is to patch the Worker class at runtime, as they did in the Sentry RQ integration. This allows for any custom Worker subclasses, but it's still not the cleanest way to plug in.

Member:
I want to reopen this discussion. For all integrations we have the @inject decorator which actually does the injection. This is required to have full control over when it is done (some users implement their own decorators but still leave scope control to the framework, or just want to test it per-function). For some integrations we also have an "autoinjection" mode, which uses a less straightforward approach that lets you skip manually placing the inject decorator, but is usually more dangerous or less flexible.

Can we implement the same approach here? The answer "no, I've checked" is acceptable.

Author:

Unfortunately I don't see how to implement this without rewriting core RQ logic upstream. RQ retrieves the job function via getattr(module, 'func_name'), which returns the original undecorated function.
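The claim is worth sanity-checking with a stdlib-only sketch (no rq involved; all names here are hypothetical): looking a function up by name on its module returns whatever the module attribute currently points to, which is the wrapper when the decorator was applied at definition time with @-syntax:

```python
from functools import wraps

def logged(func):
    @wraps(func)
    def with_logging(*args, **kwargs):
        with_logging.called = True  # record that the wrapper ran
        return func(*args, **kwargs)
    with_logging.called = False
    return with_logging

@logged
def some_job():
    return "done"

# Analogous to RQ's getattr(module, "func_name") lookup:
func = globals()["some_job"]
assert func is some_job   # the decorated wrapper, not the undecorated original
assert func() == "done"
assert func.called
```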


This works for me:

from functools import wraps

def wrapped(something):
    def logged(func):
        @wraps(func)
        def with_logging(*args, **kwargs):
            print(func.__name__ + " was called", something)
            return func(*args, **kwargs)
        return with_logging
    return logged

rq log:

11:34:16 default: job.some_job() (fca8ef06-8d8e-4ee7-afe2-d52da9942ff1)
some_job was called 123

Author:

I came up with another idea on how to implement such an integration without monkeypatching or subclassing the original worker (at least for cases without autoinjection).

What if, instead of attaching the container to the Worker instance, we wrap it inside the @inject decorator itself?

E.g.

1. The integration provides an injector factory:
# dishka/integrations/rq.py

def make_inject(container: Container):
    def inject(func: Callable[P, T]) -> Callable[P, T]:
        with container() as request_container:
            wrapped = wrap_injection(
                func=func,
                remove_depends=True,
                container_getter=lambda _, __: request_container,
                is_async=False,
            )
            update_wrapper(wrapped, func)
            return wrapped

    return inject
2. Instead of running setup_dishka(...) on startup, we create the injector:
# app/dependencies.py

from dishka import Provider, Scope, make_container, provide
from dishka.integrations.rq import make_inject

class StrProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def hello(self) -> str:
        return "Hello"
        
container = make_container(StrProvider())
inject = make_inject(container)
3. In the tasks file, import the newly created injector and decorate tasks as usual:
# app/tasks.py

from dishka import FromDishka
from app.dependencies import inject

@inject
def download(name: FromDishka[str], redis: FromDishka[Redis]):
    print("Execute download job, with name=", name, " and redis=", redis)
    return f"{name} Downloaded"

Do you see any downsides of such implementation?

Member:

Hm. In this case we need the global container to be created before the tasks and indirectly imported into all of them. I can't say I like this approach more.

Member:

Can we somehow access the rq app inside inject, so that we can get the container from there?

Author:

Nope, there is no such thing as Celery's task.app.conf or any other kind of context object. A task can only access its own Job object, which does not provide any references to the global scope.

So the third option is to go Sentry's way and monkeypatch the worker:

def setup_dishka(worker: Worker, container: Container):
    worker_klass = worker.__class__
    old_perform_job = worker_klass.perform_job

    def dishka_patched_perform_job(self, job: Job, queue: Queue) -> bool:
        """Performs the job call inside a REQUEST-scope container."""
        request_container = container().__enter__()
        setattr(job, "_dishka_request_container", request_container)

        job_result = old_perform_job(self, job, queue)
        request_container.close()
        return job_result

    worker_klass.perform_job = dishka_patched_perform_job

In that case the patched function will inject a REQUEST-scope container into each Job instance, from which it can be accessed by the @inject decorator.

I guess in that case auto-inject behavior could also be controlled by a flag passed to setup_dishka(), like this:

def setup_dishka(worker: Worker, container: Container, auto_inject: bool = False):
  ...

Member:

Ok, looks like we don't have many options. I can agree with the currently implemented one.

    return worker


if __name__ == "__main__":
    worker = setup_worker()
    worker.work(with_scheduler=True)
5 changes: 5 additions & 0 deletions examples/integrations/rq/tasks.py
@@ -0,0 +1,5 @@
from dishka import FromDishka


def hello_world(hello: FromDishka[str]):
    return f"{hello} world!"
4 changes: 3 additions & 1 deletion noxfile.py
@@ -50,6 +50,8 @@ def get_tests(self) -> str:
    IntegrationEnv("grpcio", "latest"),
    IntegrationEnv("litestar", "230"),
    IntegrationEnv("litestar", "latest"),
    IntegrationEnv("rq", "200"),
    IntegrationEnv("rq", "latest"),
    IntegrationEnv("sanic", "23121"),
    IntegrationEnv("sanic", "latest"),
    IntegrationEnv("starlette", "0270"),
@@ -88,4 +90,4 @@ def real_world(session: nox.Session) -> None:
*INSTALL_CMD,
"-r", "examples/real_world/requirements_test.txt",
)
session.run(*CMD, "examples/real_world/tests/")
session.run(*CMD, "examples/real_world/tests/")
3 changes: 3 additions & 0 deletions requirements/rq-200.txt
@@ -0,0 +1,3 @@
-r test.txt
rq==2.0.0
fakeredis==2.26.2
3 changes: 3 additions & 0 deletions requirements/rq-latest.txt
@@ -0,0 +1,3 @@
-r test.txt
rq==2.0.0
fakeredis==2.26.2
77 changes: 77 additions & 0 deletions src/dishka/integrations/rq.py
@@ -0,0 +1,77 @@
from collections.abc import Callable
from inspect import signature
from typing import Any, get_type_hints

from rq import Queue, Worker
from rq.job import Job

from dishka import Container
from dishka.integrations.base import default_parse_dependency


class DishkaWorker(Worker):
    """Custom RQ Worker class with Dishka DI support."""

    def __init__(
        self,
        *args,
        container: Container,
        **kwargs,
    ) -> None:
        """Sets up the worker and stores the container."""
        super().__init__(*args, **kwargs)
        self.dishka_container = container

    def perform_job(self, job: Job, queue: Queue) -> bool:
        """Performs the job call inside a REQUEST-scope container."""
        request_container = self.dishka_container().__enter__()
        try:
            self.inject_deps(job, request_container)
            return super().perform_job(job, queue)
        finally:
            request_container.close()

    def inject_deps(self, job: Job, container: Container) -> None:
        """Injects dependencies into the job using the Dishka container.

        Args:
            job: The RQ job to inject dependencies into.
            container: The REQUEST-scope container used to resolve them.
        """
        if job.func:
            dependencies = self._build_dependencies(job.func)
            updated_kwargs = self._build_kwargs(dependencies, container)
            if isinstance(job.kwargs, dict):
                job.kwargs.update(updated_kwargs)

    def teardown(self) -> None:
        """Closes the DI container on worker shutdown."""
        self.dishka_container.close()
        super().teardown()

    @classmethod
    def _build_dependencies(
        cls, callable_: Callable[..., Any],
    ) -> dict[str, Any]:
        """Builds the dependency map for the given callable."""
        dependencies = {}
        hints = get_type_hints(callable_, include_extras=True)

        for name, parameter in signature(callable_).parameters.items():
            dep = default_parse_dependency(
                parameter,
                hints.get(name, Any),
            )
            if dep is None:
                continue
            dependencies[name] = dep

        return dependencies

    def _build_kwargs(
        self,
        dependencies: dict,
        request_container: Container,
    ) -> dict[str, Any]:
        """Builds the kwargs dict for the RQ job run."""
        return {
            name: request_container.get(dep.type_hint, component=dep.component)
            for name, dep in dependencies.items()
        }
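The parameter scan in `_build_dependencies` can be illustrated with a stdlib-only sketch. This is not dishka's actual parsing code: `FromDishkaMarker` is a hypothetical stand-in for the metadata that `FromDishka[...]` attaches via `Annotated`, and `find_injectable_params` is an invented helper:

```python
from inspect import signature
from typing import Annotated, get_type_hints

class FromDishkaMarker:
    """Stand-in for the marker dishka attaches via FromDishka[...]."""

def find_injectable_params(func) -> dict[str, type]:
    """Return {param name: underlying type} for Annotated-marked parameters."""
    hints = get_type_hints(func, include_extras=True)
    deps = {}
    for name in signature(func).parameters:
        hint = hints.get(name)
        # Annotated[T, marker] exposes its markers via __metadata__
        meta = getattr(hint, "__metadata__", ())
        if any(m is FromDishkaMarker or isinstance(m, FromDishkaMarker) for m in meta):
            deps[name] = hint.__origin__  # the underlying type T
    return deps

def hello_world(hello: Annotated[str, FromDishkaMarker], count: int = 1):
    return hello * count

assert find_injectable_params(hello_world) == {"hello": str}
```

Only `hello` is picked up; plain parameters like `count` are left for the caller to supply, which mirrors how the worker merges resolved dependencies into `job.kwargs`.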