From a270e280205f09c142548444dc12d970cdc57942 Mon Sep 17 00:00:00 2001 From: James Hodson Date: Wed, 18 Sep 2024 12:36:12 +0100 Subject: [PATCH] now able to use --upload arg as an option to enable lakefs versioning, requires a repo name --- src/lake_fs.py | 6 +++--- src/main.py | 5 +++-- src/task.py | 4 ++-- src/uploader.py | 1 + 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/lake_fs.py b/src/lake_fs.py index 1b3dcf1..515cb6f 100644 --- a/src/lake_fs.py +++ b/src/lake_fs.py @@ -18,12 +18,12 @@ def execute_command(command): logging.error("Error executing command: %s", e.stderr) return e -def lakefs_merge_into_main(): +def lakefs_merge_into_main(repo): logging.info("Uploading files from data directory to branch...") command = [ "lakectl", "merge", - f"lakefs://example-repo/ingestion/", - f"lakefs://example-repo/main/", + f"lakefs://{repo}/ingestion/", + f"lakefs://{repo}/main/", "-m", "Merge ingestion branch into main" ] execute_command(command) diff --git a/src/main.py b/src/main.py index 2332429..467f030 100644 --- a/src/main.py +++ b/src/main.py @@ -23,7 +23,7 @@ def main(): parser.add_argument("--credentials_file", default="lakectl.cfg") parser.add_argument("--serial", default=False, action='store_true') parser.add_argument("--endpoint_url", default="http://localhost:8000") - parser.add_argument("--upload", default=False, action="store_true") + parser.add_argument("--upload", nargs='?', const=False, default=False) parser.add_argument("--metadata_dir", default="data/uda") parser.add_argument("--force", action="store_true") parser.add_argument("--signal_names", nargs="*", default=[]) @@ -36,6 +36,7 @@ def main(): config = LakeFSUploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, + repository=args.upload ) workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config) else: @@ -63,7 +64,7 @@ def main(): logging.info(f"Finished source {source}") if args.upload: - lakefs_merge_into_main() + lakefs_merge_into_main(args.upload) if __name__ == "__main__": main() diff --git a/src/task.py b/src/task.py index af59841..18b57f3 100644 --- a/src/task.py +++ b/src/task.py @@ -44,7 +44,7 @@ def __call__(self): logging.info(f"Uploading {self.local_file} to LakeFS.") args = [ "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{self.shot_name}", + f"lakefs://{self.config.repository}/ingestion/{self.shot_name}", "-s", str(self.local_file), "--recursive" ] @@ -78,7 +78,7 @@ def __call__(self): logging.info(f"Commit {self.local_file} to branch.") args = [ "lakectl", "commit", - f"lakefs://example-repo/ingestion/", + f"lakefs://{self.config.repository}/ingestion/", "-m", f"Commit file {self.local_file}" ] diff --git a/src/uploader.py b/src/uploader.py index b9a8547..337ae5e 100644 --- a/src/uploader.py +++ b/src/uploader.py @@ -11,3 +11,4 @@ class UploadConfig: class LakeFSUploadConfig: endpoint_url: str credentials_file: str + repository: str