diff --git a/requirements.txt b/requirements.txt
index 1124e79..f2db147 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,4 +13,5 @@ uda
 zarr
 rich
 pydantic
+s5cmd
 # git+ssh://git@git.ccfe.ac.uk/MAST-U/mastcodes.git@release/1.3.10#subdirectory=uda/python
diff --git a/src/upload.py b/src/upload.py
new file mode 100644
index 0000000..7c77d5e
--- /dev/null
+++ b/src/upload.py
@@ -0,0 +1,44 @@
+import os
+import shutil
+import subprocess
+
+from src.config import UploadConfig
+from src.log import logger
+
+
+class UploadS3:
+    def __init__(self, config: UploadConfig, mode: str = "s5cmd"):
+        self.config = config
+        self.mode = mode
+
+    def upload(self, local_file: str, remote_file: str):
+        logger.info(f'Uploading "{local_file}" to "{remote_file}"')
+        self.upload_s5cmd(local_file, remote_file)
+        # clean up local file
+        shutil.rmtree(local_file)
+
+    def upload_s5cmd(self, local_file, remote_file):
+        env = os.environ.copy()
+
+        args = [
+            "s5cmd",
+            "--credentials-file",
+            self.config.credentials_file,
+            "--endpoint-url",
+            self.config.endpoint_url,
+            "cp",
+            "--acl",
+            "public-read",
+            str(local_file),
+            str(remote_file),
+        ]
+
+        logger.debug(" ".join(args))
+
+        subprocess.run(
+            args,
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.STDOUT,
+            env=env,
+            check=True,
+        )
diff --git a/src/workflow.py b/src/workflow.py
index 254bf02..9465005 100644
--- a/src/workflow.py
+++ b/src/workflow.py
@@ -1,4 +1,5 @@
 import traceback
+from pathlib import Path
 from typing import Optional
 
 from dask import config
@@ -9,6 +10,7 @@
 from src.load import loader_registry
 from src.log import logger
 from src.pipelines import pipelines_registry
+from src.upload import UploadS3
 from src.writer import dataset_writer_registry
 
 
@@ -31,28 +33,26 @@ def __call__(self, shot: int):
         if self.verbose:
             logger.setLevel("DEBUG")
 
+        writer_config = self.config.writer
+        self.writer = dataset_writer_registry.create(
+            writer_config.type, **writer_config.options
+        )
+        self.loader = loader_registry.create("uda")
+        self.pipelines = pipelines_registry.create(self.facility)
+
         try:
             self.create_dataset(shot)
             self.upload_dataset(shot)
-            self.cleanup(shot)
             logger.info(f"Done shot #{shot}")
         except Exception as e:
             logger.error(f"Failed to run workflow with error {type(e)}: {e}\n")
             logger.debug(traceback.print_exception(e))
 
     def create_dataset(self, shot: int):
-        writer_config = self.config.writer
-        writer = dataset_writer_registry.create(
-            writer_config.type, **writer_config.options
-        )
-
-        loader = loader_registry.create("uda")
-        pipelines = pipelines_registry.create(self.facility)
-
         builder = DatasetBuilder(
-            loader,
-            writer,
-            pipelines,
+            self.loader,
+            self.writer,
+            self.pipelines,
             self.include_sources,
             self.exclude_sources,
         )
@@ -63,8 +63,12 @@ def upload_dataset(self, shot: int):
         if self.config.upload is None:
             return
 
-    def cleanup(self, shot: int):
-        pass
+        file_name = f"{shot}.{self.writer.file_extension}"
+        local_file = self.config.writer.options["output_path"] / Path(file_name)
+        remote_file = f"{self.config.upload.base_path}/"
+
+        uploader = UploadS3(self.config.upload)
+        uploader.upload(local_file, remote_file)
 
 
 class WorkflowManager:
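
`src/config.py` is not part of this patch, so the exact shape of `UploadConfig` is not shown. Below is a minimal sketch of the fields `UploadS3` reads (`credentials_file`, `endpoint_url`, `base_path`), assuming a pydantic model since `pydantic` is already pinned in `requirements.txt`; the field names are inferred from the attribute accesses in the diff, and every value in the usage example is a placeholder.

```python
# Hypothetical sketch -- src/config.py is not included in this diff.
# Field names are inferred from the attributes UploadS3 dereferences.
from pydantic import BaseModel

from src.upload import UploadS3


class UploadConfig(BaseModel):
    base_path: str          # destination prefix, e.g. "s3://bucket/prefix" (placeholder)
    credentials_file: str   # forwarded to `s5cmd --credentials-file`
    endpoint_url: str       # forwarded to `s5cmd --endpoint-url`


# Hypothetical usage, mirroring Workflow.upload_dataset (all values made up):
config = UploadConfig(
    base_path="s3://example-bucket/level1",
    credentials_file="/path/to/credentials",
    endpoint_url="https://object-store.example.org",
)
UploadS3(config).upload("30420.zarr", f"{config.base_path}/")
```

The subprocess call then amounts to `s5cmd --credentials-file <file> --endpoint-url <url> cp --acl public-read <local> <remote>`; the trailing `/` on `remote_file` makes s5cmd treat the destination as a prefix, so the object keeps its local file name.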
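One behavioural note on `UploadS3.upload`: the cleanup step calls `shutil.rmtree(local_file)`, which only succeeds when the local artefact is a directory store (as a zarr dataset is) and raises `NotADirectoryError` on a plain file. If a future writer emits single-file outputs, a cleanup along these lines would cover both cases; this is a sketch under that assumption, not part of the patch:

```python
# Sketch: cleanup tolerating both directory stores (zarr) and plain files.
# Not part of this diff -- upload() currently assumes a directory store.
import shutil
from pathlib import Path


def remove_local(local_file) -> None:
    path = Path(local_file)
    if path.is_dir():
        shutil.rmtree(path)   # zarr stores are directories
    elif path.exists():
        path.unlink()         # plain files need unlink(), not rmtree()
```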