demo: conversational pipeline - implementation 1 #324

Draft · wants to merge 1 commit into base: main
@@ -0,0 +1,31 @@
import asyncio

from rich import print as pprint

from ragbits.conversations.piepline.pipeline import ConversationPiepline
from ragbits.core.llms.litellm import LiteLLM


async def main() -> None:
"""
    Example of using the conversation pipeline.
"""
llm = LiteLLM("gpt-4o")
pipeline = ConversationPiepline(llm)

question = "What is my favorite fruit?"
result = await pipeline.run(question)
pprint("[b][blue]The user asked:[/blue][b]")
pprint(question)
print()

pprint("[b][blue]The LLM generated the following response:[/blue][b]\n")
async for response in result.output_stream:
pprint(response, end="", flush=True)

pprint("\n\n[b][blue]The plugin metadata is:[/blue][b]\n")
pprint(result.plugin_metadata)


if __name__ == "__main__":
asyncio.run(main())
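
For reference, a minimal sketch (not part of this PR) of collecting the streamed answer into a single string instead of printing it chunk by chunk, assuming result.output_stream yields text fragments as the incremental printing above suggests:

async def collect_response(result) -> str:
    # Accumulate the streamed chunks and join them into the full answer.
    chunks: list[str] = []
    async for chunk in result.output_stream:
        chunks.append(chunk)
    return "".join(chunks)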
@@ -0,0 +1,69 @@
import asyncio

from rich import print as pprint

from ragbits.conversations.piepline.pipeline import ConversationPiepline
from ragbits.conversations.piepline.plugins import DocumentSearchRAGPlugin
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.llms.litellm import LiteLLM
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search._main import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta


async def _ingest_documents() -> DocumentSearch:
documents = [
DocumentMeta.create_text_document_from_literal(
"""
            The user's favorite fruit is pineapple.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
The user's favorite dessert is ice cream.
"""
),
]

embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
)

await document_search.ingest(documents)
return document_search


async def main() -> None:
"""
    Example of using the conversation pipeline.
"""
llm = LiteLLM("gpt-4o")
document_search = await _ingest_documents()

pipeline = ConversationPiepline(
llm,
plugins=[
DocumentSearchRAGPlugin(document_search),
],
)
question = "What is my favorite fruit?"
result = await pipeline.run(question)
pprint("[b][blue]The user asked:[/blue][b]")
pprint(question)
print()

pprint("[b][blue]The LLM generated the following response:[/blue][b]\n")
async for response in result.output_stream:
pprint(response, end="", flush=True)

pprint("\n\n[b][blue]The plugin metadata is:[/blue][b]\n")
pprint(result.plugin_metadata)


if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,76 @@
import asyncio

from rich import print as pprint

from ragbits.conversations.piepline.pipeline import ConversationPiepline
from ragbits.conversations.piepline.plugins import AddHistoryPlugin, DocumentSearchRAGPlugin
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.llms.litellm import LiteLLM
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search._main import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta


async def _ingest_documents() -> DocumentSearch:
documents = [
DocumentMeta.create_text_document_from_literal(
"""
            The user's favorite fruit is pineapple.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
The user's favorite dessert is ice cream.
"""
),
]

embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
)

await document_search.ingest(documents)
return document_search


history = [
{"role": "user", "content": "Remember that whenever I talk about 'fruit', I mean 'dessert'. It's our secret code."},
{"role": "assistant", "content": "I understand. I will now interpret 'fruit' as 'dessert'."},
]


async def main() -> None:
"""
    Example of using the conversation pipeline.
"""
llm = LiteLLM("gpt-4o")
document_search = await _ingest_documents()

pipeline = ConversationPiepline(
llm,
plugins=[
DocumentSearchRAGPlugin(document_search),
AddHistoryPlugin(history),
],
)
question = "What is my favorite fruit?"
result = await pipeline.run(question)
pprint("[b][blue]The user asked:[/blue][b]")
pprint(question)
print()

pprint("[b][blue]The LLM generated the following response:[/blue][b]\n")
async for response in result.output_stream:
pprint(response, end="", flush=True)

pprint("\n\n[b][blue]The plugin metadata is:[/blue][b]\n")
pprint(result.plugin_metadata)


if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,77 @@
import asyncio

from rich import print as pprint

from ragbits.conversations.piepline.pipeline import ConversationPiepline
from ragbits.conversations.piepline.plugins import AddHistoryPlugin, DocumentSearchRAGPlugin, HistoryCompressionPlugin
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.llms.litellm import LiteLLM
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search._main import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta


async def _ingest_documents() -> DocumentSearch:
documents = [
DocumentMeta.create_text_document_from_literal(
"""
            The user's favorite fruit is pineapple.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
The user's favorite dessert is ice cream.
"""
),
]

embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
)

await document_search.ingest(documents)
return document_search


history = [
{"role": "user", "content": "Remember that whenever I talk about 'fruit', I mean 'dessert'. It's our secret code."},
{"role": "assistant", "content": "I understand. I will now interpret 'fruit' as 'dessert'."},
]


async def main() -> None:
"""
    Example of using the conversation pipeline.
"""
llm = LiteLLM("gpt-4o")
document_search = await _ingest_documents()

pipeline = ConversationPiepline(
llm,
plugins=[
DocumentSearchRAGPlugin(document_search),
AddHistoryPlugin(history),
HistoryCompressionPlugin(llm),
],
)
question = "What is my favorite fruit?"
result = await pipeline.run(question)
pprint("[b][blue]The user asked:[/blue][b]")
pprint(question)
print()

pprint("[b][blue]The LLM generated the following response:[/blue][b]\n")
async for response in result.output_stream:
pprint(response, end="", flush=True)

pprint("\n\n[b][blue]The plugin metadata is:[/blue][b]\n")
pprint(result.plugin_metadata)


if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,83 @@
import asyncio

from rich import print as pprint

from ragbits.conversations.piepline.pipeline import ConversationPiepline
from ragbits.conversations.piepline.plugins import (
AddHistoryPlugin,
CensorCreamPlugin,
DocumentSearchRAGPlugin,
HistoryCompressionPlugin,
)
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.llms.litellm import LiteLLM
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search._main import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta


async def _ingest_documents() -> DocumentSearch:
documents = [
DocumentMeta.create_text_document_from_literal(
"""
            The user's favorite fruit is pineapple.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
The user's favorite dessert is ice cream.
"""
),
]

embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
)

await document_search.ingest(documents)
return document_search


history = [
{"role": "user", "content": "Remember that whenever I talk about 'fruit', I mean 'dessert'. It's our secret code."},
{"role": "assistant", "content": "I understand. I will now interpret 'fruit' as 'dessert'."},
]


async def main() -> None:
"""
    Example of using the conversation pipeline.
"""
llm = LiteLLM("gpt-4o")
document_search = await _ingest_documents()

pipeline = ConversationPiepline(
llm,
plugins=[
DocumentSearchRAGPlugin(document_search),
AddHistoryPlugin(history),
HistoryCompressionPlugin(llm),
CensorCreamPlugin(),
],
)
question = "What is my favorite fruit?"
result = await pipeline.run(question)
pprint("[b][blue]The user asked:[/blue][b]")
pprint(question)
print()

pprint("[b][blue]The LLM generated the following response:[/blue][b]\n")
async for response in result.output_stream:
pprint(response, end="", flush=True)

pprint("\n\n[b][blue]The plugin metadata is:[/blue][b]\n")
pprint(result.plugin_metadata)


if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,55 @@
from collections.abc import Sequence

from ragbits.conversations.piepline.plugins import ConversationPipelinePlugin
from ragbits.conversations.piepline.state import ConversationPipelineResult, ConversationPipelineState
from ragbits.conversations.piepline.state_to_chat import DefaultStateToChatConverter, StateToChatConverter
from ragbits.core.llms.base import LLM


class ConversationPiepline:
"""
Class that runs a conversation pipeline with the given plugins
"""

def __init__(
self,
llm: LLM,
        plugins: Sequence[ConversationPipelinePlugin] = (),
state_to_chat: StateToChatConverter | None = None,
) -> None:
self.llm = llm
self.plugins = plugins
self.state_to_chat = state_to_chat or DefaultStateToChatConverter()

async def _process_state(self, state: ConversationPipelineState) -> ConversationPipelineState:
for plugin in self.plugins:
state = await plugin.process_state(state)
return state

async def _process_result(self, result: ConversationPipelineResult) -> ConversationPipelineResult:
for plugin in reversed(self.plugins):
result = await plugin.process_result(result)
return result

async def run(
self,
input: ConversationPipelineState | str,
) -> ConversationPipelineResult:
"""
Runs the conversation pipeline with the given input.
"""
        # Create the state and process it with the plugins
state = ConversationPipelineState(user_question=input) if isinstance(input, str) else input
state = await self._process_state(state)

        # Convert the state to a chat conversation
chat = await self.state_to_chat.convert(state)

# Create output stream from the LLM
stream = self.llm.generate_streaming(chat)

# Create the result and apply plugins
result = ConversationPipelineResult(plugin_metadata=state.plugin_metadata, output_stream=stream)
result = await self._process_result(result)

return result
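
A minimal sketch, not part of this PR, of what a custom plugin could look like given the two hooks the pipeline calls above: process_state runs before the chat is built (in the listed plugin order) and process_result runs on the finished result (in reverse order). ConversationPipelinePlugin, ConversationPipelineState and ConversationPipelineResult are the names imported at the top of this file; the timing keys and the assumption that plugin_metadata behaves like a plain dict are illustrative only.

import time

class TimingPlugin(ConversationPipelinePlugin):
    """Measures the time between its process_state and process_result hooks."""

    async def process_state(self, state: ConversationPipelineState) -> ConversationPipelineState:
        # Runs before the LLM call; stash a start timestamp in the shared metadata
        # (assumes plugin_metadata is a mutable dict).
        state.plugin_metadata["timing_started_at"] = time.monotonic()
        return state

    async def process_result(self, result: ConversationPipelineResult) -> ConversationPipelineResult:
        # Runs once the output stream has been created, in reverse plugin order.
        started = result.plugin_metadata.get("timing_started_at")
        if started is not None:
            result.plugin_metadata["timing_elapsed_s"] = time.monotonic() - started
        return result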