Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update cached collection schema by updatetime #1297

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.index.IndexService;
import io.milvus.v2.service.vector.request.*;
import io.milvus.v2.service.vector.response.*;
import io.milvus.v2.utils.DataUtils;
Expand Down Expand Up @@ -72,10 +71,10 @@ private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusSe
* If insert/upsert get server error, remove the cached collection info.
*/
private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
String databaseName, String collectionName) {
String databaseName, String collectionName, boolean forceUpdate) {
String key = combineCacheKey(databaseName, collectionName);
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
if (info == null) {
if (info == null || forceUpdate) {
info = describeCollection(blockingStub, databaseName, collectionName);
cacheCollectionInfo.put(key, info);
}
Expand Down Expand Up @@ -110,14 +109,26 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle
}
}

private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
InsertRequest rpcRequest = requestBuilder.convertGrpcInsertRequest(request, descColl);
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
String title = String.format("InsertRequest collectionName:%s", request.getCollectionName());

// TODO: set the database name
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, descColl));
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
InsertRequest rpcRequest = buildInsertRequest(request, descResp);
MutationResult response = blockingStub.insert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
rpcRequest = buildInsertRequest(request, descResp);
response = blockingStub.insert(rpcRequest);
}

cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
Expand All @@ -137,14 +148,26 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
}
}

private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionResponse descResp) {
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
UpsertRequest rpcRequest = requestBuilder.convertGrpcUpsertRequest(request, descColl);
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName());

// TODO: set the database name
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, descColl));
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
UpsertRequest rpcRequest = buildUpsertRequest(request, descResp);
MutationResult response = blockingStub.upsert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
rpcRequest = buildUpsertRequest(request, descResp);
response = blockingStub.upsert(rpcRequest);
}

cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
Expand Down Expand Up @@ -235,9 +258,9 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
}

DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
if (request.getFilter() == null) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
}
DeleteRequest.Builder builder = DeleteRequest.newBuilder()
Expand Down
Loading