-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RQ integration #333
Open
prepin
wants to merge
7
commits into
reagento:develop
Choose a base branch
from
prepin:rq-integration
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
RQ integration #333
Changes from 5 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
45ca190
Implemented RQ integration
prepin 5945e53
Add RQ integration to nox test suite
prepin 95771de
Add example for RQ integration
prepin dfdbf25
Add documentation for RQ integration
prepin e1761db
Restores original formatting of noxfile
prepin 8c33ca4
Remove patching Worker from unit tests
prepin ed39c57
Update tests for request-level job
prepin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
.. _rq: | ||
|
||
rq | ||
=========================================== | ||
|
||
Though it is not required, you can use dishka-rq integration. It features: | ||
|
||
* automatic REQUEST and SESSION scope management using Worker subclass | ||
* automatic injection of dependencies into job function. | ||
|
||
How to use | ||
**************** | ||
|
||
1. Create provider and container as usual. | ||
|
||
.. code-block:: python | ||
|
||
class StrProvider(Provider): | ||
@provide(scope=Scope.REQUEST) | ||
def hello(self) -> str: | ||
return "Hello" | ||
|
||
provider = StrProvider() | ||
container = make_container(provider) | ||
|
||
2. Import. | ||
|
||
.. code-block:: python | ||
|
||
from dishka import FromDishka | ||
|
||
3. Mark those of your job functions parameters which are to be injected with ``FromDishka[]`` | ||
|
||
.. code-block:: python | ||
|
||
def hello_world(hello: FromDishka[str]): | ||
return f"{hello} world!" | ||
|
||
4. Run you worker using your container and DishkaWorker subclass. | ||
|
||
.. code-block:: python | ||
|
||
conn = Redis() | ||
queues = ["default"] | ||
worker = DishkaWorker(container=container, queues=queues, connection=conn) | ||
worker.work(with_scheduler=True) | ||
|
||
.. code-block:: shell | ||
|
||
python run_worker.py |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from time import sleep | ||
|
||
from redis import Redis | ||
from rq import Queue | ||
from tasks import hello_world | ||
|
||
if __name__ == "__main__": | ||
connection = Redis() | ||
queue = Queue( | ||
name="default", | ||
connection=connection, | ||
) | ||
job = queue.enqueue(hello_world) | ||
|
||
res = job.result | ||
while not res: | ||
res = job.result | ||
sleep(1) | ||
print(res) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from redis import Redis | ||
|
||
from dishka import Provider, Scope, make_container, provide | ||
from dishka.integrations.rq import DishkaWorker | ||
|
||
|
||
class StrProvider(Provider): | ||
@provide(scope=Scope.REQUEST) | ||
def hello(self) -> str: | ||
return "Hello" | ||
|
||
|
||
def setup_worker() -> DishkaWorker: | ||
provider = StrProvider() | ||
container = make_container(provider) | ||
queues = ["default"] | ||
conn = Redis() | ||
worker = DishkaWorker(container=container, queues=queues, connection=conn) | ||
return worker | ||
|
||
|
||
if __name__ == "__main__": | ||
worker = setup_worker() | ||
worker.work(with_scheduler=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from dishka import FromDishka | ||
|
||
|
||
def hello_world(hello: FromDishka[str]): | ||
return f"{hello} world!" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
-r test.txt | ||
rq==2.0.0 | ||
fakeredis==2.26.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
-r test.txt | ||
rq==2.0.0 | ||
fakeredis==2.26.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
from collections.abc import Callable | ||
from inspect import signature | ||
from typing import Any, get_type_hints | ||
|
||
from rq import Queue, Worker | ||
from rq.job import Job | ||
|
||
from dishka import Container | ||
from dishka.integrations.base import default_parse_dependency | ||
|
||
|
||
class DishkaWorker(Worker): | ||
"""Custom RQ Worker class with Dishka DI support.""" | ||
|
||
def __init__( | ||
self, | ||
*args, | ||
container: Container, | ||
**kwargs, | ||
) -> None: | ||
"""Sets up class and container.""" | ||
super().__init__(*args, **kwargs) | ||
self.dishka_container = container | ||
|
||
def perform_job(self, job: Job, queue: Queue) -> bool: | ||
"""Performs job call""" | ||
request_container = self.dishka_container().__enter__() | ||
self.inject_deps(job, request_container) | ||
job_result = super().perform_job(job, queue) | ||
request_container.close() | ||
return job_result | ||
|
||
def inject_deps(self, job: Job, container: Container) -> None: | ||
"""Injects dependencies into using the Dishka container. | ||
|
||
Args: | ||
job: The RQ job to inject dependencies into. | ||
""" | ||
if job.func: | ||
dependencies = self._build_dependencies(job.func) | ||
updated_kwargs = self._build_kwargs(dependencies, container) | ||
if isinstance(job.kwargs, dict): | ||
job.kwargs.update(updated_kwargs) | ||
|
||
def teardown(self) -> None: | ||
"""Closes DI container on worker shutdown.""" | ||
self.dishka_container.close() | ||
super().teardown() | ||
|
||
@classmethod | ||
def _build_dependencies( | ||
cls, callable_: Callable[..., Any], | ||
) -> dict[str, Any]: | ||
"""Builds dependencies for the given callable.""" | ||
dependencies = {} | ||
|
||
for name, parameter in signature(callable_).parameters.items(): | ||
dep = default_parse_dependency( | ||
parameter, | ||
get_type_hints(callable_, include_extras=True).get(name, Any), | ||
) | ||
if dep is None: | ||
continue | ||
dependencies[name] = dep | ||
|
||
return dependencies | ||
|
||
def _build_kwargs( | ||
self, | ||
dependencies: dict, | ||
request_container: Container, | ||
) -> dict[str, Any]: | ||
"""Buld kwargs dict for RQ job run.""" | ||
return { | ||
name: request_container.get(dep.type_hint, component=dep.component) | ||
for name, dep in dependencies.items() | ||
} |
Empty file.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it the only way to setup integration? How like is it that user has their own custom worker class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible, but I'm not sure how severe the use of custom workers is. The official documentation suggests some cases such as running on Heroku or running a worker in the same thread as the main app (in case someone needs to run a Worker within a test suite).
Another option besides subclassing Worker is to patch the Worker class at runtime, as they did in the Sentry RQ integration. This allows for any custom Worker subclasses, but it's still not the cleanest way to plug in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to reopen this discussion. For all integrations we have
@inject
decorator which actually does injection. This is required to have full control on when it is done (some users implement their own decorators but still have scope control in framework, or just can test it per-funcion). For some of them we also have "autoinjection"-mode, which uses less straightforward approach which allows to skip manually placing inject decorator, but is usually more dangerous or less flexible.Can we implement same approach here? Answer
no, I've checked
is acceptableThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately I don't see how to implement this without rewriting core RQ logic upstream. RQ is retrieving job function via
getattr(module,'func_name')
. That returns original undecorated function.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works for me:
rq log:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I came up with another idea on how to implement such integration without monkeypatching or subclassing original worker (at least for cases without autoinjection).
What if instead of attaching container to Worker instance — we wrap it inside
@inject
decorator itself?E.g.
setup_dishka(...)
on startup, we are creating injectorDo you see any downsides of such implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. In this case we need global container to be created before tasks and indirectly imported to any of them. I can't say I like this approach more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we somehow access rq app inside inject, so that we can get container from there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, there is no such thing as Celery's
task.app.conf
or other kind of context object. Task can only access it's ownJob
object which does not provide any references to global scope.So third option is go Sentry's way and monkeypatch worker
In that case patched function will inject
Request
scope container into eachJob
instance from which it can be accessed by@inject
decorator.I guess in that case auto inject behavior can also be controlled by flag passed to
setup_dishka()
, like that:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, looks like we have not much options. I can agree with currently implemented one.