Update workflow to upload to s3
samueljackson92 committed Dec 18, 2024
1 parent 3754551 commit 82238e8
Showing 3 changed files with 63 additions and 14 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -13,4 +13,5 @@ uda
 zarr
 rich
 pydantic
+s5cmd
 # git+ssh://git@git.ccfe.ac.uk/MAST-U/mastcodes.git@release/1.3.10#subdirectory=uda/python
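
Note: the new src/upload.py below shells out to the s5cmd executable, so whatever environment installs requirements.txt must end up with the binary on PATH. A quick sanity check (a sketch, not part of this commit):

# Sanity check (sketch): verify the s5cmd executable installed via
# requirements.txt is discoverable on PATH before the upload step
# shells out to it.
import shutil

if shutil.which("s5cmd") is None:
    raise RuntimeError("s5cmd not found on PATH; check the requirements install")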
44 changes: 44 additions & 0 deletions src/upload.py
@@ -0,0 +1,44 @@
+import os
+import shutil
+import subprocess
+
+from src.config import UploadConfig
+from src.log import logger
+
+
+class UploadS3:
+    def __init__(self, config: UploadConfig, mode: str = "s5cmd"):
+        self.config = config
+        self.mode = mode
+
+    def upload(self, local_file: str, remote_file: str):
+        logger.info(f'Uploading "{local_file}" to "{remote_file}"')
+        self.upload_s5cmd(local_file, remote_file)
+        # clean up local file
+        shutil.rmtree(local_file)
+
+    def upload_s5cmd(self, local_file, remote_file):
+        env = os.environ.copy()
+
+        args = [
+            "s5cmd",
+            "--credentials-file",
+            self.config.credentials_file,
+            "--endpoint-url",
+            self.config.endpoint_url,
+            "cp",
+            "--acl",
+            "public-read",
+            str(local_file),
+            str(remote_file),
+        ]
+
+        logger.debug(" ".join(args))
+
+        subprocess.run(
+            args,
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.STDOUT,
+            env=env,
+            check=True,
+        )
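
Note: a minimal usage sketch of the new class. The UploadConfig values below are invented for illustration; only credentials_file and endpoint_url (used above) and base_path (used in workflow.py below) are fields the commit actually references. Since upload() finishes with shutil.rmtree, local_file must be a directory, as a Zarr store is.

# Hypothetical usage of UploadS3 -- every value here is illustrative,
# not taken from the commit.
from src.config import UploadConfig
from src.upload import UploadS3

config = UploadConfig(
    credentials_file="~/.s5cfg",            # assumed field value
    endpoint_url="https://s3.example.com",  # assumed field value
    base_path="s3://mast/level1/shots",     # assumed field value
)

uploader = UploadS3(config)
# Copies the local Zarr store to the bucket via s5cmd, then removes the
# local copy with shutil.rmtree (so the first argument must be a directory).
uploader.upload("30421.zarr", "s3://mast/level1/shots/")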
32 changes: 18 additions & 14 deletions src/workflow.py
@@ -1,4 +1,5 @@
 import traceback
+from pathlib import Path
 from typing import Optional
 
 from dask import config
@@ -9,6 +10,7 @@
 from src.load import loader_registry
 from src.log import logger
 from src.pipelines import pipelines_registry
+from src.upload import UploadS3
 from src.writer import dataset_writer_registry
 
 
@@ -31,28 +33,26 @@ def __call__(self, shot: int):
         if self.verbose:
             logger.setLevel("DEBUG")
 
+        writer_config = self.config.writer
+        self.writer = dataset_writer_registry.create(
+            writer_config.type, **writer_config.options
+        )
+        self.loader = loader_registry.create("uda")
+        self.pipelines = pipelines_registry.create(self.facility)
+
         try:
             self.create_dataset(shot)
+            self.upload_dataset(shot)
+            self.cleanup(shot)
             logger.info(f"Done shot #{shot}")
         except Exception as e:
             logger.error(f"Failed to run workflow with error {type(e)}: {e}\n")
             logger.debug(traceback.print_exception(e))
 
     def create_dataset(self, shot: int):
-        writer_config = self.config.writer
-        writer = dataset_writer_registry.create(
-            writer_config.type, **writer_config.options
-        )
-
-        loader = loader_registry.create("uda")
-        pipelines = pipelines_registry.create(self.facility)
-
         builder = DatasetBuilder(
-            loader,
-            writer,
-            pipelines,
+            self.loader,
+            self.writer,
+            self.pipelines,
             self.include_sources,
             self.exclude_sources,
         )
@@ -63,8 +63,12 @@ def upload_dataset(self, shot: int):
         if self.config.upload is None:
             return
 
-    def cleanup(self, shot: int):
-        pass
+        file_name = f"{shot}.{self.writer.file_extension}"
+        local_file = self.config.writer.options["output_path"] / Path(file_name)
+        remote_file = f"{self.config.upload.base_path}/"
+
+        uploader = UploadS3(self.config.upload)
+        uploader.upload(local_file, remote_file)
 
 
 class WorkflowManager:
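Note: a worked sketch of how upload_dataset composes its paths; all values are hypothetical. The trailing slash on remote_file gives cp-style semantics, so s5cmd copies the file into that prefix.

# Worked example of the path composition in upload_dataset; every value
# here is invented and only illustrates the string handling above.
from pathlib import Path

shot = 30421
file_extension = "zarr"               # stands in for self.writer.file_extension
output_path = Path("data/local")      # self.config.writer.options["output_path"]
base_path = "s3://mast/level1/shots"  # self.config.upload.base_path

file_name = f"{shot}.{file_extension}"      # "30421.zarr"
local_file = output_path / Path(file_name)  # data/local/30421.zarr
remote_file = f"{base_path}/"               # "s3://mast/level1/shots/"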
