
[FEATURE] fork/join pattern #17


Open
accupham opened this issue Jan 15, 2025 · 3 comments
Labels: enhancement (New feature or request)

@accupham

I was reading the docs and playing around with the library. Is there no elegant abstraction for feeding multiple inputs (which can be executed independently, and thus in parallel) into a function as part of the pipeline? I'm basically using asyncio.gather() for this below, but that isn't really an improvement over plain Python asyncio.

flowchart TB
    Start((Start)) --> Range["range(10)"]
    
    subgraph ForkJoin["Fork-Join Stage (10 workers)"]
        direction TB
        Range --> |fork|P1[generate_name]
        Range --> |fork|P2[generate_age]
        Range --> |fork|P3[generate_writing_prompt]
        
        P1 --> Join((Join))
        P2 --> Join
        P3 --> Join
    end
    
    Join --> Story[generate_tiny_story<br/>10 workers]
    Story --> Print[Print Story]
import asyncio
from openai import AsyncOpenAI
from pyper import task

class LLMActivities:
    def __init__(self, model, base_url, api_key):
        self.model = model
        self.llm = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
        )

    async def prompt(self, prompt, temp=0.0, seed=0):
        r = await self.llm.chat.completions.create(
            temperature=temp,
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            seed=seed,
        )
        return r.choices[0].message.content


    async def generate_name(self, seed):
        return await self.prompt(
            "Generate a random name. Only output the name.",
            temp=1.0,
            seed=seed,
        )
    async def generate_age(self, seed):
        return await self.prompt(
            "Generate a random age. Only output the output the age",
            temp=1.0,
            seed=seed,
        )

    async def generate_writing_prompt(self, seed):
        return await self.prompt(
            "Generate a simple writing prompt for a super short story.",
            temp=1.0,
            seed=seed,
        )

    async def generate_writing_inputs(self, seed):
        # Fork: run the three independent prompts concurrently, then join the results
        return await asyncio.gather(
            self.generate_name(seed),
            self.generate_age(seed),
            self.generate_writing_prompt(seed),
        )

    async def generate_tiny_story(self, inputs):
        name, age, writing_prompt = inputs
        return await self.prompt(
            prompt=(
                f"Given the following writing prompt, generate a super short story "
                f"with {name}, age {age} as the main character.\n\n"
                f"{writing_prompt}"
            ),
            temp=1.0,
        )


async def main():
    base_url = "http://10.0.3.4:4000"
    llm = LLMActivities("gpt-4o", base_url, "x")

    pipeline = (
        task(lambda: range(10), branch=True)             # source: emit seeds 0-9
        | task(llm.generate_writing_inputs, workers=10)  # fork/join stage
        | task(llm.generate_tiny_story, workers=10)      # consume the joined inputs
    )

    async for result in pipeline():
        print(f"------- BEGIN STORY------")
        print(result)
        print(f"------- END STORY ------")


if __name__ == "__main__":
    asyncio.run(main())

Perhaps another abstraction, taskgroup, would be elegant for this:

    from pyper import task, taskgroup

    pipeline = (
        task(lambda: range(10), branch=True)
        | taskgroup(
            task(llm.generate_name),
            task(llm.generate_age),
            task(llm.generate_writing_prompt)
        )
        | task(llm.generate_tiny_story, workers=10)
    )
    
    # the taskgroup would return a tuple of the input tasks' results, which is "*unpacked" into the parameters of the next function, `llm.generate_tiny_story`.
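
In the meantime, here's a rough user-land approximation of taskgroup (a hypothetical helper, not part of the pyper API) that I'm using to think about the semantics:

    import asyncio

    def taskgroup_approx(*fns):
        """Hypothetical helper: run independent async functions on the
        same input concurrently and return their results as a tuple."""
        async def _run(x):
            return tuple(await asyncio.gather(*(fn(x) for fn in fns)))
        return _run

    # Drop-in at the fork stage of the pipeline above:
    # task(taskgroup_approx(llm.generate_name, llm.generate_age, llm.generate_writing_prompt), workers=10)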

The key advantage of this library, IMO, is that you can take in the entire flow of the pipeline at a glance, without having to jump all over the IDE.


Syntax sugar like this should be possible too, because Python gives & higher operator precedence than | (a quick sanity check of this is included after the shell analogy below). This syntax is also inspired by Unix's fork() abstraction.

    pipeline = (
        task(lambda: range(10), branch=True)
        & task(llm.generate_name)
        & task(llm.generate_age)
        & task(llm.generate_writing_prompt)
        | task(llm.generate_tiny_story, workers=10)
    )

Kind of like this:

for i in {1..10}; do
    (generate_name & generate_age & generate_writing_prompt) | generate_story
done
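
(To sanity-check the precedence claim, here is some vanilla Python with toy operator overloads; nothing pyper-specific:)

    class Op:
        def __init__(self, name):
            self.name = name
        def __and__(self, other):
            return Op(f"({self.name} & {other.name})")
        def __or__(self, other):
            return Op(f"({self.name} | {other.name})")

    a, b, c = Op("a"), Op("b"), Op("c")
    print((a & b | c).name)  # prints "((a & b) | c)" -- & binds tighter than |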

Anyway, it's a cool library; thanks for making it!

@RichardZhu2 changed the title from "[QUESTION] What's the best way to implement the fork/join pattern?" to "[FEATURE] fork/join pattern" on Jan 15, 2025
@RichardZhu2
Collaborator

Great suggestion - we 100% see the use for this feature, and agree that there's currently no elegant built-in way to do this. We'll have a brainstorm.

@pyper-dev added the "enhancement" (New feature or request) label on Jan 19, 2025
@RichardZhu2
Collaborator

RichardZhu2 commented Jan 25, 2025

I think I have some sensible abstractions in place, with a few more implementation details to refine. At this stage it'd be nice to get some feedback on the proposed API.

A working version of the proposed changes is installable with:

pip install git+https://github.com/pyper-dev/pyper.git@ft-forking

Usage

  • Use task.fork to create a Fork which represents a set of independent tasks to be executed concurrently.
  • Use the & operator (syntactic sugar for an underlying Fork.attach method) to create a Fork of multiple tasks
from pyper import task, Fork, AsyncFork

def spam(x: int):
    return x + 1

fork1 = task.fork(spam)
assert isinstance(fork1, Fork)

async def ham(x: int):
    return x + 2

fork2 = task.fork(ham)
assert isinstance(fork2, AsyncFork)

fork3 = fork1 & fork2
# OR
fork3 = fork1.attach(fork2)
  • A Fork returns a list of gathered results
async def main():
    result = await fork3(0)
    assert result == [1, 2]
  • Use task in the normal way to incorporate a Fork into a Pipeline
  • The unpack parameter has been introduced to allow a function with multiple parameters to receive multiple arguments from the previous task.
    • This is an independent feature that works without Fork; the previous task simply has to return a dict (unpacked as **kwargs) or a list | tuple (unpacked as *args). A minimal sketch of the dict case follows below.
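
For illustration, here is a minimal sketch of the dict case (the function names are made up, assuming unpack behaves as described above):

import asyncio
from pyper import task

async def make_params():
    # Returning a dict means unpack=True feeds it to the next task as **kwargs
    return {"name": "Ada", "age": 36}

def describe(name, age):
    return f"{name} is {age}"

pipeline = task(make_params) | task(describe, unpack=True)

async def main():
    async for result in pipeline():
        print(result)  #> Ada is 36

if __name__ == "__main__":
    asyncio.run(main())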

Example

import asyncio
import time

from pyper import task


async def subtask1(data: int):
    await asyncio.sleep(1)
    return data


def subtask2(data: int):
    time.sleep(1)
    return data * 2


def subtask3(data: int):
    for i in range(1, 10_000_000):
        _ = i * i
    return data * 3


def process_data(result1, result2, result3):
    return [result1, result2, result3]


async def main():
    pipeline = (
        task(range, branch=True)
        | task(
            task.fork(subtask1)  # Executed in asyncio.Task
            & task.fork(subtask2)  #  Executed in thread
            & task.fork(subtask3, multiprocess=True)  # Executed in process
        )
        | task(process_data, unpack=True)
    )
    async for result in pipeline(5):
        print(result)
        #> [0, 0, 0]
        #> [1, 2, 3]
        #> [2, 4, 6]
        #> [3, 6, 9]
        #> [4, 8, 12]


if __name__ == "__main__":
    asyncio.run(main())

Notes

  • The mode of execution for each subtask is independent and is determined in the usual way:
    • Asynchronous tasks are executed in an asyncio.Task
    • Synchronous functions are threaded, or multiprocessed if multiprocess=True is set
  • The task wrapping the fork takes the normal parameters -- for example, the second stage could have been run with multiple workers:
task(
    task.fork(subtask1)
    & task.fork(subtask2)
    & task.fork(subtask3, multiprocess=True),
    workers=5
)

Feedback

It'd be very useful to get some comments on the following:

  • Are there any areas of the syntax that are unclear/unintuitive from the explanation above?
  • Are there any use cases you have which are not elegantly supported by this usage API?

Other general feedback is also welcome, obviously.

@accupham
Author

Wow, looks pretty good. I will try it out on a real use case and give feedback during this work week.
