Skip to content

Commit

Permalink
Python: Introducing search for CosmosDB NoSQL Collections (#9698)
Browse files Browse the repository at this point in the history
### Motivation and Context

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->
Adds the search functions for Cosmos DB NoSQL.

Closes #6835 

### Description

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
Adds the search pieces and turns out the upsert cannot work cross
partition key with execute batch, so refactored to using gather(upsert).

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone 😄

---------

Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
  • Loading branch information
eavanvalkenburg and moonbox3 authored Nov 15, 2024
1 parent 90cf210 commit 76052b6
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 62 deletions.
3 changes: 1 addition & 2 deletions python/samples/concepts/memory/new_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class DataModelList:
collection_name = "test"
# Depending on the vector database, the index kind and distance function may need to be adjusted,
# since not all combinations are supported by all databases.
DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE_DISTANCE)
DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE_SIMILARITY)

# A list of VectorStoreRecordCollection that can be used.
# Available collections are:
Expand Down Expand Up @@ -159,7 +159,6 @@ class DataModelList:
),
"azure_cosmos_nosql": lambda: AzureCosmosDBNoSQLCollection(
data_model_type=DataModel,
database_name="sample_database",
collection_name=collection_name,
create_database=True,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ async def _does_database_exist(self) -> bool:
except CosmosResourceNotFoundError:
return False
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to check if database '{self.database_name}' exists.") from e
raise MemoryConnectorResourceNotFound(
f"Failed to check if database '{self.database_name}' exists, with message {e}"
) from e

async def _get_database_proxy(self, **kwargs) -> DatabaseProxy:
"""Gets the database proxy."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@
)
from semantic_kernel.connectors.memory.azure_cosmos_db.const import COSMOS_ITEM_ID_PROPERTY_NAME
from semantic_kernel.connectors.memory.azure_cosmos_db.utils import (
build_query_parameters,
create_default_indexing_policy,
create_default_vector_embedding_policy,
get_key,
get_partition_key,
)
from semantic_kernel.data.filter_clauses.any_tags_equal_to_filter_clause import AnyTagsEqualTo
from semantic_kernel.data.filter_clauses.equal_to_filter_clause import EqualTo
from semantic_kernel.data.kernel_search_results import KernelSearchResults
from semantic_kernel.data.record_definition.vector_store_model_definition import VectorStoreRecordDefinition
from semantic_kernel.data.vector_storage.vector_store_record_collection import VectorStoreRecordCollection
from semantic_kernel.data.record_definition.vector_store_record_fields import VectorStoreRecordDataField
from semantic_kernel.data.vector_search.vector_search import VectorSearchBase
from semantic_kernel.data.vector_search.vector_search_filter import VectorSearchFilter
from semantic_kernel.data.vector_search.vector_search_options import VectorSearchOptions
from semantic_kernel.data.vector_search.vector_search_result import VectorSearchResult
from semantic_kernel.data.vector_search.vector_text_search import VectorTextSearchMixin
from semantic_kernel.data.vector_search.vectorized_search import VectorizedSearchMixin
from semantic_kernel.exceptions.memory_connector_exceptions import (
MemoryConnectorException,
MemoryConnectorResourceNotFound,
Expand All @@ -41,7 +49,12 @@


@experimental_class
class AzureCosmosDBNoSQLCollection(AzureCosmosDBNoSQLBase, VectorStoreRecordCollection[TKey, TModel]):
class AzureCosmosDBNoSQLCollection(
AzureCosmosDBNoSQLBase,
VectorSearchBase[TKey, TModel],
VectorizedSearchMixin[TModel],
VectorTextSearchMixin[TModel],
):
"""An Azure Cosmos DB NoSQL collection stores documents in a Azure Cosmos DB NoSQL account."""

partition_key: PartitionKey
Expand Down Expand Up @@ -106,12 +119,10 @@ async def _inner_upsert(
records: Sequence[Any],
**kwargs: Any,
) -> Sequence[TKey]:
batch_operations = [("upsert", (record,)) for record in records]
partition_key = [record[self.partition_key.path.strip("/")] for record in records]
container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
try:
container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
results = await container_proxy.execute_item_batch(batch_operations, partition_key, **kwargs)
return [result["resourceBody"][COSMOS_ITEM_ID_PROPERTY_NAME] for result in results]
results = await asyncio.gather(*(container_proxy.upsert_item(record) for record in records))
return [result[COSMOS_ITEM_ID_PROPERTY_NAME] for result in results]
except CosmosResourceNotFoundError as e:
raise MemoryConnectorResourceNotFound(
"The collection does not exist yet. Create the collection first."
Expand All @@ -122,12 +133,15 @@ async def _inner_upsert(
@override
async def _inner_get(self, keys: Sequence[TKey], **kwargs: Any) -> OneOrMany[Any] | None:
include_vectors = kwargs.pop("include_vectors", False)
query, parameters = build_query_parameters(self.data_model_definition, keys, include_vectors)
query = (
f"SELECT {self._build_select_clause(include_vectors)} FROM c WHERE " # nosec: B608
f"c.id IN ({', '.join([f'@id{i}' for i in range(len(keys))])})" # nosec: B608
) # nosec: B608
parameters: list[dict[str, Any]] = [{"name": f"@id{i}", "value": get_key(key)} for i, key in enumerate(keys)]

container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
try:
container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
results = container_proxy.query_items(query=query, parameters=parameters)
return [item async for item in results]
return [item async for item in container_proxy.query_items(query=query, parameters=parameters)]
except CosmosResourceNotFoundError as e:
raise MemoryConnectorResourceNotFound(
"The collection does not exist yet. Create the collection first."
Expand All @@ -146,6 +160,95 @@ async def _inner_delete(self, keys: Sequence[TKey], **kwargs: Any) -> None:
if exceptions:
raise MemoryConnectorException("Failed to delete item(s).", exceptions)

@override
async def _inner_search(
self,
options: VectorSearchOptions,
search_text: str | None = None,
vectorizable_text: str | None = None,
vector: list[float | int] | None = None,
**kwargs: Any,
) -> KernelSearchResults[VectorSearchResult[TModel]]:
params = [{"name": "@top", "value": options.top}]
if search_text is not None:
query = self._build_search_text_query(options)
params.append({"name": "@search_text", "value": search_text})
elif vector is not None:
query = self._build_vector_query(options)
params.append({"name": "@vector", "value": vector})
else:
raise ValueError("Either search_text or vector must be provided.")
container_proxy = await self._get_container_proxy(self.collection_name, **kwargs)
try:
results = container_proxy.query_items(query, parameters=params)
except Exception as e:
raise MemoryConnectorException("Failed to search items.") from e
return KernelSearchResults(
results=self._get_vector_search_results_from_results(results, options),
total_count=None,
)

def _build_search_text_query(self, options: VectorSearchOptions) -> str:
where_clauses = self._build_where_clauses_from_filter(options.filter)
contains_clauses = " OR ".join(
f"CONTAINS(c.{field}, @search_text)"
for field in self.data_model_definition.fields
if isinstance(field, VectorStoreRecordDataField) and field.is_full_text_searchable
)
return (
f"SELECT TOP @top {self._build_select_clause(options.include_vectors)} " # nosec: B608
f"FROM c WHERE ({contains_clauses}) AND {where_clauses}" # nosec: B608
)

def _build_vector_query(self, options: VectorSearchOptions) -> str:
where_clauses = self._build_where_clauses_from_filter(options.filter)
if where_clauses:
where_clauses = f"WHERE {where_clauses}"
vector_field_name: str = self.data_model_definition.try_get_vector_field(options.vector_field_name).name # type: ignore
return (
f"SELECT TOP @top {self._build_select_clause(options.include_vectors)}," # nosec: B608
f" VectorDistance(c.{vector_field_name}, @vector) AS distance FROM c ORDER " # nosec: B608
f"BY VectorDistance(c.{vector_field_name}, @vector) {where_clauses}" # nosec: B608
)

def _build_select_clause(self, include_vectors: bool) -> str:
"""Create the select clause for a CosmosDB query."""
included_fields = [
field
for field in self.data_model_definition.field_names
if include_vectors or field not in self.data_model_definition.vector_field_names
]
if self.data_model_definition.key_field_name != COSMOS_ITEM_ID_PROPERTY_NAME:
# Replace the key field name with the Cosmos item id property name
included_fields = [
field if field != self.data_model_definition.key_field_name else COSMOS_ITEM_ID_PROPERTY_NAME
for field in included_fields
]

return ", ".join(f"c.{field}" for field in included_fields)

def _build_where_clauses_from_filter(self, filters: VectorSearchFilter | None) -> str:
if filters is None:
return ""
clauses = []
for filter in filters.filters:
match filter:
case EqualTo():
clauses.append(f"c.{filter.field_name} = {filter.value}")
case AnyTagsEqualTo():
clauses.append(f"{filter.value} IN c.{filter.field_name}")
case _:
raise ValueError(f"Unsupported filter: {filter}")
return " AND ".join(clauses)

@override
def _get_record_from_result(self, result: dict[str, Any]) -> dict[str, Any]:
return result

@override
def _get_score_from_result(self, result: dict[str, Any]) -> float | None:
return result.get("distance")

@override
def _serialize_dicts_to_store_models(self, records: Sequence[dict[str, Any]], **kwargs: Any) -> Sequence[Any]:
serialized_records = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class AzureCosmosDBNoSQLSettings(KernelBaseSettings):
"""Azure CosmosDB NoSQL settings.
The settings are first loaded from environment variables with
the prefix 'COSMOS_DB_NOSQL_'.
the prefix 'AZURE_COSMOS_DB_NO_SQL_'.
If the environment variables are not found, the settings can
be loaded from a .env file with the encoding 'utf-8'.
If the settings are not found in the .env file, the settings
Expand Down
37 changes: 0 additions & 37 deletions python/semantic_kernel/connectors/memory/azure_cosmos_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import contextlib
from collections.abc import Sequence
from typing import Any

from azure.cosmos.aio import CosmosClient
Expand All @@ -12,7 +11,6 @@
AzureCosmosDBNoSQLCompositeKey,
)
from semantic_kernel.connectors.memory.azure_cosmos_db.const import (
COSMOS_ITEM_ID_PROPERTY_NAME,
DATATYPES_MAPPING,
DISTANCE_FUNCTION_MAPPING,
INDEX_KIND_MAPPING,
Expand Down Expand Up @@ -174,41 +172,6 @@ def get_partition_key(key: str | AzureCosmosDBNoSQLCompositeKey) -> str:
return key


def build_query_parameters(
data_model_definition: VectorStoreRecordDefinition,
keys: Sequence[str | AzureCosmosDBNoSQLCompositeKey],
include_vectors: bool,
) -> tuple[str, list[dict[str, Any]]]:
"""Builds the query and parameters for the Azure Cosmos DB NoSQL query item operation.
Args:
data_model_definition (VectorStoreRecordDefinition): The definition of the data model.
keys (Sequence[str | AzureCosmosDBNoSQLCompositeKey]): The keys.
include_vectors (bool): Whether to include the vectors in the query.
Returns:
tuple[str, list[dict[str, str]]]: The query and parameters.
"""
included_fields = [
field
for field in data_model_definition.field_names
if include_vectors or field not in data_model_definition.vector_field_names
]
if data_model_definition.key_field_name != COSMOS_ITEM_ID_PROPERTY_NAME:
# Replace the key field name with the Cosmos item id property name
included_fields = [
field if field != data_model_definition.key_field_name else COSMOS_ITEM_ID_PROPERTY_NAME
for field in included_fields
]

select_clause = ", ".join(f"c.{field}" for field in included_fields)

return (
f"SELECT {select_clause} FROM c WHERE c.id IN ({', '.join([f'@id{i}' for i in range(len(keys))])})", # nosec: B608
[{"name": f"@id{i}", "value": get_key(key)} for i, key in enumerate(keys)],
)


class CosmosClientWrapper(CosmosClient):
"""Wrapper to make sure the CosmosClient is closed properly."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from semantic_kernel.connectors.memory.azure_cosmos_db.azure_cosmos_db_no_sql_collection import (
AzureCosmosDBNoSQLCollection,
)
from semantic_kernel.connectors.memory.azure_cosmos_db.const import COSMOS_ITEM_ID_PROPERTY_NAME
from semantic_kernel.connectors.memory.azure_cosmos_db.utils import (
COSMOS_ITEM_ID_PROPERTY_NAME,
CosmosClientWrapper,
create_default_indexing_policy,
create_default_vector_embedding_policy,
Expand Down Expand Up @@ -397,13 +397,11 @@ async def test_azure_cosmos_db_no_sql_upsert(

vector_collection._get_container_proxy = AsyncMock(return_value=mock_container_proxy)

mock_container_proxy.execute_item_batch = AsyncMock(
return_value=[{"resourceBody": {COSMOS_ITEM_ID_PROPERTY_NAME: item["id"]}}]
)
mock_container_proxy.upsert_item = AsyncMock(return_value={COSMOS_ITEM_ID_PROPERTY_NAME: item["id"]})

result = await vector_collection.upsert(item)

mock_container_proxy.execute_item_batch.assert_called_once_with([("upsert", (item,))], [item["id"]])
mock_container_proxy.upsert_item.assert_called_once_with(item)
assert result == item["id"]


Expand All @@ -426,13 +424,11 @@ async def test_azure_cosmos_db_no_sql_upsert_without_id(

vector_collection._get_container_proxy = AsyncMock(return_value=mock_container_proxy)

mock_container_proxy.execute_item_batch = AsyncMock(
return_value=[{"resourceBody": {COSMOS_ITEM_ID_PROPERTY_NAME: item["key"]}}]
)
mock_container_proxy.upsert_item = AsyncMock(return_value={COSMOS_ITEM_ID_PROPERTY_NAME: item["key"]})

result = await vector_collection.upsert(item)

mock_container_proxy.execute_item_batch.assert_called_once_with([("upsert", (item_with_id,))], [item["key"]])
mock_container_proxy.upsert_item.assert_called_once_with(item_with_id)
assert result == item["key"]


Expand Down

0 comments on commit 76052b6

Please sign in to comment.