[FEATURE] fork/join pattern #17
Comments
Great suggestion - 100% see the use for this feature, and agree that there's currently no built-in elegant way to do this. We'll have a brainstorm.
I think I have some sensible abstractions in place, with a few more implementation details to refine; at this stage it'd be nice to get some feedback on the proposed API. A working version of the proposed changes is installable with:
Usage
from pyper import task, Fork, AsyncFork

def spam(x: int):
    return x + 1

fork1 = task.fork(spam)
assert isinstance(fork1, Fork)

async def ham(x: int):
    return x + 2

fork2 = task.fork(ham)
assert isinstance(fork2, AsyncFork)

fork3 = fork1 & fork2
# OR
fork3 = fork1.attach(fork2)

async def main():
    result = await fork3(0)
    assert result == [1, 2]
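
To make the semantics of the proposed & operator concrete, here is a rough standalone sketch of what a fork/join combinator along these lines could look like. SimpleFork and everything inside it are assumptions made for illustration only; this is not pyper's actual Fork/AsyncFork implementation:

import asyncio
from typing import Any, Callable, List

class SimpleFork:
    """Illustrative fork/join combinator (not pyper's implementation)."""

    def __init__(self, *funcs: Callable):
        self.funcs = list(funcs)

    def __and__(self, other: "SimpleFork") -> "SimpleFork":
        # fork1 & fork2 merges both sets of branches into a single fork
        return SimpleFork(*self.funcs, *other.funcs)

    async def __call__(self, arg: Any) -> List[Any]:
        async def run(func: Callable) -> Any:
            if asyncio.iscoroutinefunction(func):
                return await func(arg)                 # async branch awaited directly
            return await asyncio.to_thread(func, arg)  # sync branch run in a thread

        # join: await all branches and return their results in branch order
        return await asyncio.gather(*(run(f) for f in self.funcs))

async def _demo():
    fork = SimpleFork(lambda x: x + 1) & SimpleFork(lambda x: x + 2)
    assert await fork(0) == [1, 2]

The point this sketch tries to capture is that & only composes branches; the join (awaiting every branch and collecting results in branch order) happens when the fork itself is called.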
Example

import asyncio
import time
from pyper import task

async def subtask1(data: int):
    await asyncio.sleep(1)
    return data

def subtask2(data: int):
    time.sleep(1)
    return data * 2

def subtask3(data: int):
    for i in range(1, 10_000_000):
        _ = i * i
    return data * 3

def process_data(result1, result2, result3):
    return [result1, result2, result3]

async def main():
    pipeline = (
        task(range, branch=True)
        | task(
            task.fork(subtask1)                        # Executed in asyncio.Task
            & task.fork(subtask2)                      # Executed in thread
            & task.fork(subtask3, multiprocess=True)   # Executed in process
        )
        | task(process_data, unpack=True)
    )
    async for result in pipeline(5):
        print(result)
        #> [0, 0, 0]
        #> [1, 2, 3]
        #> [2, 4, 6]
        #> [3, 6, 9]
        #> [4, 8, 12]

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Notes
task(
    task.fork(subtask1)
    & task.fork(subtask2)
    & task.fork(subtask3, multiprocess=True),
    workers=5
)

Feedback
It'd be very useful to get some comments on the following:
Other general feedback is also welcome, obviously.
Wow, looks pretty good. I will try it out on a real use case and give feedback during this work week.
Was reading the docs and playing around with the library. Is there not an elegant abstraction for feeding multiple inputs (which can be executed independently and thus in parallel) into a function as part of the pipeline? I'm basically using asyncio.gather() for this here, but this isn't really an improvement over regular Python async. Perhaps another abstraction, taskgroup, might be elegant for this.

The key advantage of this library IMO is that you can think about the entire flow of the pipeline at a glance without having to jump all over the IDE.
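
For reference, the gather-based workaround being described looks roughly like the sketch below. This is not the snippet from the original post, and subtask_a, subtask_b, and fan_out are made-up names; the point is only that the fork/join ends up buried inside one function, so the pipeline definition no longer shows the full flow:

import asyncio

async def subtask_a(x: int) -> int:   # hypothetical independent subtask
    await asyncio.sleep(0.1)
    return x + 1

async def subtask_b(x: int) -> int:   # hypothetical independent subtask
    await asyncio.sleep(0.1)
    return x * 2

async def fan_out(x: int) -> list:
    # The fork/join lives inside this one function; a pipeline would only
    # see fan_out as a single opaque stage.
    return list(await asyncio.gather(subtask_a(x), subtask_b(x)))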
Syntax sugar like this should be possible too, because operator precedence for & is higher than |. This syntax is also inspired by Unix's fork() abstraction. Kind of like this:
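
The snippet that followed here isn't preserved above, but the precedence claim itself is easy to check with plain Python operator overloading; the Node class below is a throwaway stand-in, not part of pyper:

# Minimal demonstration that a | b & c | d groups as (a | (b & c)) | d,
# which is the grouping the fork syntax relies on.
class Node:
    def __init__(self, name: str):
        self.name = name

    def __or__(self, other: "Node") -> "Node":
        return Node(f"({self.name} | {other.name})")

    def __and__(self, other: "Node") -> "Node":
        return Node(f"({self.name} & {other.name})")

a, b, c, d = Node("a"), Node("b"), Node("c"), Node("d")
print((a | b & c | d).name)
#> ((a | (b & c)) | d)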
Anyway, it's a cool library; thanks for making it!