diff --git a/datachecks/__init__.py b/datachecks/__init__.py index 3bede1d..a33adec 100644 --- a/datachecks/__init__.py +++ b/datachecks/__init__.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel +from pydantic import BaseModel, Field, root_validator from typing import Union, Optional # Embedding config @@ -11,6 +11,8 @@ class LanceDBConfig(BaseModel): loc: Optional[str] = "" class MongoDBConfig(BaseModel): + + index: Optional[str] = "" uri: Optional[str] = "" db: Optional[str] = "" collection_name: Optional[str] = "" @@ -22,8 +24,18 @@ class ProviderConfig(BaseModel): overlapping:int worker:int similarity_top_k:int - rag: Union[LanceDBConfig,MongoDBConfig] + rag: Union[MongoDBConfig, LanceDBConfig] = Field(union_mode="left_to_right") + @root_validator(pre=True) + def check_config_type(cls, values): + config = values.get('rag') + if isinstance(config, dict): + if 'uri' in config and 'collection_name' in config: + values['rag'] = MongoDBConfig(**config) + else: + values['rag'] = LanceDBConfig(**config) + return values + # Rag Config class RAGConfig(BaseModel): provider:str diff --git a/ingestion_server/__init__.py b/ingestion_server/__init__.py deleted file mode 100644 index 374fff0..0000000 --- a/ingestion_server/__init__.py +++ /dev/null @@ -1,177 +0,0 @@ -from ingestion_server.embeddings import EmbedProviders -from ingestion_server.rags import RAGProviders, BaseRAG -from ingestion_server.datachecks import RAGConfig, RAGTask, RAGTaskStatus -import asyncio -from threading import Thread -from ingestion_server.utils import configure_logger -from typing import Dict -from uuid import uuid4 -import tempfile -import os - -logger = configure_logger(__name__) - -class RAG: - """ - Retrieval-Augmented Generation (RAG) implementation. - - This class handles the ingestion and storage of documents for RAG. - """ - - def __init__(self, VectorDB: BaseRAG, Workers: int = 2) -> None: - """ - Initialize the RAG instance. - - Args: - VectorDB (BaseRAG): The vector database to use. - Workers (int, optional): Number of worker threads. Defaults to 2. - """ - self.file_process_task_queue: asyncio.Queue = asyncio.Queue() - self.file_store_task_queue: asyncio.Queue = asyncio.Queue() - - self.VectorDB: BaseRAG = VectorDB - self.Workers: int = Workers - - self.RAG_THREAD = Thread(target=self.start) - self.shutdown = False - - async def __shutdown_loop(self): - """Monitor the shutdown flag.""" - while not self.shutdown: - await asyncio.sleep(0.5) - - async def __ingestion_task(self): - """Process ingestion tasks from the queue.""" - while not self.shutdown: - task: RAGTask = await self.file_process_task_queue.get() - task._status = RAGTaskStatus.PROCESSING - try: - nodes = await self.VectorDB.generate_nodes_sentence_splitter(task.file_loc) - except Exception as e: - logger.error(f"ERROR in {e}") - task._status = RAGTaskStatus.ERROR - continue - task._nodes = nodes - await self.file_store_task_queue.put(task) - - async def __nodes_storage(self): - """Store processed nodes in the vector database.""" - while not self.shutdown: - task: RAGTask = await self.file_store_task_queue.get() - try: - index = await self.VectorDB.add_index(task._nodes) - except Exception as e: - logger.error(f"ERROR in {e}") - task._status = RAGTaskStatus.ERROR - continue - task._index = index - task._status = RAGTaskStatus.SUCESSFUL - - def start(self): - """Start the RAG processing loop.""" - loop = asyncio.new_event_loop() - ingestion_task_pool = [loop.create_task(self.__ingestion_task()) for _ in range(self.Workers)] - file_storage = loop.create_task(self.__nodes_storage()) - loop.run_until_complete(self.__shutdown_loop()) - file_storage.cancel() - for t in ingestion_task_pool: - t.cancel() - loop.close() - - -class RAGFactory: - """ - Factory class for creating and managing RAG instances. - """ - - def __init__(self) -> None: - """Initialize the RAGFactory.""" - self.RAGS: Dict[str, RAG] = dict() - - def make_rag(self, config: RAGConfig): - """ - Create a new RAG instance. - - Args: - config (RAGConfig): Configuration for the RAG instance. - - Returns: - str: Unique identifier for the created RAG instance. - """ - rag_name = f"RAG-{uuid4()}" - embedding_name = EmbedProviders[config.provider_config.embedding_name.provider](config.provider_config.embedding_name.embedding_model_name) - vector_db = RAGProviders[config.provider](embedding_name, config.provider_config) - rag = RAG(vector_db, config.provider_config.worker) - rag.RAG_THREAD.start() - self.RAGS[rag_name] = rag - return rag_name - - def stop_all(self): - """Stop all RAG instances.""" - for rag in self.RAGS.values(): - rag.shutdown = True - - def stop(self, rag_name: str): - """ - Stop a specific RAG instance. - - Args: - rag_name (str): Identifier of the RAG instance to stop. - - Raises: - ValueError: If the specified RAG instance doesn't exist. - """ - if rag_name in self.RAGS.keys(): - self.RAGS[rag_name].shutdown = True - self.RAGS.pop(rag_name) - else: - raise ValueError("No RAG with that ID exists") - - async def file_ingest(self, rag_name, file) -> RAGTask: - """ - Ingest a file into a RAG instance. - - Args: - rag_name (str): Identifier of the RAG instance. - file: File object to ingest. - - Returns: - RAGTask: Task object representing the ingestion process. - - Raises: - ValueError: If the specified RAG instance doesn't exist or if the file type is unsupported. - """ - if rag_name not in self.RAGS.keys(): - raise ValueError(f"RAG: {rag_name} does not exist") - if file.content_type not in ["application/pdf", "application/x-pdf"]: - raise ValueError("Only PDF files are supported for now") - - task_id = str(uuid4()) - temp_file = tempfile.NamedTemporaryFile() - temp_file.write(await file.read()) - prev = temp_file.name - file_name = f"/tmp/{task_id}.pdf" - os.rename(prev, file_name) - task = RAGTask(file_loc=file_name) - await self.RAGS[rag_name].file_process_task_queue.put(task) - - while task._status in [RAGTaskStatus.WAIT, RAGTaskStatus.PROCESSING]: - await asyncio.sleep(0.4) - - os.rename(file_name, prev) - return task - - async def retrieve_query(self, rag_name: str, index: str, query: str): - """ - Retrieve documents based on a query. - - Args: - rag_name (str): Identifier of the RAG instance. - index (str): Index to search in. - query (str): Query string. - - Returns: - List of relevant documents. - """ - rag = self.RAGS[rag_name] - return await rag.VectorDB.get_docs_index(query=query, index=index) \ No newline at end of file diff --git a/ingestion_server/datachecks/__init__.py b/ingestion_server/datachecks/__init__.py deleted file mode 100644 index 7b0b27f..0000000 --- a/ingestion_server/datachecks/__init__.py +++ /dev/null @@ -1,49 +0,0 @@ -from pydantic import BaseModel -from typing import Union, Optional - -# Embedding config -class embeddings(BaseModel): - provider:str - embedding_model_name:Optional[str] = "" - -# DB Configs -class LanceDBConfig(BaseModel): - loc: Optional[str] = "" - -class MongoDBConfig(BaseModel): - uri: Optional[str] = "" - db: Optional[str] = "" - collection_name: Optional[str] = "" - -# Provider Configs -class ProviderConfig(BaseModel): - embedding_name:embeddings - chunk_size:int - overlapping:int - worker:int - similarity_top_k:int - rag: Union[LanceDBConfig,MongoDBConfig] - -# Rag Config -class RAGConfig(BaseModel): - provider:str - provider_config:ProviderConfig - -class Query(BaseModel): - provider:str - index:str - query:str - -# Utility checks for RAG -class RAGTaskStatus: - WAIT = "WAITING" - PROCESSING = "PROCESSING" - ERROR = "ERROR" - SUCESSFUL = "SUCESSFUL" - -class RAGTask(BaseModel): - file_loc:str - _status:str = RAGTaskStatus.WAIT - _message:str = "" - _index:str = "" - _nodes:list = [] diff --git a/ingestion_server/embeddings/__init__.py b/ingestion_server/embeddings/__init__.py deleted file mode 100644 index f1dc8ff..0000000 --- a/ingestion_server/embeddings/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Dict - -from ingestion_server.embeddings.openai_emded import OpenAI -from ingestion_server.embeddings.base import BaseEmbed - -# EmbedProviders dictionary with the key being the name of the provider and the value being the class of the provider -EmbedProviders : Dict[str, BaseEmbed] = { - "OpenAI": OpenAI -} \ No newline at end of file diff --git a/ingestion_server/embeddings/base.py b/ingestion_server/embeddings/base.py deleted file mode 100644 index 37b4dd9..0000000 --- a/ingestion_server/embeddings/base.py +++ /dev/null @@ -1,10 +0,0 @@ -from abc import ABC, abstractmethod - -class BaseEmbed(ABC): - def __init__(self, name: str) -> None: - self.name = name - - @abstractmethod - def get_embedding(self): - """Method to retrieve the embedding representation.""" - raise NotImplementedError("Subclasses must implement this method.") \ No newline at end of file diff --git a/ingestion_server/embeddings/openai_emded.py b/ingestion_server/embeddings/openai_emded.py deleted file mode 100644 index d6677d6..0000000 --- a/ingestion_server/embeddings/openai_emded.py +++ /dev/null @@ -1,31 +0,0 @@ -from ingestion_server.embeddings.base import BaseEmbed -from llama_index.embeddings.openai import OpenAIEmbedding -import dotenv -import os - -dotenv.load_dotenv() - -class OpenAI(BaseEmbed): - def __init__(self, model: str) -> None: - """Initialize the OpenAI embedding provider. - - Args: - model (str): The model name to be used for embeddings. - - Raises: - ValueError: If the OpenAI API key is not found in the environment variables. - """ - super().__init__("OpenAI") - self.model = model - api_key = os.getenv("OPENAI_API_KEY") - if api_key is None: - raise ValueError("OPENAI KEY IS NOT FOUND") - self.embedding_instance = OpenAIEmbedding(model=model, api_key=api_key) - - def get_embedding(self): - """Retrieve the embedding instance. - - Returns: - OpenAIEmbedding: The instance of the OpenAI embedding. - """ - return self.embedding_instance \ No newline at end of file diff --git a/ingestion_server/main.py b/ingestion_server/main.py deleted file mode 100644 index d78deb7..0000000 --- a/ingestion_server/main.py +++ /dev/null @@ -1,119 +0,0 @@ -from fastapi import FastAPI, UploadFile -from ingestion_server.datachecks import RAGConfig -from ingestion_server import RAGFactory -import uvicorn -import time -from typing import Dict -import dotenv - -dotenv.load_dotenv() -DB: Dict[str, RAGConfig] = {} - -rag_factory = RAGFactory() -app = FastAPI() - - -@app.get("/") -def heartbeat() -> float: - """Health check endpoint that returns the current server time.""" - return time.time() - - -@app.post("/create-rag") -def create_rag(request: RAGConfig) -> Dict[str, str]: - """Create a RAG configuration and return its ID. - - Args: - request (RAGConfig): The RAG configuration to create. - - Returns: - Dict[str, str]: A dictionary containing the created RAG ID. - """ - print(request) - rag_id = rag_factory.make_rag(request) - return {"rag_id": rag_id} - - -@app.post("/rag-upload-file/{rag_id}") -async def rag_upload_file(file: UploadFile, rag_id: str) -> Dict[str, str]: - """Upload a file for a specific RAG ID. - - Args: - file (UploadFile): The file to upload. - rag_id (str): The ID of the RAG to associate with the file. - - Returns: - Dict[str, str]: A dictionary containing the upload status and index. - """ - try: - task = await rag_factory.file_ingest(rag_name=rag_id, file=file) - return {"index": task._index, "status": task._status, "message": "DONE"} - except Exception as e: - return {"index": None, "status": "ERROR", "message": f"{e}"} - - -@app.get("/rag-retrive/{rag_id}/{index}") -async def rag_retrive(query: str, rag_id: str, index: str) -> list: - """Retrieve documents based on a query for a specific RAG ID and index. - - Args: - query (str): The query string to search for. - rag_id (str): The ID of the RAG to search in. - index (str): The index to search in. - - Returns: - list: A list of documents matching the query. - """ - docs = await rag_factory.retrive_query(rag_name=rag_id, index=index, query=query) - send_filter = [{"text": node.text, "score": node.score} for node in docs] - return send_filter - -@app.post("/make-rag") -async def make_rag( - file: UploadFile = File(...), - config: str = Form(...) -) -> Dict[str, Any]: - """ - Create a RAG configuration, return its ID, and ingest the uploaded file. - - Args: - file (UploadFile): The file to upload and ingest. - config (str): The RAG configuration as a JSON string. - - Returns: - Dict[str, Any]: A dictionary containing the created RAG ID, upload status, and index. - """ - try: - # Parse the JSON string into a RAGConfig object - rag_config = RAGConfig.parse_raw(config) - - # Create RAG configuration - rag_id = rag_factory.make_rag(rag_config) - - # Ingest the file - task = await rag_factory.file_ingest(rag_name=rag_id, file=file) - - return { - "rag_id": rag_id, - "index": task._index, - "status": task._status, - "message": "RAG created and file ingested successfully" - } - except json.JSONDecodeError: - return { - "rag_id": None, - "index": None, - "status": "ERROR", - "message": "Invalid JSON in config parameter" - } - except Exception as e: - return { - "rag_id": None, - "index": None, - "status": "ERROR", - "message": f"Error creating RAG or ingesting file: {str(e)}" - } - - -if __name__ == "__main__": - uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True) \ No newline at end of file diff --git a/ingestion_server/rags/__init__.py b/ingestion_server/rags/__init__.py deleted file mode 100644 index d237f53..0000000 --- a/ingestion_server/rags/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from ingestion_server.rags.lancedb_rag import LanceDB -from ingestion_server.rags.mongoDB_rag import MongoDB -from ingestion_server.rags.base import BaseRAG - -from typing import Dict - -RAGProviders : Dict[str, BaseRAG] = { - "LanceDB": LanceDB, - "MongoDB": MongoDB -} \ No newline at end of file diff --git a/ingestion_server/rags/base.py b/ingestion_server/rags/base.py deleted file mode 100644 index 4a07ee7..0000000 --- a/ingestion_server/rags/base.py +++ /dev/null @@ -1,156 +0,0 @@ -import os -import dotenv -from uuid import uuid4 - -from llama_parse import LlamaParse -from llama_index.core.node_parser import ( - MarkdownElementNodeParser, - SentenceSplitter, - TextSplitter -) -from llama_index.llms.openai import OpenAI -from llama_index.core import Settings - -from ingestion_server.embeddings import BaseEmbed -from ingestion_server.utils import configure_logger - - -dotenv.load_dotenv() - -logger = configure_logger(__name__) - -class BaseRAG: - """ - Base class for Retrieval-Augmented Generation (RAG) systems. - - Attributes: - provider (str): The provider for the RAG system. - base_embed (BaseEmbed): The embedding model used. - embeding_model: The actual embedding model instance. - chunk_size (int): Size of the chunks for splitting documents. - overlapping (int): Overlap size for chunking. - LLAMA_CLOUD (str): API key for Llama Cloud. - parse (LlamaParse): Instance of LlamaParse for data parsing. - OPENAI_API_KEY (str): API key for OpenAI. - llm (OpenAI): Instance of OpenAI model. - """ - - def __init__(self, provider: str, embedding_name: BaseEmbed, chunk_size: int, overlapping: int) -> None: - """ - Initializes the BaseRAG instance with the specified parameters. - - Args: - provider (str): The provider for the RAG system. - embedding_name (BaseEmbed): The embedding model used. - chunk_size (int): Size of the chunks for splitting documents. - overlapping (int): Overlap size for chunking. - - Raises: - ValueError: If required environment variables are not set. - """ - self.provider = provider - self.base_embed: BaseEmbed = embedding_name - self.embeding_model = self.base_embed.get_embedding() - self.chunk_size = chunk_size - self.overlapping = overlapping - self.LLAMA_CLOUD = os.getenv("LLAMA_CLOUD_API_KEY") - - if self.LLAMA_CLOUD is None: - raise ValueError("LLAMA_CLOUD_API_KEY is not set in .env") - - self.parse = LlamaParse(api_key=self.LLAMA_CLOUD, result_type="markdown") - - self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") - - if self.OPENAI_API_KEY is None: - raise ValueError("OPENAI_API_KEY is not set in .env") - - self.llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2, api_key=self.OPENAI_API_KEY) - - Settings.embed_model = self.embeding_model - Settings.llm = self.llm - - def generate_index_name(self) -> str: - """Generates a unique index name using UUID. - - Returns: - str: A unique index name. - """ - return str(uuid4()) - - async def generate_nodes_sentence_splitter(self, file_loc: str): - """Generates nodes using a sentence splitter. - - Args: - file_loc (str): The file location to load data from. - - Returns: - nodes: The generated nodes after processing. - """ - docs = await self.parse.aload_data(file_path=file_loc) - node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, _ = node_parser.get_nodes_and_objects(nodes) - nodes = await SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) - return nodes - - async def generate_nodes_text_splitter(self, file_loc: str): - """Generates nodes using a text splitter. - - Args: - file_loc (str): The file location to load data from. - - Returns: - nodes: The generated nodes after processing. - """ - docs = await self.parse.aload_data(file_path=file_loc) - node_parser = MarkdownElementNodeParser(num_workers=8, llm=self.llm) - nodes = await node_parser.aget_nodes_from_documents(docs) - nodes, _ = node_parser.get_nodes_and_objects(nodes) - nodes = await TextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.overlapping).aget_nodes_from_documents(nodes) - return nodes - - async def append_index(self, nodes) -> str: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def add_index(self, nodes) -> str: - """Adds nodes to the index. - - Args: - nodes: The nodes to add. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def delete_index(self, index: str) -> bool: - """Deletes an index. - - Args: - index (str): The index to delete. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Raises: - NotImplementedError: This method should be implemented in subclasses. - """ - raise NotImplementedError \ No newline at end of file diff --git a/ingestion_server/rags/lancedb_rag.py b/ingestion_server/rags/lancedb_rag.py deleted file mode 100644 index ecf8507..0000000 --- a/ingestion_server/rags/lancedb_rag.py +++ /dev/null @@ -1,90 +0,0 @@ -from typing import Any, Coroutine -from ingestion_server.embeddings.base import BaseEmbed -from ingestion_server.rags.base import BaseRAG -from llama_index.vector_stores.lancedb import LanceDBVectorStore -from llama_index.core import VectorStoreIndex, StorageContext -from ingestion_server.datachecks import ProviderConfig, LanceDBConfig -from llama_index.core.retrievers import VectorIndexRetriever - -class LanceDB(BaseRAG): - """ - LanceDB class for managing vector storage and retrieval using LanceDB. - - Attributes: - similarity_top_k (int): Number of top similar items to retrieve. - config (LanceDBConfig): Configuration for LanceDB. - loc (str): Location for the vector database. - path (str): Path to the vector database data. - """ - - def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: - """ - Initializes the LanceDB instance. - - Args: - embedding_name (BaseEmbed): The embedding model used. - config (ProviderConfig): Configuration for the provider. - """ - super().__init__("LanceDB", embedding_name, config.chunk_size, config.overlapping) - self.similarity_top_k = config.similarity_top_k - self.config: LanceDBConfig = config.rag - self.loc = self.config.loc - self.path = f"vectordb_data/{self.loc}" - - async def append_index(self, nodes) -> Coroutine[Any, Any, str]: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Returns: - Coroutine: A coroutine that returns None. - """ - return None - - async def add_index(self, nodes) -> str: - """Adds nodes to the index and creates a new table. - - Args: - nodes: The nodes to add. - - Returns: - str: The name of the created table. - """ - table_name = self.generate_index_name() - # TODO: add reranking in the DB - vector_store = LanceDBVectorStore(self.path, table_name=table_name) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=nodes, storage_context=storage_context, embed_model=self.embeding_model) - return table_name - - async def delete_index(self, index: str) -> bool: - """Deletes an index. - - Args: - index (str): The index to delete. - - Returns: - bool: Result of the deletion operation. - """ - return await super().delete_index(index) - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Returns: - Retrieved documents based on the query. - """ - vector_store = LanceDBVectorStore(uri=self.path, table_name=index) - storage_context = StorageContext.from_defaults(vector_store=vector_store) - vector_index = VectorStoreIndex(nodes=[], storage_context=storage_context) - query_engine = VectorIndexRetriever(vector_index, similarity_top_k=self.similarity_top_k) - return query_engine.retrieve(query) - - # query_engine = vector_index.as_query_engine(llm=self.llm) - - diff --git a/ingestion_server/rags/mongoDB_rag.py b/ingestion_server/rags/mongoDB_rag.py deleted file mode 100644 index 8d55c21..0000000 --- a/ingestion_server/rags/mongoDB_rag.py +++ /dev/null @@ -1,96 +0,0 @@ -from typing import Any, Coroutine -from pymongo import MongoClient -from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch - -from ingestion_server.embeddings.base import BaseEmbed -from ingestion_server.rags.base import BaseRAG -from ingestion_server.datachecks import ProviderConfig, MongoDBConfig - -from llama_index.core import VectorStoreIndex, StorageContext -from llama_index.core.retrievers import VectorIndexRetriever - -class MongoDB(BaseRAG): - """ - MongoDB class for managing vector storage and retrieval using MongoDB. - - Attributes: - similarity_top_k (int): Number of top similar items to retrieve. - config (MongoDBConfig): Configuration for MongoDB. - client (MongoClient): MongoDB client instance. - """ - - def __init__(self, embedding_name: BaseEmbed, config: ProviderConfig) -> None: - """ - Initializes the MongoDB instance. - - Args: - embedding_name (BaseEmbed): The embedding model used. - config (ProviderConfig): Configuration for the provider. - """ - super().__init__("MongoDB", embedding_name, config.chunk_size, config.overlapping) - self.similarity_top_k = config.similarity_top_k - self.config: MongoDBConfig = config.rag - self.client = MongoClient(self.config.uri) - - async def append_index(self, nodes) -> Coroutine[Any, Any, str]: - """Appends nodes to the existing index. - - Args: - nodes: The nodes to append. - - Returns: - Coroutine: A coroutine that calls the base class method. - """ - return await super().append_index(nodes) - - async def get_docs_index(self, query: str, index: str): - """Retrieves documents from the index based on a query. - - Args: - query (str): The query to search for. - index (str): The index to search in. - - Returns: - Retrieved documents based on the query. - """ - vector_store = MongoDBAtlasVectorSearch( - self.client, - db_name=self.config.db, - collection_name=self.config.collection_name, - vector_index_name=index - ) - vector_store_context = StorageContext.from_defaults(vector_store=vector_store) - vector_store_index = VectorStoreIndex(nodes=[], storage_context=vector_store_context) - vector_store_retriever = VectorIndexRetriever(index=vector_store_index, similarity_top_k=self.similarity_top_k) - return vector_store_retriever.retrieve(query) - - async def delete_index(self, index: str) -> Coroutine[Any, Any, bool]: - """Deletes an index. - - Args: - index (str): The index to delete. - - Returns: - Coroutine: A coroutine that calls the base class method. - """ - return await super().delete_index(index) - - async def add_index(self, nodes) -> str: - """Adds nodes to the index and creates a new index. - - Args: - nodes: The nodes to add. - - Returns: - str: The name of the created index. - """ - index = self.generate_index_name() - vector_store = MongoDBAtlasVectorSearch( - self.client, - db_name=self.config.db, - collection_name=self.config.collection_name, - index_name=index - ) - vector_store_context = StorageContext.from_defaults(vector_store=vector_store) - vector_store_index = VectorStoreIndex(nodes=nodes, storage_context=vector_store_context, embed_model=self.embeding_model) - return index \ No newline at end of file diff --git a/ingestion_server/utils/__init__.py b/ingestion_server/utils/__init__.py deleted file mode 100644 index 77bbf5d..0000000 --- a/ingestion_server/utils/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from ingestion_server.utils.log import configure_logger \ No newline at end of file diff --git a/ingestion_server/utils/log.py b/ingestion_server/utils/log.py deleted file mode 100644 index acc0815..0000000 --- a/ingestion_server/utils/log.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -VALID_LOGGING_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] - -def configure_logger(file_name, enabled=True, logging_level='INFO'): - """ - Configures a logger for the specified file. - - Parameters: - - file_name (str): The name of the file for which the logger is being configured. - - enabled (bool): Flag to enable or disable the logger. Default is True. - - logging_level (str): The logging level to set. Must be one of the valid levels. Default is 'INFO'. - - Returns: - - logger (logging.Logger): Configured logger instance. - """ - if logging_level not in VALID_LOGGING_LEVELS: - logging_level = "INFO" - - logging.basicConfig( - level=logging_level, - format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - logger = logging.getLogger(file_name) - - if not enabled: - logger.disabled = True - - return logger \ No newline at end of file diff --git a/main.py b/main.py index 388e20b..f105329 100644 --- a/main.py +++ b/main.py @@ -16,11 +16,12 @@ from uuid import uuid4 import tempfile import os +import json logger = configure_logger(__name__) -dotenv.load_dotenv() +dotenv.load_dotenv(".env") DB: Dict[str, RAGConfig] = {} @@ -199,34 +200,6 @@ def heartbeat() -> float: return time.time() -@app.post("/create-rag") -def create_rag(request: RAGConfig) -> Dict[str, str]: - """Create a RAG configuration and return its ID. - Args: - request (RAGConfig): The RAG configuration to create. - Returns: - Dict[str, str]: A dictionary containing the created RAG ID. - """ - print(request) - rag_id = rag_factory.make_rag(request) - return {"rag_id": rag_id} - - -@app.post("/rag-upload-file/{rag_id}") -async def rag_upload_file(file: UploadFile, rag_id: str) -> Dict[str, str]: - """Upload a file for a specific RAG ID. - Args: - file (UploadFile): The file to upload. - rag_id (str): The ID of the RAG to associate with the file. - Returns: - Dict[str, str]: A dictionary containing the upload status and index. - """ - try: - task = await rag_factory.file_ingest(rag_name=rag_id, file=file) - return {"index": task._index, "status": task._status, "message": "DONE"} - except Exception as e: - return {"index": None, "status": "ERROR", "message": f"{e}"} - @app.post("/make-rag") async def make_rag( file: UploadFile = File(...), @@ -244,7 +217,10 @@ async def make_rag( """ try: # Parse the JSON string into a RAGConfig object - rag_config = RAGConfig.parse_raw(config) + config = json.loads(config) + logger.info(f"Config : {config}") + rag_config = RAGConfig(**config) + logger.info(f"RAG Config : {rag_config}") # Create RAG configuration rag_id = rag_factory.make_rag(rag_config) diff --git a/rags/lancedb_rag.py b/rags/lancedb_rag.py index cec92de..ad80c00 100644 --- a/rags/lancedb_rag.py +++ b/rags/lancedb_rag.py @@ -85,5 +85,3 @@ async def get_docs_index(self, query: str, index: str): query_engine = VectorIndexRetriever(vector_index, similarity_top_k=self.similarity_top_k) return query_engine.retrieve(query) - # query_engine = vector_index.as_query_engine(llm=self.llm) - diff --git a/rags/mongoDB_rag.py b/rags/mongoDB_rag.py index 844626d..d939279 100644 --- a/rags/mongoDB_rag.py +++ b/rags/mongoDB_rag.py @@ -57,7 +57,7 @@ async def get_docs_index(self, query: str, index: str): self.client, db_name=self.config.db, collection_name=self.config.collection_name, - vector_index_name=index + vector_index_name=self.config.index ) vector_store_context = StorageContext.from_defaults(vector_store=vector_store) vector_store_index = VectorStoreIndex(nodes=[], storage_context=vector_store_context) @@ -84,13 +84,13 @@ async def add_index(self, nodes) -> str: Returns: str: The name of the created index. """ - index = self.generate_index_name() + # index = self.generate_index_name() vector_store = MongoDBAtlasVectorSearch( self.client, db_name=self.config.db, collection_name=self.config.collection_name, - index_name=index + vector_index_name=self.config.index ) vector_store_context = StorageContext.from_defaults(vector_store=vector_store) vector_store_index = VectorStoreIndex(nodes=nodes, storage_context=vector_store_context, embed_model=self.embeding_model) - return index \ No newline at end of file + return self.config.index \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c121965 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pylance==0.16.0 +pymongo==4.8.0 +python-multipart==0.0.6 +llama-index==0.10.65 +llama-index-vector-stores-lancedb==0.1.7 +llama-index-vector-stores-mongodb==0.1.8 +llama-index-llms-openai==0.1.29 \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py index 6f2c0c3..80c88a3 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -1 +1 @@ -from bolna.ingestion_server.utils.log import configure_logger \ No newline at end of file +from .log import configure_logger \ No newline at end of file diff --git a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/0-7d52c1d3-fecb-493d-b2ea-4bc0a09415ce.txn b/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/0-7d52c1d3-fecb-493d-b2ea-4bc0a09415ce.txn deleted file mode 100644 index eecff1f..0000000 --- a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/0-7d52c1d3-fecb-493d-b2ea-4bc0a09415ce.txn +++ /dev/null @@ -1,3 +0,0 @@ -$7d52c1d3-fecb-493d-b2ea-4bc0a09415ce²Ñid ÿÿÿÿÿÿÿÿÿ*string08!doc_id ÿÿÿÿÿÿÿÿÿ*string085vector ÿÿÿÿÿÿÿÿÿ*fixed_size_list:float:153608text ÿÿÿÿÿÿÿÿÿ*string08!metadata ÿÿÿÿÿÿÿÿÿ*struct0 _node_content *string08 -_node_type *string08doc_id *string08 document_id *string08 -ref_doc_id *string08 \ No newline at end of file diff --git a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/1-a5103a06-2d62-4754-8034-4b664e420161.txn b/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/1-a5103a06-2d62-4754-8034-4b664e420161.txn deleted file mode 100644 index 5e1deb6..0000000 Binary files a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/_transactions/1-a5103a06-2d62-4754-8034-4b664e420161.txn and /dev/null differ diff --git a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/data/b819d82e-ad4f-4e3d-954b-c85d55394c49.lance b/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/data/b819d82e-ad4f-4e3d-954b-c85d55394c49.lance deleted file mode 100644 index 1b4c628..0000000 Binary files a/vectordb_data/dev/b0d1b64a-32f8-47be-a0b2-3c614967c2a5.lance/data/b819d82e-ad4f-4e3d-954b-c85d55394c49.lance and /dev/null differ