diff --git a/outpostcli/constants.py b/outpostcli/constants.py index 04c84fb..e0330c7 100644 --- a/outpostcli/constants.py +++ b/outpostcli/constants.py @@ -1,2 +1,2 @@ -cli_version = "0.0.19" +cli_version = "0.0.20" CLI_BINARY_NAME = "outpostcli" diff --git a/outpostcli/lfs/client.py b/outpostcli/lfs/client.py new file mode 100644 index 0000000..6c28e10 --- /dev/null +++ b/outpostcli/lfs/client.py @@ -0,0 +1,129 @@ +"""A simple Git LFS client +""" +import hashlib +import logging +from typing import Any, BinaryIO, Dict, Iterable, List, Optional + +import requests +from six.moves import urllib_parse + +from . import exc, transfer, types + +FILE_READ_BUFFER_SIZE = 4 * 1024 * 1000 # 4mb, why not + +_log = logging.getLogger(__name__) + + +class LfsClient(object): + + LFS_MIME_TYPE = 'application/vnd.git-lfs+json' + + TRANSFER_ADAPTERS = {'basic': transfer.BasicTransferAdapter, + 'multipart-basic': transfer.MultipartTransferAdapter} + + TRANSFER_ADAPTER_PRIORITY = ('multipart-basic', 'basic') + + def __init__(self, lfs_server_url, auth_token=None, transfer_adapters=TRANSFER_ADAPTER_PRIORITY): + # type: (str, Optional[str], Iterable[str]) -> LfsClient + self._url = lfs_server_url.rstrip('/') + self._auth_token = auth_token + self._transfer_adapters = transfer_adapters + + def batch(self, prefix, operation, objects, ref=None, transfers=None): + # type: (str, str, List[Dict[str, Any]], Optional[str], Optional[List[str]]) -> Dict[str, Any] + """Send a batch request to the LFS server + + TODO: allow specifying more than one file for a single batch operation + """ + url = self._url_for(prefix, 'objects', 'batch') + if transfers is None: + transfers = self._transfer_adapters + + payload = {'transfers': transfers, + 'operation': operation, + 'objects': objects} + if ref: + payload['ref'] = ref + + headers = {'Content-type': self.LFS_MIME_TYPE, + 'Accept': self.LFS_MIME_TYPE} + if self._auth_token: + headers['Authorization'] = 'Bearer {}'.format(self._auth_token) + + response = requests.post(url, json=payload, headers=headers) + if response.status_code != 200: + raise exc.LfsError("Unexpected response from LFS server: {}".format(response.status_code), + status_code=response.status_code) + _log.debug("Got reply for batch request: %s", response.json()) + return response.json() + + def upload(self, file_obj, organization, repo, **extras): + # type: (BinaryIO, str, str, Any) -> types.ObjectAttributes + """Upload a file to LFS storage + """ + object_attrs = self._get_object_attrs(file_obj) + self._add_extra_object_attributes(object_attrs, extras) + response = self.batch('{}/{}'.format(organization, repo), 'upload', [object_attrs]) + + try: + adapter = self.TRANSFER_ADAPTERS[response['transfer']]() + except KeyError: + raise ValueError("Unsupported transfer adapter: {}".format(response['transfer'])) + + adapter.upload(file_obj, response['objects'][0]) + return object_attrs + + def download(self, file_obj, object_sha256, object_size, organization, repo, **extras): + # type: (BinaryIO, str, int, str, str, Any) -> None + """Download a file and save it to file_obj + + file_obj is expected to be an file-like object open for writing in binary mode + + TODO: allow specifying more than one file for a single batch operation + """ + object_attrs = {"oid": object_sha256, "size": object_size} + self._add_extra_object_attributes(object_attrs, extras) + + response = self.batch('{}/{}'.format(organization, repo), 'download', [object_attrs]) + + try: + adapter = self.TRANSFER_ADAPTERS[response['transfer']]() + except KeyError: + raise ValueError("Unsupported transfer adapter: {}".format(response['transfer'])) + + return adapter.download(file_obj, response['objects'][0]) + + def _url_for(self, *segments, **params): + # type: (str, str) -> str + path = '/'.join(segments) + url = '{url}/{path}'.format(url=self._url, path=path) + if params: + url = '{url}?{params}'.format(url=url, params=urllib_parse.urlencode(params)) + return url + + @staticmethod + def _get_object_attrs(file_obj, **extras): + # type: (BinaryIO, Any) -> types.ObjectAttributes + digest = hashlib.sha256() + try: + while True: + data = file_obj.read(FILE_READ_BUFFER_SIZE) + if data: + digest.update(data) + else: + break + + size = file_obj.tell() + oid = digest.hexdigest() + finally: + file_obj.seek(0) + + return types.ObjectAttributes(oid=oid, size=size) + + @staticmethod + def _add_extra_object_attributes(attributes, extras): + # type: (types.ObjectAttributes, Dict[str, Any]) -> None + """Add Giftless-specific 'x-...' attributes to an object dict + """ + for k, v in extras.items(): + attributes['x-{}'.format(k)] = v \ No newline at end of file diff --git a/outpostcli/lfs/commands.py b/outpostcli/lfs/commands.py index 572d382..8d25311 100644 --- a/outpostcli/lfs/commands.py +++ b/outpostcli/lfs/commands.py @@ -1,16 +1,16 @@ # ref: https://github.com/huggingface/huggingface_hub/blob/main/src/huggingface_hub/commands/lfs.py import json +import logging import os import subprocess import sys from typing import Dict, Optional import click -import httpx +from outpostkit.repository.lfs.transfer import MultipartTransferAdapter from outpostcli.constants import CLI_BINARY_NAME -from outpostcli.lfs.utils import HTTPException, SliceFileObj, _raise_for_status -from outpostcli.log import Logger +from outpostcli.lfs.utils import HTTPException from outpostcli.utils import click_group # from huggingface_hub.commands import BaseHuggingfaceCLICommand @@ -19,6 +19,16 @@ # logger = logging.get_logger(__name__) +_log = logging.getLogger(__name__) +_log.handlers.clear() +file_handler = logging.FileHandler( + "./lfs_command.log", # maybe create a config dir at home, ~/.outpost +) +file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +) +_log.addHandler(file_handler) + @click_group() def lfs(): @@ -46,13 +56,18 @@ def enable_largefiles(path): sys.exit(1) subprocess.run( - f"git config lfs.customtransfer.multipart.path {CLI_BINARY_NAME}".split(), + f"git config lfs.customtransfer.multipart-basic.path {CLI_BINARY_NAME}".split(), check=True, cwd=local_path, ) subprocess.run( - ["git", "config", "lfs.customtransfer.multipart.args", f'{LFS_MULTIPART_UPLOAD_COMMAND}'], + [ + "git", + "config", + "lfs.customtransfer.multipart-basic.args", + f"{LFS_MULTIPART_UPLOAD_COMMAND}", + ], check=True, cwd=local_path, ) @@ -84,7 +99,6 @@ def read_msg() -> Optional[Dict]: @lfs.command(name=MULTIPART_UPLOAD_COMMAND_NAME) def multipart_upload(): try: - """Command called by git lfs directly and is not meant to be called by the user""" # ... (rest of the existing code) init_msg = json.loads(sys.stdin.readline().strip()) @@ -92,13 +106,25 @@ def multipart_upload(): write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}}) sys.exit(1) - Logger.info(init_msg) + _log.info(init_msg) # The transfer process should use the information it needs from the # initiation structure, and also perform any one-off setup tasks it # needs to do. It should then respond on stdout with a simple empty # confirmation structure, as follows: write_msg({}) + _bytes_so_far = 0 + + def on_progress(uploaded_bytes: int): + write_msg( + { + "event": "progress", + "oid": oid, + "bytesSoFar": _bytes_so_far + uploaded_bytes, + "bytesSinceLast": uploaded_bytes, + } + ) + # After the initiation exchange, git-lfs will send any number of # transfer requests to the stdin of the transfer process, in a serial sequence. while True: @@ -109,20 +135,8 @@ def multipart_upload(): # On receiving this message the transfer process should # clean up and terminate. No response is expected. sys.exit(0) - oid = msg["oid"] filepath = msg["path"] - completion_url = msg["action"]["href"] - header: Dict = msg["action"]["header"] - chunkSize = int(header.get("chunkSize")) - presignedUrlPrefix: str = header.get("presignedUrlPrefix") - authToken = header.pop("Authorization") - Logger.info(msg) - - presignedUrls = [(header.get(key), int(key[len(presignedUrlPrefix):])) for key in header.keys() if key.startswith(presignedUrlPrefix)] - - presignedUrls.sort(key=lambda x: x[1]) - Logger.info(presignedUrls) write_msg( { @@ -133,61 +147,12 @@ def multipart_upload(): } ) - parts = [] - - file_stat = os.stat(filepath) - with httpx.Client() as client: - with open(filepath, "rb") as file: - for presigned_url, partNo in presignedUrls: - - startByte = partNo * chunkSize - - with SliceFileObj( - file, - seek_from=startByte, - read_limit=chunkSize, - ) as data: - - response = client.put(presigned_url, content=data, headers={ - 'Authorization': authToken, - "Content-Length": str(file_stat.st_size - startByte if (((startByte) + chunkSize) > file_stat.st_size) else chunkSize) - },timeout=None) - - _raise_for_status(response) - - Logger.info(response.headers) - - parts.append( { - "ETag" : response.headers.get("etag"), - "PartNumber": partNo - } ) - - # In order to support progress reporting while data is uploading / downloading, - # the transfer process should post messages to stdout - write_msg( - { - "event": "progress", - "oid": oid, - "bytesSoFar": (partNo + 1) * chunkSize, - "bytesSinceLast": chunkSize, - } - ) - # Not precise but that's ok. - r = httpx.post( - completion_url, - json={ - "oid": oid, - "parts": parts, - }, - headers={ - 'Authorization': authToken - } - ) - _raise_for_status(r) - + _log.info(msg) + with open(filepath, "rb") as file_obj: + MultipartTransferAdapter().upload(file_obj, msg["action"], on_progress) write_msg({"event": "complete", "oid": oid}) except HTTPException as e: - Logger.error(e) + _log.error(e) write_msg({"error": {"code": e.status_code, "message": e.message}}) # except: # write_msg({"error": {"code": 500, "message": "Something went wrong"}}) diff --git a/outpostcli/lfs/types.py b/outpostcli/lfs/types.py new file mode 100644 index 0000000..72c3b2b --- /dev/null +++ b/outpostcli/lfs/types.py @@ -0,0 +1,86 @@ +"""Some useful type definitions for Git LFS API and transfer protocols +""" + +import sys +from typing import Any, Dict, List, Optional + +if sys.version_info >= (3, 8): + from typing import TypedDict +else: + from typing_extensions import TypedDict + + +ObjectAttributes = TypedDict( + "ObjectAttributes", + { + "oid": str, + "size": int, + }, +) + +BasicActionAttributes = TypedDict( + "BasicActionAttributes", + {"href": str, "header": Optional[Dict[str, str]], "expires_in": int}, +) + +BasicUploadActions = TypedDict( + "BasicUploadActions", + { + "upload": BasicActionAttributes, + "verify": BasicActionAttributes, + }, + total=False, +) + +BasicDownloadActions = TypedDict( + "BasicDownloadActions", + { + "download": BasicActionAttributes, + }, + total=False, +) + +UploadObjectAttributes = TypedDict( + "UploadObjectAttributes", + { + "actions": BasicUploadActions, + "oid": str, + "size": int, + "authenticated": Optional[bool], + }, + total=False, +) + +DownloadObjectAttributes = TypedDict( + "DownloadObjectAttributes", + { + "actions": BasicDownloadActions, + "oid": str, + "size": int, + "authenticated": Optional[bool], + }, + total=False, +) + +MultipartUploadActions = TypedDict( + "MultipartUploadActions", + { + "init": Dict[str, Any], + "commit": Dict[str, Any], + "parts": List[Dict[str, Any]], + "abort": Dict[str, Any], + "verify": Dict[str, Any], + }, + total=False, +) + +MultipartUploadObjectAttributes = TypedDict( + "MultipartUploadObjectAttributes", + { + "actions": MultipartUploadActions, + "oid": str, + "size": int, + "authenticated": Optional[bool], + }, + total=False, +) diff --git a/outpostcli/log.py b/outpostcli/log.py deleted file mode 100644 index e13bdbb..0000000 --- a/outpostcli/log.py +++ /dev/null @@ -1,15 +0,0 @@ -import os -import tempfile -from time import time - -from loguru import logger - -Logger = logger - -# Logger.configure( -# handlers=[ -# # dict(sink=sys.stderr, format="[{time}] {message}"), -# dict(sink=os.path.join(tempfile.gettempdir(), f"{int(time())}.log"), format="[{time}] {message}", enqueue=True, colorize=False), -# ] -# ) -Logger.add(os.path.join(tempfile.gettempdir(), f"{int(time())}.log"), colorize=False, enqueue=True) diff --git a/pyproject.toml b/pyproject.toml index 2ecfb30..8762aa2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,18 +4,17 @@ build-backend = "setuptools.build_meta" [project] name = "outpostcli" -version = "0.0.19" +version = "0.0.20" description = "CLI for Outpost" readme = "README.md" license = { file = "LICENSE" } authors = [{ name = "Outpost Innovations, Inc." }] requires-python = ">=3.8" dependencies = [ - "outpostkit>=0.0.57", + "outpostkit>=0.0.64", "click", "rich", "requests", - "loguru" ] optional-dependencies = { dev = [ "ruff", diff --git a/requirements-dev.txt b/requirements-dev.txt index c6531cf..98a4c59 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -32,15 +32,13 @@ idna==3.6 # anyio # httpx # requests -loguru==0.7.2 - # via outpostcli (pyproject.toml) markdown-it-py==3.0.0 # via rich mdurl==0.1.2 # via markdown-it-py mypy-extensions==1.0.0 # via black -outpostkit==0.0.52 +outpostkit==0.0.64 # via outpostcli (pyproject.toml) packaging==23.2 # via diff --git a/requirements.txt b/requirements.txt index 4967d5a..44c4a90 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,13 +28,11 @@ idna==3.6 # anyio # httpx # requests -loguru==0.7.2 - # via outpostcli (pyproject.toml) markdown-it-py==3.0.0 # via rich mdurl==0.1.2 # via markdown-it-py -outpostkit==0.0.57 +outpostkit==0.0.64 # via outpostcli (pyproject.toml) packaging==23.2 # via outpostkit diff --git a/setup.py b/setup.py index 21332d2..e6fb65e 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="outpostcli", - version="0.0.7", + version="0.0.20", py_modules=["outpostcli"], install_requires=["Click", "outpostkit"], entry_points={