Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/multipart upload #1

Merged
merged 4 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion outpostcli/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cli_version = "0.0.19"
cli_version = "0.0.20"
CLI_BINARY_NAME = "outpostcli"
129 changes: 129 additions & 0 deletions outpostcli/lfs/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""A simple Git LFS client
"""
import hashlib
import logging
from typing import Any, BinaryIO, Dict, Iterable, List, Optional

import requests
from six.moves import urllib_parse

from . import exc, transfer, types

# Chunk size used when hashing files in _get_object_attrs.
# 4 * 1024 * 1000 = 4,096,000 bytes — roughly 4 MB (not an exact power-of-two multiple).
FILE_READ_BUFFER_SIZE = 4 * 1024 * 1000 # ~4mb, why not

# Module-level logger, named after this module per logging convention.
_log = logging.getLogger(__name__)


class LfsClient(object):
    """A simple Git LFS client implementing the LFS batch API.

    The client offers the server a list of transfer adapters (multipart-basic
    preferred, plain basic as fallback) and delegates the actual byte
    transfer to whichever adapter class the server selects.
    """

    LFS_MIME_TYPE = 'application/vnd.git-lfs+json'

    # Maps the adapter name negotiated with the server to its implementation
    TRANSFER_ADAPTERS = {'basic': transfer.BasicTransferAdapter,
                         'multipart-basic': transfer.MultipartTransferAdapter}

    # Adapter names offered to the server, in order of preference
    TRANSFER_ADAPTER_PRIORITY = ('multipart-basic', 'basic')

    def __init__(self, lfs_server_url, auth_token=None, transfer_adapters=TRANSFER_ADAPTER_PRIORITY):
        # type: (str, Optional[str], Iterable[str]) -> LfsClient
        """Create a client for the LFS server at ``lfs_server_url``.

        :param lfs_server_url: base URL of the LFS server (trailing ``/`` is stripped)
        :param auth_token: optional bearer token sent on batch requests
        :param transfer_adapters: adapter names to offer the server, by priority
        """
        self._url = lfs_server_url.rstrip('/')
        self._auth_token = auth_token
        self._transfer_adapters = transfer_adapters

    def batch(self, prefix, operation, objects, ref=None, transfers=None):
        # type: (str, str, List[Dict[str, Any]], Optional[str], Optional[List[str]]) -> Dict[str, Any]
        """Send a batch request to the LFS server

        :raises exc.LfsError: if the server responds with anything but HTTP 200

        TODO: allow specifying more than one file for a single batch operation
        """
        url = self._url_for(prefix, 'objects', 'batch')
        if transfers is None:
            transfers = self._transfer_adapters

        payload = {'transfers': transfers,
                   'operation': operation,
                   'objects': objects}
        if ref:
            payload['ref'] = ref

        headers = {'Content-type': self.LFS_MIME_TYPE,
                   'Accept': self.LFS_MIME_TYPE}
        if self._auth_token:
            headers['Authorization'] = 'Bearer {}'.format(self._auth_token)

        response = requests.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            raise exc.LfsError("Unexpected response from LFS server: {}".format(response.status_code),
                               status_code=response.status_code)
        # Decode the body once and reuse it (previously .json() parsed it twice)
        reply = response.json()
        _log.debug("Got reply for batch request: %s", reply)
        return reply

    def upload(self, file_obj, organization, repo, **extras):
        # type: (BinaryIO, str, str, Any) -> types.ObjectAttributes
        """Upload a file to LFS storage

        ``file_obj`` must be open for reading in binary mode; it is fully read
        to compute the sha256 oid and then rewound before the transfer starts.

        :raises ValueError: if the server selects an unsupported transfer adapter
        """
        object_attrs = self._get_object_attrs(file_obj)
        self._add_extra_object_attributes(object_attrs, extras)
        response = self.batch('{}/{}'.format(organization, repo), 'upload', [object_attrs])

        adapter = self._negotiated_adapter(response)
        adapter.upload(file_obj, response['objects'][0])
        return object_attrs

    def download(self, file_obj, object_sha256, object_size, organization, repo, **extras):
        # type: (BinaryIO, str, int, str, str, Any) -> None
        """Download a file and save it to file_obj

        file_obj is expected to be an file-like object open for writing in binary mode

        :raises ValueError: if the server selects an unsupported transfer adapter

        TODO: allow specifying more than one file for a single batch operation
        """
        object_attrs = {"oid": object_sha256, "size": object_size}
        self._add_extra_object_attributes(object_attrs, extras)

        response = self.batch('{}/{}'.format(organization, repo), 'download', [object_attrs])

        adapter = self._negotiated_adapter(response)
        return adapter.download(file_obj, response['objects'][0])

    def _negotiated_adapter(self, response):
        # type: (Dict[str, Any]) -> Any
        """Instantiate the transfer adapter the server selected in a batch reply.

        Shared by upload() and download(). Uses .get() so that a reply missing
        the 'transfer' key raises the intended ValueError instead of a second
        KeyError from inside the error-handling path.
        """
        transfer_name = response.get('transfer')
        adapter_cls = self.TRANSFER_ADAPTERS.get(transfer_name)
        if adapter_cls is None:
            raise ValueError("Unsupported transfer adapter: {}".format(transfer_name))
        return adapter_cls()

    def _url_for(self, *segments, **params):
        # type: (*str, **Any) -> str
        """Join path segments onto the server base URL, with optional query params."""
        path = '/'.join(segments)
        url = '{url}/{path}'.format(url=self._url, path=path)
        if params:
            url = '{url}?{params}'.format(url=url, params=urllib_parse.urlencode(params))
        return url

    @staticmethod
    def _get_object_attrs(file_obj, **extras):
        # type: (BinaryIO, Any) -> types.ObjectAttributes
        """Compute the sha256 oid and byte size of ``file_obj``.

        The file is read in FILE_READ_BUFFER_SIZE chunks and always rewound to
        the start afterwards, even if reading fails partway through.

        NOTE(review): ``extras`` is accepted but currently unused — callers add
        extras via _add_extra_object_attributes instead. Size is taken from
        tell() after the final read, which assumes the file position started
        at 0 — confirm callers always pass a fresh/rewound file object.
        """
        digest = hashlib.sha256()
        try:
            while True:
                data = file_obj.read(FILE_READ_BUFFER_SIZE)
                if not data:
                    break
                digest.update(data)

            size = file_obj.tell()
            oid = digest.hexdigest()
        finally:
            file_obj.seek(0)

        return types.ObjectAttributes(oid=oid, size=size)

    @staticmethod
    def _add_extra_object_attributes(attributes, extras):
        # type: (types.ObjectAttributes, Dict[str, Any]) -> None
        """Add Giftless-specific 'x-...' attributes to an object dict in place
        """
        for k, v in extras.items():
            attributes['x-{}'.format(k)] = v
109 changes: 37 additions & 72 deletions outpostcli/lfs/commands.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# ref: https://github.com/huggingface/huggingface_hub/blob/main/src/huggingface_hub/commands/lfs.py
import json
import logging
import os
import subprocess
import sys
from typing import Dict, Optional

import click
import httpx
from outpostkit.repository.lfs.transfer import MultipartTransferAdapter

from outpostcli.constants import CLI_BINARY_NAME
from outpostcli.lfs.utils import HTTPException, SliceFileObj, _raise_for_status
from outpostcli.log import Logger
from outpostcli.lfs.utils import HTTPException
from outpostcli.utils import click_group

# from huggingface_hub.commands import BaseHuggingfaceCLICommand
Expand All @@ -19,6 +19,16 @@

# logger = logging.get_logger(__name__)

# Dedicated logger for the git-lfs custom-transfer process. Log output goes to
# a file because stdout is reserved for the git-lfs protocol messages this
# process exchanges with git-lfs (see the init/progress/complete messages below).
_log = logging.getLogger(__name__)
# Drop any handlers configured elsewhere so nothing leaks onto the console.
_log.handlers.clear()
file_handler = logging.FileHandler(
    "./lfs_command.log", # maybe create a config dir at home, ~/.outpost
)
file_handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
_log.addHandler(file_handler)


@click_group()
def lfs():
Expand Down Expand Up @@ -46,13 +56,18 @@ def enable_largefiles(path):
sys.exit(1)

subprocess.run(
f"git config lfs.customtransfer.multipart.path {CLI_BINARY_NAME}".split(),
f"git config lfs.customtransfer.multipart-basic.path {CLI_BINARY_NAME}".split(),
check=True,
cwd=local_path,
)

subprocess.run(
["git", "config", "lfs.customtransfer.multipart.args", f'{LFS_MULTIPART_UPLOAD_COMMAND}'],
[
"git",
"config",
"lfs.customtransfer.multipart-basic.args",
f"{LFS_MULTIPART_UPLOAD_COMMAND}",
],
check=True,
cwd=local_path,
)
Expand Down Expand Up @@ -84,21 +99,32 @@ def read_msg() -> Optional[Dict]:
@lfs.command(name=MULTIPART_UPLOAD_COMMAND_NAME)
def multipart_upload():
try:

"""Command called by git lfs directly and is not meant to be called by the user"""
# ... (rest of the existing code)
init_msg = json.loads(sys.stdin.readline().strip())
if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"):
write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}})
sys.exit(1)

Logger.info(init_msg)
_log.info(init_msg)
# The transfer process should use the information it needs from the
# initiation structure, and also perform any one-off setup tasks it
# needs to do. It should then respond on stdout with a simple empty
# confirmation structure, as follows:
write_msg({})

_bytes_so_far = 0

def on_progress(uploaded_bytes: int):
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": _bytes_so_far + uploaded_bytes,
"bytesSinceLast": uploaded_bytes,
}
)

# After the initiation exchange, git-lfs will send any number of
# transfer requests to the stdin of the transfer process, in a serial sequence.
while True:
Expand All @@ -109,20 +135,8 @@ def multipart_upload():
# On receiving this message the transfer process should
# clean up and terminate. No response is expected.
sys.exit(0)

oid = msg["oid"]
filepath = msg["path"]
completion_url = msg["action"]["href"]
header: Dict = msg["action"]["header"]
chunkSize = int(header.get("chunkSize"))
presignedUrlPrefix: str = header.get("presignedUrlPrefix")
authToken = header.pop("Authorization")
Logger.info(msg)

presignedUrls = [(header.get(key), int(key[len(presignedUrlPrefix):])) for key in header.keys() if key.startswith(presignedUrlPrefix)]

presignedUrls.sort(key=lambda x: x[1])
Logger.info(presignedUrls)

write_msg(
{
Expand All @@ -133,61 +147,12 @@ def multipart_upload():
}
)

parts = []

file_stat = os.stat(filepath)
with httpx.Client() as client:
with open(filepath, "rb") as file:
for presigned_url, partNo in presignedUrls:

startByte = partNo * chunkSize

with SliceFileObj(
file,
seek_from=startByte,
read_limit=chunkSize,
) as data:

response = client.put(presigned_url, content=data, headers={
'Authorization': authToken,
"Content-Length": str(file_stat.st_size - startByte if (((startByte) + chunkSize) > file_stat.st_size) else chunkSize)
},timeout=None)

_raise_for_status(response)

Logger.info(response.headers)

parts.append( {
"ETag" : response.headers.get("etag"),
"PartNumber": partNo
} )

# In order to support progress reporting while data is uploading / downloading,
# the transfer process should post messages to stdout
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": (partNo + 1) * chunkSize,
"bytesSinceLast": chunkSize,
}
)
# Not precise but that's ok.
r = httpx.post(
completion_url,
json={
"oid": oid,
"parts": parts,
},
headers={
'Authorization': authToken
}
)
_raise_for_status(r)

_log.info(msg)
with open(filepath, "rb") as file_obj:
MultipartTransferAdapter().upload(file_obj, msg["action"], on_progress)
write_msg({"event": "complete", "oid": oid})
except HTTPException as e:
Logger.error(e)
_log.error(e)
write_msg({"error": {"code": e.status_code, "message": e.message}})
# except:
# write_msg({"error": {"code": 500, "message": "Something went wrong"}})
Expand Down
Loading