Multi Datasource #579

Draft
wants to merge 4 commits into base: feature_multi_data_source
Changes from all commits
2 changes: 2 additions & 0 deletions pebblo/app/api/req_models.py
@@ -48,6 +48,7 @@ class ReqDiscover(BaseModel):
owner: str
description: Optional[str] = None
load_id: Optional[str] = None
run_id: Optional[str] = None
runtime: Runtime
framework: Framework
chains: Optional[List[ChainInfo]] = None
@@ -61,6 +62,7 @@ class ReqLoaderDoc(BaseModel):
docs: list[dict] = None
plugin_version: str
load_id: str
run_id: Optional[str] = None
loader_details: dict
loading_end: bool
source_owner: str
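
Both request models gain an optional run_id, so payloads from clients that only send load_id keep validating. A minimal sketch of that behaviour, not part of this diff, with a simplified field set and illustrative values:

from typing import Optional
from pydantic import BaseModel

class DiscoverSketch(BaseModel):
    # Simplified stand-in for ReqDiscover; only the fields relevant here.
    owner: str
    load_id: Optional[str] = None
    run_id: Optional[str] = None  # new optional per-run identifier

legacy = DiscoverSketch(owner="data-team", load_id="load-123")
multi = DiscoverSketch(owner="data-team", load_id="load-123", run_id="run-001")
print(legacy.run_id, multi.run_id)  # None run-001
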
11 changes: 11 additions & 0 deletions pebblo/app/models/db_models.py
@@ -131,11 +131,14 @@ class LoaderMetadata(BaseModel):
sourceType: str
sourceSize: int
sourceFiles: Optional[list] = []
docEntities: Optional[dict] = {}
docTopics: Optional[dict] = {}
lastModified: Optional[str] = None


class AiDataLoader(AiBaseApp):
loaders: Optional[List[LoaderMetadata]] = []
runId: Optional[str] = None
# documents: Optional[List[UUID]] = [] # list of doc ids, TODO: need confirmation
docEntities: Optional[Dict] = {}
docTopics: Optional[Dict] = {}
@@ -145,16 +148,19 @@ class AiDataLoader(AiBaseApp):

class AiDataSource(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
runId: Optional[str] = None
appName: str
loadId: str
metadata: Metadata
sourcePath: str
sourceSize: int
sourceType: str
loader: str


class AiDocument(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
runId: Optional[str] = None
appName: str
loadId: str
dataSourceId: str
@@ -171,9 +177,11 @@ class AiDocument(BaseModel):

class AiSnippet(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
runId: Optional[str] = None
appName: str
loadId: str
dataSourceId: str
dataSourceName: str
documentId: str
metadata: Metadata
doc: Optional[str] = None
@@ -223,6 +231,7 @@ class DataSource(BaseModel):
class TopFindings(BaseModel):
fileName: str
fileOwner: str
dataSource: Optional[str] = None
sourceSize: int
findingsEntities: int
findingsTopics: int
@@ -255,9 +264,11 @@ class ReportModel(BaseModel):
framework: Optional[FrameworkInfo] = Field(default_factory=FrameworkInfo)
reportSummary: Optional[Summary] = None
loadHistory: Optional[dict] = None
findings: Optional[list] = None
topFindings: Optional[List[TopFindings]] = None
instanceDetails: Optional[InstanceDetails] = None
dataSources: Optional[List[DataSource]] = None
pebbloServerVersion: Optional[str] = None
pebbloClientVersion: Optional[str] = None
clientVersion: Optional[dict] = None
snippets: Optional[dict] = {}
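
With docEntities and docTopics now living on each LoaderMetadata entry, and runId threaded through AiDataLoader, AiDataSource, AiDocument and AiSnippet, findings can be attributed per data source instead of only per app. A rough sketch of the per-loader shape these fields are expected to hold; the nested count/snippet-id structure is an assumption based on how the snippet handler updates them, and all values are illustrative:

loader_metadata_example = {
    "name": "PyPDFLoader",            # hypothetical loader name
    "sourcePath": "/data/reports",    # hypothetical source path
    "sourceType": "dir",
    "sourceSize": 2048,
    # Per-loader findings, keyed by entity/topic label (assumed shape).
    "docEntities": {"us-ssn": {"count": 3, "snippetIds": ["snip-1", "snip-7"]}},
    "docTopics": {"medical-advice": {"count": 1, "snippetIds": ["snip-2"]}},
    "lastModified": None,
}
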
7 changes: 6 additions & 1 deletion pebblo/app/service/discovery/common.py
@@ -16,7 +16,11 @@ def get_or_create_app(db, app_name, app_class, data, app_type):
ai_app = {"name": app_name}

if app_type == ApplicationTypes.LOADER.value:
ai_app["id"] = data["load_id"]
if "run_id" in data.keys():
ai_app["runId"] = data["run_id"]
# ai_app["id"] = data["load_id"]
else:
ai_app["id"] = data["load_id"]
elif app_type == ApplicationTypes.RETRIEVAL.value:
pass

@@ -30,6 +34,7 @@ def get_or_create_app(db, app_name, app_class, data, app_type):
)

# Inserting app details
ai_app["id"] = data["load_id"]
response, app_object = db.insert_data(app_class, ai_app)

if response:
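
The app record built in get_or_create_app now carries runId when the client sends one, while the record id is still set from load_id just before insertion. A condensed sketch of that flow, using a hypothetical helper rather than the real function:

def build_loader_app_record(data: dict, app_name: str) -> dict:
    ai_app = {"name": app_name}
    if "run_id" in data:
        # Multi-datasource runs carry a runId on the app record.
        ai_app["runId"] = data["run_id"]
    else:
        # Legacy single-load behaviour keys the record by load_id.
        ai_app["id"] = data["load_id"]
    # Per the diff, the id is set from load_id in both cases before insert_data.
    ai_app["id"] = data["load_id"]
    return ai_app

print(build_loader_app_record({"load_id": "load-123", "run_id": "run-001"}, "acme-app"))
# -> {'name': 'acme-app', 'runId': 'run-001', 'id': 'load-123'}
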
3 changes: 2 additions & 1 deletion pebblo/app/service/discovery/discovery_service.py
@@ -106,7 +106,8 @@ def _get_app_type_and_class(self):
AppClass = None
app_type = None
load_id = self.data.get("load_id") or None
if load_id:
run_id = self.data.get("run_id") or None
if load_id or run_id:
AppClass = AiDataLoaderTable
app_type = ApplicationTypes.LOADER.value
else:
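
_get_app_type_and_class now treats a payload as a loader app when it carries either identifier. A small sketch of the widened check, with a hypothetical helper standing in for the method:

def resolve_app_type(data: dict) -> str:
    load_id = data.get("load_id") or None
    run_id = data.get("run_id") or None
    if load_id or run_id:
        return "loader"     # maps to AiDataLoaderTable / ApplicationTypes.LOADER
    return "retrieval"      # falls through to the retrieval app handling

print(resolve_app_type({"run_id": "run-001"}))  # loader
print(resolve_app_type({"prompt": {}}))         # retrieval
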
22 changes: 16 additions & 6 deletions pebblo/app/service/loader/document/document.py
@@ -1,4 +1,3 @@
from pebblo.app.enums.enums import ApplicationTypes
from pebblo.app.models.db_models import AiDocument
from pebblo.app.models.sqltables import AiDocumentTable
from pebblo.app.service.loader.snippet.snippet import AiSnippetHandler
@@ -20,9 +19,14 @@ def _get_or_create_document(self, doc: dict, data_source: dict) -> AiDocumentTab
logger.debug("Create or update AIDocument")
filter_query = {
"appName": self.app_name,
"loadId": self.data.get("load_id"),
"sourcePath": doc.get("source_path"),
}

if "run_id" in self.data.keys():
filter_query["runId"] = self.data.get("run_id")
else:
filter_query["loadId"] = self.data.get("load_id")

status, output = self.db.query(AiDocumentTable, filter_query)
if output and len(output) > 0:
data = output[0].data
@@ -47,6 +51,9 @@ def _get_or_create_document(self, doc: dict, data_source: dict) -> AiDocumentTab
"userIdentities": doc.get("authorized_identities", []),
"lastIngested": get_current_time(),
}
if "run_id" in self.data.keys():
ai_documents["runId"] = self.data.get("run_id")

ai_document_obj = AiDocument(**ai_documents)
ai_document_data = ai_document_obj.model_dump()

@@ -139,9 +146,7 @@ def _update_document(document: dict, snippet: dict) -> dict:
return document

@timeit
def create_or_update_document(
self, app_loader_details: ApplicationTypes.LOADER.value, data_source: dict
):
def create_or_update_document(self, app_loader_details: dict, data_source: dict):
logger.debug("Create or update document snippet")
input_doc_list = self.data.get("docs", [])
for doc in input_doc_list:
@@ -154,7 +159,12 @@ def create_or_update_document(
app_loader_details = self._update_loader_documents(
app_loader_details, existing_document
)
app_loader_details = self.snippet_handler.update_loader_with_snippet(

app_loader_details = self.snippet_handler.update_loader_doc_findings(
app_loader_details, snippet
)

app_loader_details = self.snippet_handler.update_app_doc_findings(
app_loader_details, snippet
)

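
Documents are now looked up per run when a run_id is present, falling back to the old per-load key otherwise, and the run_id is stamped onto newly created AiDocument records. A sketch of the filter construction, using a hypothetical helper with the same key names as the diff:

def build_document_filter(data: dict, app_name: str, source_path: str) -> dict:
    filter_query = {"appName": app_name, "sourcePath": source_path}
    if "run_id" in data:
        filter_query["runId"] = data["run_id"]
    else:
        filter_query["loadId"] = data["load_id"]
    return filter_query

print(build_document_filter({"run_id": "run-001"}, "acme-app", "/data/reports/a.pdf"))
# -> {'appName': 'acme-app', 'sourcePath': '/data/reports/a.pdf', 'runId': 'run-001'}
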
18 changes: 16 additions & 2 deletions pebblo/app/service/loader/loader_doc_service.py
@@ -147,7 +147,11 @@ def _update_loader_details(self, app_loader_details):
loader_exist = False
for loader in loader_list:
# If loader exist, update loader SourcePath and SourceType
if loader and loader.get("name", "") == loader_name:
if (
loader
and loader.get("name", "") == loader_name
and loader["sourcePath"] == source_path
):
loader["sourcePath"] = source_path
loader["sourceType"] = source_type
loader["sourceSize"] = source_size
@@ -258,8 +262,13 @@ def _get_or_create_data_source(self):
"appName": self.app_name,
"sourcePath": loader_details.get("source_path"),
"sourceType": loader_details.get("source_type"),
"loadId": self.data.get("load_id"),
}

if "run_id" in self.data.keys():
filter_query["runId"] = self.data.get("run_id")
else:
filter_query["loadId"] = self.data.get("load_id")

status, output = self.db.query(AiDataSourceTable, filter_query)
if status and output and len(output) > 0:
logger.debug("Data Source details are already existed.")
@@ -275,9 +284,14 @@ def _get_or_create_data_source(self):
"modifiedAt": get_current_time(),
},
"sourcePath": loader_details.get("source_path"),
"sourceSize": loader_details.get("source_aggregate_size", 0),
"sourceType": loader_details.get("source_type"),
"loader": loader_details.get("loader"),
}

if "run_id" in self.data.keys():
data_source["runId"] = self.data.get("run_id")

ai_data_source_obj = AiDataSource(**data_source)
ai_data_source = ai_data_source_obj.model_dump()
_, data_source_obj = self.db.insert_data(AiDataSourceTable, ai_data_source)
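
Because one app can now ingest several data sources, a loader entry is only treated as existing when both its name and its sourcePath match (presumably a new entry is appended otherwise), and the data-source lookup follows the same run_id/load_id split as documents. A sketch of the tightened loader match, with a hypothetical helper:

def find_matching_loader(loader_list: list, loader_name: str, source_path: str):
    # Return the loader entry whose name AND sourcePath both match, else None.
    for loader in loader_list:
        if (
            loader
            and loader.get("name", "") == loader_name
            and loader.get("sourcePath") == source_path
        ):
            return loader
    return None

loaders = [{"name": "PyPDFLoader", "sourcePath": "/data/reports"}]
print(find_matching_loader(loaders, "PyPDFLoader", "/data/invoices"))  # None
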
38 changes: 36 additions & 2 deletions pebblo/app/service/loader/snippet/snippet.py
@@ -33,8 +33,37 @@ def _count_and_update_entities_topics(
logger.debug("Counting entities and topics finished.")
return restricted_data

def update_loader_with_snippet(self, app_loader_details, snippet):
# Update doc entities & topics details from snippets
def update_loader_doc_findings(self, app_loader_details, snippet):
# Update doc entities & topics for individual loader
# Fetching entities and topics
for loader in app_loader_details.get("loaders", []):
if loader.get("sourcePath") == snippet.get("sourcePath") and loader.get(
"name"
) == snippet.get("dataSourceName"):
entities_data = loader.get("docEntities", {})
topics_data = loader.get("docTopics", {})

if snippet.get("entities"):
# If entities exist in snippet
entities_data = self._count_and_update_entities_topics(
entities_data, snippet.get("entities"), snippet.get("id")
)
if snippet.get("topics"):
# If entities exist in snippet
topics_data = self._count_and_update_entities_topics(
topics_data, snippet.get("topics"), snippet.get("id")
)

loader["docEntities"] = entities_data
loader["docTopics"] = topics_data
logger.debug(f"Loader Doc entities: {loader}")
break

return app_loader_details

# def update_loader_with_snippet(self, app_loader_details, snippet):
def update_app_doc_findings(self, app_loader_details, snippet):
# Update doc entities & topics for app
# Fetching entities and topics
entities_data = app_loader_details.get("docEntities", {})
topics_data = app_loader_details.get("docTopics", {})
@@ -68,12 +97,17 @@ def create_snippet(self, doc, data_source, document):
"doc": doc.get("doc"),
# 'checksum': checksum,
"sourcePath": doc.get("source_path"),
"dataSourceName": data_source.get("loader"),
"loaderSourcePath": data_source.get("sourcePath"),
"entities": doc.get("entities", {}),
"topics": doc.get("topics", {}),
"entityDetails": doc.get("entity_details", {}),
"topicDetails": doc.get("topic_details", {}),
}

if "run_id" in self.data.keys():
snippet_details["runId"] = self.data.get("run_id")

ai_snippet_obj = AiSnippet(**snippet_details)
ai_snippet = ai_snippet_obj.model_dump()
status, snippet_obj = self.db.insert_data(AiSnippetsTable, ai_snippet)
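
update_loader_with_snippet is effectively split in two: update_app_doc_findings keeps the old app-level aggregation, while the new update_loader_doc_findings attributes each snippet to the loader whose sourcePath and name match the snippet's sourcePath and dataSourceName. A condensed sketch of that per-loader aggregation; the nested count/snippet-id shape is an assumption, since _count_and_update_entities_topics is not shown in full:

def update_loader_doc_findings_sketch(app_loader_details: dict, snippet: dict) -> dict:
    for loader in app_loader_details.get("loaders", []):
        if loader.get("sourcePath") == snippet.get("sourcePath") and loader.get(
            "name"
        ) == snippet.get("dataSourceName"):
            # Bump this loader's entity and topic counters for the snippet.
            for field, findings in (("docEntities", snippet.get("entities") or {}),
                                    ("docTopics", snippet.get("topics") or {})):
                bucket = loader.setdefault(field, {})
                for label, count in findings.items():
                    entry = bucket.setdefault(label, {"count": 0, "snippetIds": []})
                    entry["count"] += count
                    entry["snippetIds"].append(snippet.get("id"))
            break
    return app_loader_details
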