diff --git a/app/configs/app_config.py b/app/configs/app_config.py index 3f931a07..bc6b3071 100644 --- a/app/configs/app_config.py +++ b/app/configs/app_config.py @@ -28,6 +28,10 @@ class Env: greenroom_bucket_prefix = 'gr' # the number of items to active interative mode interative_threshold = 10 + # set hard limit for pending jobs, otherwise cli will consume all memory + # to cache jobs. If later on the speed of chunk deliver become faster, we + # can increase the concurrency number. + num_of_jobs = 20 github_url = 'PilotDataPlatform/cli' diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index 81f1fd3c..230492cd 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -7,6 +7,7 @@ import json import math import os +import threading import time from logging import getLogger from multiprocessing.pool import ApplyResult @@ -302,6 +303,10 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> List[Apply been uploaded. """ count = 0 + semaphore = threading.Semaphore(AppConfig.Env.num_of_jobs) + + def on_complete(result): + semaphore.release() # process on the file content f = open(file_object.local_path, 'rb') @@ -327,9 +332,11 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> List[Apply chunk_size = chunk_info.get('chunk_size', self.chunk_size) file_object.update_progress(chunk_size) else: + semaphore.acquire() res = pool.apply_async( self.upload_chunk, args=(file_object, count + 1, chunk, local_chunk_etag, len(chunk)), + callback=on_complete, ) chunk_result.append(res) diff --git a/pyproject.toml b/pyproject.toml index 8cdd92d8..c123ee4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.11.0" +version = "3.12.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] diff --git a/tests/app/commands/test_dataset.py b/tests/app/commands/test_dataset.py index 5d31bf32..09f46eda 100644 --- a/tests/app/commands/test_dataset.py +++ b/tests/app/commands/test_dataset.py @@ -31,7 +31,6 @@ def test_dataset_list_total_less_than_10(mocker, cli_runner): question_mock = mocker.patch.object(questionary, 'select', return_value=questionary.select) result = cli_runner.invoke(dataset_list, ['--page', 0, '--page-size', page_size]) - assert result.exit_code == 0 assert '' == result.output assert question_mock.call_count == 0