Skip to content

Commit

Permalink
Add RefreshLoad/GetPartitionStats interface (#1243)
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <yihua.mo@zilliz.com>
  • Loading branch information
yhmo authored Dec 20, 2024
1 parent 01e43c8 commit 70135b4
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 20 deletions.
13 changes: 10 additions & 3 deletions examples/main/java/io/milvus/v2/BulkWriterExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ private void createCollection(String collectionName, CreateCollectionReq.Collect
} else {
milvusClient.createCollection(requestCreate);
}
// milvusClient.loadCollection(LoadCollectionReq.builder().collectionName(collectionName).build());

System.out.printf("Collection %s created%n", collectionName);
}

Expand Down Expand Up @@ -694,12 +694,19 @@ private void createIndex() {
.collectionName(ALL_TYPES_COLLECTION_NAME)
.indexParams(indexes)
.build());

milvusClient.loadCollection(LoadCollectionReq.builder()
.collectionName(ALL_TYPES_COLLECTION_NAME)
.build());
}

private void loadCollection() {
System.out.println("Loading Collection...");
System.out.println("Refresh load collection...");
checkMilvusClientIfExist();
milvusClient.loadCollection(LoadCollectionReq.builder()
// RefreshLoad is a new interface from v2.5.3,
// mainly used when there are new segments generated by bulkinsert request,
// force the new segments to be loaded into memory.
milvusClient.refreshLoad(RefreshLoadReq.builder()
.collectionName(ALL_TYPES_COLLECTION_NAME)
.build());
System.out.println("Collection row number: " + getCollectionRowCount());
Expand Down
39 changes: 30 additions & 9 deletions sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.milvus.v2.service.index.response.*;
import io.milvus.v2.service.partition.PartitionService;
import io.milvus.v2.service.partition.request.*;
import io.milvus.v2.service.partition.response.*;
import io.milvus.v2.service.rbac.RBACService;
import io.milvus.v2.service.rbac.request.*;
import io.milvus.v2.service.rbac.response.*;
Expand Down Expand Up @@ -306,7 +307,7 @@ public ListDatabasesResp listDatabases() {
}
/**
* Alter database with key value pair. (Available from Milvus v2.4.4)
* Deprecated, replaced by alterDatabaseProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterDatabaseProperties from v2.5.3, to keep consistence with other SDKs
* @param request alter database request
*/
@Deprecated
Expand All @@ -317,14 +318,14 @@ public void alterDatabase(AlterDatabaseReq request) {
.build());
}
/**
* Alter a database's properties (Available from Milvus v2.5.2)
* Alter a database's properties (Available from Milvus v2.5.3)
* @param request alter database properties request
*/
public void alterDatabaseProperties(AlterDatabasePropertiesReq request) {
retry(()-> databaseService.alterDatabaseProperties(this.getRpcStub(), request));
}
/**
* drop a database's properties (Available from Milvus v2.5.2)
* drop a database's properties (Available from Milvus v2.5.3)
* @param request alter database properties request
*/
public void dropDatabaseProperties(DropDatabasePropertiesReq request) {
Expand Down Expand Up @@ -374,7 +375,7 @@ public void dropCollection(DropCollectionReq request) {
}
/**
* Alter a collection in Milvus.
* Deprecated, replaced by alterCollectionProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterCollectionProperties from v2.5.3, to keep consistence with other SDKs
*
* @param request alter collection request
*/
Expand All @@ -387,15 +388,15 @@ public void alterCollection(AlterCollectionReq request) {
.build());
}
/**
* Alter a collection's properties (Available from Milvus v2.5.2).
* Alter a collection's properties (Available from Milvus v2.5.3).
*
* @param request alter collection properties request
*/
public void alterCollectionProperties(AlterCollectionPropertiesReq request) {
retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
}
/**
* drop a collection's properties (Available from Milvus v2.5.2)
* drop a collection's properties (Available from Milvus v2.5.3)
* @param request drop collection properties request
*/
public void dropCollectionProperties(DropCollectionPropertiesReq request) {
Expand Down Expand Up @@ -444,6 +445,16 @@ public void renameCollection(RenameCollectionReq request) {
public void loadCollection(LoadCollectionReq request) {
retry(()-> collectionService.loadCollection(this.getRpcStub(), request));
}
/**
* Refresh loads a collection. Mainly used when there are new segments generated by bulkinsert request.
* Force the new segments to be loaded into memory.
* Note: this interface will ignore the LoadCollectionReq.refresh flag
*
* @param request refresh load collection request
*/
public void refreshLoad(RefreshLoadReq request) {
retry(()-> collectionService.refreshLoad(this.getRpcStub(), request));
}
/**
* Releases a collection from memory in Milvus.
*
Expand Down Expand Up @@ -481,7 +492,7 @@ public void dropIndex(DropIndexReq request) {
}
/**
* Alter an index in Milvus.
* Deprecated, replaced by alterIndexProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterIndexProperties from v2.5.3, to keep consistence with other SDKs
*
* @param request alter index request
*/
Expand All @@ -495,15 +506,15 @@ public void alterIndex(AlterIndexReq request) {
.build());
}
/**
* Alter an index's properties (Available from Milvus v2.5.2)
* Alter an index's properties (Available from Milvus v2.5.3)
*
* @param request alter index request
*/
public void alterIndexProperties(AlterIndexPropertiesReq request) {
retry(()->indexService.alterIndexProperties(this.getRpcStub(), request));
}
/**
* drop an index's properties (Available from Milvus v2.5.2)
* drop an index's properties (Available from Milvus v2.5.3)
* @param request drop index properties request
*/
public void dropIndexProperties(DropIndexPropertiesReq request) {
Expand Down Expand Up @@ -654,6 +665,16 @@ public List<String> listPartitions(ListPartitionsReq request) {
return retry(()->partitionService.listPartitions(this.getRpcStub(), request));
}

/**
* get a partition stats in Milvus.
*
* @param request get partition stats request
* @return GetPartitionStatsResp
*/
public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) {
return retry(()-> partitionService.getPartitionStats(this.getRpcStub(), request));
}

/**
* Loads partitions in a collection in Milvus.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,22 @@ public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingS
Status status = blockingStub.loadCollection(loadCollectionRequest);
rpcUtils.handleResponse(title, status);
if (request.getAsync()) {
WaitForLoadCollection(blockingStub, request);
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
}

return null;
}

public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) {
String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName());
LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
.setCollectionName(request.getCollectionName())
.setRefresh(true)
.build();
Status status = blockingStub.loadCollection(loadCollectionRequest);
rpcUtils.handleResponse(title, status);
if (request.getAsync()) {
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
}

return null;
Expand Down Expand Up @@ -339,16 +354,17 @@ public void waitForCollectionRelease(MilvusServiceGrpc.MilvusServiceBlockingStub
}
}

private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) {
private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
String collectionName, long timeoutMs) {
boolean isLoaded = false;
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)

while (!isLoaded) {
// Call the getLoadState method
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(request.getCollectionName()).build());
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
if (!isLoaded) {
// Check if timeout is exceeded
if (System.currentTimeMillis() - startTime > request.getTimeout()) {
if (System.currentTimeMillis() - startTime > timeoutMs) {
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
}
// Wait for a certain period before checking again
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.collection.request;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class RefreshLoadReq {
private String collectionName;
@Builder.Default
private Boolean async = Boolean.TRUE;
@Builder.Default
private Long timeout = 60000L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package io.milvus.v2.service.partition;

import io.milvus.grpc.CreatePartitionRequest;
import io.milvus.grpc.MilvusServiceGrpc;
import io.milvus.grpc.Status;
import io.milvus.grpc.*;
import io.milvus.v2.service.BaseService;
import io.milvus.v2.service.partition.request.*;
import io.milvus.v2.service.partition.response.*;

import java.util.List;

Expand Down Expand Up @@ -79,6 +78,21 @@ public List<String> listPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub b
return showPartitionsResponse.getPartitionNamesList();
}

public GetPartitionStatsResp getPartitionStats(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, GetPartitionStatsReq request) {
String title = String.format("GetCollectionStatisticsRequest collectionName:%s", request.getCollectionName());
GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
.setCollectionName(request.getCollectionName())
.setPartitionName(request.getPartitionName())
.build();
GetPartitionStatisticsResponse response = blockingStub.getPartitionStatistics(getPartitionStatisticsRequest);

rpcUtils.handleResponse(title, response.getStatus());
GetPartitionStatsResp getPartitionStatsResp = GetPartitionStatsResp.builder()
.numOfEntities(response.getStatsList().stream().filter(stat -> stat.getKey().equals("row_count")).map(stat -> Long.parseLong(stat.getValue())).findFirst().get())
.build();
return getPartitionStatsResp;
}

public Void loadPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadPartitionsReq request) {
String title = String.format("Load partitions %s in collection %s", request.getPartitionNames(), request.getCollectionName());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.partition.request;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class GetPartitionStatsReq {
private String collectionName;
private String partitionName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package io.milvus.v2.service.partition.response;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class GetPartitionStatsResp {
private Long numOfEntities;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import io.milvus.grpc.*;
import io.milvus.v2.common.CompactionState;
import io.milvus.v2.exception.ErrorCode;
import io.milvus.v2.exception.MilvusClientException;
import io.milvus.v2.service.BaseService;
import io.milvus.v2.service.utility.request.*;
import io.milvus.v2.service.utility.response.*;
Expand All @@ -32,7 +34,8 @@ public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
List<String> collectionNames = request.getCollectionNames();
String title = String.format("Flush collections %s", collectionNames);
if (collectionNames.isEmpty()) {
return null; // maybe do flushAll in future
// consistent with python sdk behavior, throw an error if collection names list is null or empty
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Collection name list can not be null or empty");
}

FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
Expand Down

0 comments on commit 70135b4

Please sign in to comment.