diff --git a/examples/example_bulkwriter.py b/examples/example_bulkwriter.py
index 992fe7f72..4cd504288 100644
--- a/examples/example_bulkwriter.py
+++ b/examples/example_bulkwriter.py
@@ -20,6 +20,9 @@
 import tensorflow as tf
 
 import logging
+
+from typing import List
+
 logging.basicConfig(level=logging.INFO)
 
 from pymilvus import (
@@ -273,7 +276,7 @@ def _append_row(writer: LocalBulkWriter, begin: int, end: int):
     print("Data is correct")
 
 
-def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
+def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)-> List[List[str]]:
     print(f"\n===================== all field types ({file_type.name}) ====================")
     with RemoteBulkWriter(
         schema=schema,
@@ -347,31 +350,47 @@ def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
         return remote_writer.batch_files
 
 
-def call_bulkinsert(schema: CollectionSchema, batch_files: list):
-    print(f"\n===================== call bulkinsert ====================")
+def call_bulkinsert(schema: CollectionSchema, batch_files: List[List[str]]):
     if utility.has_collection(ALL_TYPES_COLLECTION_NAME):
         utility.drop_collection(ALL_TYPES_COLLECTION_NAME)
 
     collection = Collection(name=ALL_TYPES_COLLECTION_NAME, schema=schema)
     print(f"Collection '{collection.name}' created")
 
-    task_ids = []
-    for files in batch_files:
-        task_id = utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
-        task_ids.append(task_id)
-        print(f"Create a bulkinert task, task id: {task_id}")
+    url = f"http://{HOST}:{PORT}"
 
-    while len(task_ids) > 0:
-        print("Wait 1 second to check bulkinsert tasks state...")
+    print(f"\n===================== import files to milvus ====================")
+    resp = bulk_import(
+        url=url,
+        collection_name=ALL_TYPES_COLLECTION_NAME,
+        files=batch_files,
+    )
+    print(resp.json())
+    job_id = resp.json()['data']['jobId']
+    print(f"Create a bulkinsert job, job id: {job_id}")
+
+    while True:
+        print("Wait 1 second to check bulkinsert job state...")
         time.sleep(1)
-        for id in task_ids:
-            state = utility.get_bulk_insert_state(task_id=id)
-            if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned:
-                print(f"The task {state.task_id} failed, reason: {state.failed_reason}")
-                task_ids.remove(id)
-            elif state.state == BulkInsertState.ImportCompleted:
-                print(f"The task {state.task_id} completed")
-                task_ids.remove(id)
+
+        print(f"\n===================== get import job progress ====================")
+        resp = get_import_progress(
+            url=url,
+            job_id=job_id,
+        )
+
+        state = resp.json()['data']['state']
+        progress = resp.json()['data']['progress']
+        if state == "Importing":
+            print(f"The job {job_id} is importing... {progress}%")
+            continue
+        if state == "Failed":
+            reason = resp.json()['data']['reason']
+            print(f"The job {job_id} failed, reason: {reason}")
+            break
+        if state == "Completed" and progress == 100:
+            print(f"The job {job_id} completed")
+            break
 
     print(f"Collection row number: {collection.num_entities}")
 
@@ -427,13 +446,13 @@ def cloud_bulkinsert():
     object_url_secret_key = "_your_object_storage_service_secret_key_"
     resp = bulk_import(
         url=url,
-        api_key=api_key,
+        collection_name=collection_name,
+        partition_name=partition_name,
         object_url=object_url,
+        cluster_id=cluster_id,
+        api_key=api_key,
         access_key=object_url_access_key,
         secret_key=object_url_secret_key,
-        cluster_id=cluster_id,
-        collection_name=collection_name,
-        partition_name=partition_name,
     )
     print(resp.json())
 
@@ -441,17 +460,17 @@ def cloud_bulkinsert():
     job_id = resp.json()['data']['jobId']
     resp = get_import_progress(
         url=url,
-        api_key=api_key,
         job_id=job_id,
         cluster_id=cluster_id,
+        api_key=api_key,
     )
     print(resp.json())
 
     print(f"\n===================== list import jobs ====================")
     resp = list_import_jobs(
         url=url,
-        api_key=api_key,
         cluster_id=cluster_id,
+        api_key=api_key,
         page_size=10,
         current_page=1,
     )
diff --git a/pymilvus/bulk_writer/bulk_import.py b/pymilvus/bulk_writer/bulk_import.py
index 5dbc521fa..1c110d230 100644
--- a/pymilvus/bulk_writer/bulk_import.py
+++ b/pymilvus/bulk_writer/bulk_import.py
@@ -12,6 +12,7 @@
 
 import json
 import logging
+from typing import List, Optional
 
 import requests
 
@@ -77,36 +78,43 @@ def _get_request(
 ## bulkinsert RESTful api wrapper
 def bulk_import(
     url: str,
-    api_key: str,
-    object_url: str,
-    access_key: str,
-    secret_key: str,
-    cluster_id: str,
     collection_name: str,
+    files: Optional[List[List[str]]] = None,
+    object_url: str = "",
+    cluster_id: str = "",
+    api_key: str = "",
+    access_key: str = "",
+    secret_key: str = "",
     **kwargs,
 ) -> requests.Response:
     """call bulkinsert restful interface to import files
 
     Args:
         url (str): url of the server
-        object_url (str): data files url
-        access_key (str): access key to access the object storage
-        secret_key (str): secret key to access the object storage
-        cluster_id (str): id of a milvus instance(for cloud)
         collection_name (str): name of the target collection
         partition_name (str): name of the target partition
+        files (list of list of str): The files that contain the data to import.
+            A sub-list contains a single JSON or Parquet file, or a set of Numpy files.
+        object_url (str): The URL of the object to import.
+            This URL should be accessible to the S3-compatible
+            object storage service, such as AWS S3, GCS, Azure blob storage.
+        cluster_id (str): id of a milvus instance(for cloud)
+        api_key (str): API key to authenticate your requests.
+        access_key (str): access key to access the object storage
+        secret_key (str): secret key to access the object storage
 
     Returns:
-        json: response of the restful interface
+        response of the restful interface
     """
     request_url = url + "/v2/vectordb/jobs/import/create"
 
     partition_name = kwargs.pop("partition_name", "")
 
     params = {
-        "clusterId": cluster_id,
         "collectionName": collection_name,
         "partitionName": partition_name,
+        "files": files,
         "objectUrl": object_url,
+        "clusterId": cluster_id,
         "accessKey": access_key,
         "secretKey": secret_key,
     }
@@ -117,7 +125,7 @@ def bulk_import(
 
 
 def get_import_progress(
-    url: str, api_key: str, job_id: str, cluster_id: str, **kwargs
+    url: str, job_id: str, cluster_id: str = "", api_key: str = "", **kwargs
 ) -> requests.Response:
     """get job progress
 
@@ -125,9 +133,10 @@ def get_import_progress(
         url (str): url of the server
         job_id (str): a job id
         cluster_id (str): id of a milvus instance(for cloud)
+        api_key (str): API key to authenticate your requests.
 
     Returns:
-        json: response of the restful interface
+        response of the restful interface
     """
     request_url = url + "/v2/vectordb/jobs/import/describe"
 
@@ -142,22 +151,31 @@ def get_import_progress(
 
 
 def list_import_jobs(
-    url: str, api_key: str, cluster_id: str, page_size: int, current_page: int, **kwargs
+    url: str,
+    collection_name: str = "",
+    cluster_id: str = "",
+    api_key: str = "",
+    page_size: int = 10,
+    current_page: int = 1,
+    **kwargs,
 ) -> requests.Response:
     """list jobs in a cluster
 
     Args:
         url (str): url of the server
+        collection_name (str): name of the target collection
         cluster_id (str): id of a milvus instance(for cloud)
+        api_key (str): API key to authenticate your requests.
         page_size (int): pagination size
         current_page (int): pagination
 
     Returns:
-        json: response of the restful interface
+        response of the restful interface
     """
     request_url = url + "/v2/vectordb/jobs/import/list"
 
     params = {
+        "collectionName": collection_name,
        "clusterId": cluster_id,
         "pageSize": page_size,
         "currentPage": current_page,
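Reviewer note: below is a minimal usage sketch of the reworked helpers against a self-hosted Milvus, distilled from the updated `call_bulkinsert()` in the example above. The endpoint URL, collection name, and file paths are placeholder values, and the functions are imported from the module touched by this patch; this is an illustration of the new defaults (empty `cluster_id`/`api_key`), not part of the change itself.

```python
import time

# Helpers defined in pymilvus/bulk_writer/bulk_import.py (see diff above).
from pymilvus.bulk_writer.bulk_import import (
    bulk_import,
    get_import_progress,
    list_import_jobs,
)

url = "http://127.0.0.1:19530"          # placeholder Milvus RESTful endpoint
collection_name = "demo_collection"     # placeholder; the collection must already exist
batch_files = [["folder/1.parquet"], ["folder/2.parquet"]]  # e.g. RemoteBulkWriter.batch_files

# cluster_id/api_key now default to "", so they can be omitted for self-hosted Milvus.
resp = bulk_import(url=url, collection_name=collection_name, files=batch_files)
job_id = resp.json()["data"]["jobId"]

# Poll the job until it fails or completes, as call_bulkinsert() does.
while True:
    time.sleep(1)
    data = get_import_progress(url=url, job_id=job_id).json()["data"]
    if data["state"] == "Failed":
        print(f"job {job_id} failed: {data['reason']}")
        break
    if data["state"] == "Completed" and data["progress"] == 100:
        print(f"job {job_id} completed")
        break

# List import jobs for the collection (page_size/current_page default to 10/1).
print(list_import_jobs(url=url, collection_name=collection_name).json())
```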