Skip to content

Commit

Permalink
now able to use --upload arg as an option to enable lakefs versioning…
Browse files Browse the repository at this point in the history
…, requires a repo name
  • Loading branch information
James Hodson committed Sep 18, 2024
1 parent ade3c38 commit a270e28
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 7 deletions.
6 changes: 3 additions & 3 deletions src/lake_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ def execute_command(command):
logging.error("Error executing command: %s", e.stderr)
return e

def lakefs_merge_into_main():
def lakefs_merge_into_main(repo):
logging.info("Uploading files from data directory to branch...")
command = [
"lakectl", "merge",
f"lakefs://example-repo/ingestion/",
f"lakefs://example-repo/main/",
f"lakefs://{repo}/ingestion/",
f"lakefs://{repo}/main/",
"-m", "Merge ingestion branch into main"
]
execute_command(command)
Expand Down
5 changes: 3 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def main():
parser.add_argument("--credentials_file", default="lakectl.cfg")
parser.add_argument("--serial", default=False, action='store_true')
parser.add_argument("--endpoint_url", default="http://localhost:8000")
parser.add_argument("--upload", default=False, action="store_true")
parser.add_argument("--upload", nargs='?', const=False, default=False)
parser.add_argument("--metadata_dir", default="data/uda")
parser.add_argument("--force", action="store_true")
parser.add_argument("--signal_names", nargs="*", default=[])
Expand All @@ -36,6 +36,7 @@ def main():
config = LakeFSUploadConfig(
credentials_file=args.credentials_file,
endpoint_url=args.endpoint_url,
repository=args.upload
)
workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config)
else:
Expand Down Expand Up @@ -63,7 +64,7 @@ def main():
logging.info(f"Finished source {source}")

if args.upload:
lakefs_merge_into_main()
lakefs_merge_into_main(args.upload)

if __name__ == "__main__":
main()
4 changes: 2 additions & 2 deletions src/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __call__(self):
logging.info(f"Uploading {self.local_file} to LakeFS.")
args = [
"lakectl", "fs", "upload",
f"lakefs://example-repo/ingestion/{self.shot_name}",
f"lakefs://{self.config.repository}/ingestion/{self.shot_name}",
"-s", str(self.local_file), "--recursive"
]

Expand Down Expand Up @@ -78,7 +78,7 @@ def __call__(self):
logging.info(f"Commit {self.local_file} to branch.")
args = [
"lakectl", "commit",
f"lakefs://example-repo/ingestion/",
f"lakefs://{self.config.repository}/ingestion/",
"-m", f"Commit file {self.local_file}"
]

Expand Down
1 change: 1 addition & 0 deletions src/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class UploadConfig:
class LakeFSUploadConfig:
endpoint_url: str
credentials_file: str
repository: str

0 comments on commit a270e28

Please sign in to comment.