
Commit

0.0.61
aj-ya committed Jun 9, 2024
1 parent 0f90519 commit d1840f4
Showing 8 changed files with 222 additions and 90 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -26,3 +26,4 @@ share/python-wheels/
*.egg
MANIFEST
venv
local_repos/**
2 changes: 1 addition & 1 deletion outpostcli/constants.py
@@ -1,2 +1,2 @@
cli_version = "0.0.60"
cli_version = "0.0.61"
CLI_BINARY_NAME = "outpostcli"
93 changes: 75 additions & 18 deletions outpostcli/lfs/file_slice.py
@@ -1,39 +1,96 @@
import io
import os
from contextlib import AbstractContextManager
from typing import BinaryIO


class FileSlice(AbstractContextManager):
class SliceFileObj(AbstractContextManager):
"""
File-like object that only reads a slice of a file
Utility context manager to read a *slice* of a seekable file-like object as a seekable, file-like object.
This is NOT thread safe
Inspired by stackoverflow.com/a/29838711/593036
Credits to @julien-c
Args:
fileobj (`BinaryIO`):
A file-like object to slice. MUST implement `tell()` and `seek()` (and `read()` of course).
`fileobj` will be reset to its original position when exiting the context manager.
seek_from (`int`):
The start of the slice (offset from position 0 in bytes).
read_limit (`int`):
The maximum number of bytes to read from the slice.
Attributes:
previous_position (`int`):
The position of `fileobj` when entering the context manager, restored on exit.
Examples:
Reading 200 bytes with an offset of 128 bytes from a file (i.e. bytes 128 to 327):
```python
>>> with open("path/to/file", "rb") as file:
... with SliceFileObj(file, seek_from=128, read_limit=200) as fslice:
... fslice.read(...)
```
Reading a file in chunks of 512 bytes
```python
>>> import os
>>> from math import ceil
>>> chunk_size = 512
>>> file_size = os.path.getsize("path/to/file")
>>> with open("path/to/file", "rb") as file:
... for chunk_idx in range(ceil(file_size / chunk_size)):
... with SliceFileObj(file, seek_from=chunk_idx * chunk_size, read_limit=chunk_size) as fslice:
... chunk = fslice.read(...)
```
"""

def __init__(self, filepath: str, seek_from: int, read_limit: int):
self.filepath = filepath
def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int):
self.fileobj = fileobj
self.seek_from = seek_from
self.read_limit = read_limit
self.n_seen = 0

def __enter__(self):
self.f = open(self.filepath, "rb")
self.f.seek(self.seek_from)
self._previous_position = self.fileobj.tell()
end_of_stream = self.fileobj.seek(0, os.SEEK_END)
self._len = min(self.read_limit, end_of_stream - self.seek_from)
# ^^ The actual number of bytes that can be read from the slice
self.fileobj.seek(self.seek_from, io.SEEK_SET)
return self

def __len__(self):
total_length = os.fstat(self.f.fileno()).st_size
return min(self.read_limit, total_length - self.seek_from)
def __exit__(self, exc_type, exc_value, traceback):
self.fileobj.seek(self._previous_position, io.SEEK_SET)

def read(self, n=-1):
if self.n_seen >= self.read_limit:
def read(self, n: int = -1):
pos = self.tell()
if pos >= self._len:
return b""
remaining_amount = self.read_limit - self.n_seen
data = self.f.read(remaining_amount if n < 0 else min(n, remaining_amount))
self.n_seen += len(data)
remaining_amount = self._len - pos
data = self.fileobj.read(
remaining_amount if n < 0 else min(n, remaining_amount)
)
return data

def tell(self) -> int:
return self.fileobj.tell() - self.seek_from

def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
start = self.seek_from
end = start + self._len
if whence in (os.SEEK_SET, os.SEEK_END):
offset = start + offset if whence == os.SEEK_SET else end + offset
offset = max(start, min(offset, end))
whence = os.SEEK_SET
elif whence == os.SEEK_CUR:
cur_pos = self.fileobj.tell()
offset = max(start - cur_pos, min(offset, end - cur_pos))
else:
raise ValueError(f"whence value {whence} is not supported")
return self.fileobj.seek(offset, whence) - self.seek_from

def __iter__(self):
yield self.read(n=4 * 1024 * 1024)

def __exit__(self, *args):
self.f.close()
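
With the switch from FileSlice to SliceFileObj, a single open file handle can be re-sliced for every part instead of each slice opening the file itself. A minimal usage sketch of the new class (the path and chunk size below are illustrative, not taken from the repository):

```python
import os
from math import ceil

from outpostcli.lfs.file_slice import SliceFileObj

path = "example.bin"          # hypothetical local file
chunk_size = 8 * 1024 * 1024  # illustrative part size

file_size = os.path.getsize(path)
with open(path, "rb") as fileobj:
    for idx in range(ceil(file_size / chunk_size)):
        # Each slice exposes only the byte range [idx * chunk_size, idx * chunk_size + read_limit)
        with SliceFileObj(fileobj, seek_from=idx * chunk_size, read_limit=chunk_size) as fslice:
            chunk = fslice.read()  # read(-1) returns the remainder of the slice
        # fileobj is restored to its pre-slice position on exit
```
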
68 changes: 41 additions & 27 deletions outpostcli/lfs/part.py
@@ -6,7 +6,7 @@
from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential

from outpostcli.lfs.exc import LFSException, ProxyLFSException, handle_request_errors
from outpostcli.lfs.file_slice import FileSlice
from outpostcli.lfs.file_slice import SliceFileObj
from outpostcli.lfs.parallel import map_wrap
from outpostcli.lfs.types import UploadedPartObject

@@ -28,35 +28,49 @@ class PartInfo:
wait=wait_exponential(multiplier=1, min=4, max=60), # Exponential backoff
before_sleep=before_sleep_log(_log, logging.INFO, exc_info=True),
)
def retriable_upload_part(url: str, data: FileSlice):
r = requests.put(url, data=data)
def retriable_upload_part(url: str, data: SliceFileObj):
# headers = {
# "Content-Type": "application/octet-stream",
# "Content-Length": str(data._len),
# }
r = requests.put(
url,
data=data,
)
r.raise_for_status()
return r

# with httpx.Client() as client:
# response = client.put(url, content=data, headers=headers)
# response.raise_for_status()
# _log.info({"etag": response.headers.__dict__})
# return response


@map_wrap
def transfer_part(part: PartInfo):
with FileSlice(
part.filepath,
seek_from=(part.no - 1) * part.chunk_size,
read_limit=part.chunk_size,
) as data:
try:
_log.info(f"uploading part of {part.filepath}, part no: {part.no}")
r = retriable_upload_part(part.url, data)
if isinstance(r, ProxyLFSException):
return LFSException(code=r.code, message=r.message, doc_url=r.doc_url)
else:
etag = str(r.headers.get("etag"))
_log.info(
f"completed upload part of {part.filepath}, part no: {part.no}, etag: {etag}"
)
return UploadedPartObject(
{
"etag": etag,
"part_number": part.no,
}
)
except Exception as e:
_log.error(e, exc_info=True)
return ProxyLFSException(code=500, message=f"Unhandled Error: {e}")
with open(part.filepath, "rb") as fileobj:
with SliceFileObj(
fileobj,
seek_from=(part.no - 1) * part.chunk_size,
read_limit=part.chunk_size,
) as data:
try:
_log.info(f"uploading part of {part.filepath}, part no: {part.no}")
r = retriable_upload_part(part.url, data)
if isinstance(r, ProxyLFSException):
return r
else:
etag = str(r.headers.get("etag"))
_log.info(
f"completed upload part of {part.filepath}, part no: {part.no}, etag: {etag}"
)
return UploadedPartObject(
{
"etag": etag,
"part_number": part.no,
}
)
except Exception as e:
_log.error(e, exc_info=True)
return ProxyLFSException(code=500, message=f"Unhandled Error: {e}")
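
Stripped of the retry decorator and error wrapping, a single part upload now amounts to streaming one byte range of the file to its presigned URL; `requests` consumes the slice through its file-like interface. A hedged sketch (the URL, part number, and chunk size are placeholders):

```python
import requests

from outpostcli.lfs.file_slice import SliceFileObj


def upload_single_part(filepath: str, url: str, part_no: int, chunk_size: int) -> str:
    """Illustrative only: upload one part to a presigned URL and return its ETag."""
    with open(filepath, "rb") as fileobj:
        with SliceFileObj(
            fileobj,
            seek_from=(part_no - 1) * chunk_size,
            read_limit=chunk_size,
        ) as data:
            # requests streams the file-like slice rather than loading it into memory
            r = requests.put(url, data=data)
            r.raise_for_status()
            return str(r.headers.get("etag"))
```
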
137 changes: 98 additions & 39 deletions outpostcli/lfs/storage_class/s3.py
@@ -1,6 +1,7 @@
from multiprocessing import cpu_count
from typing import Any, Dict, List, Literal, Tuple, TypedDict

# from hf_transfer import multipart_upload
from outpostkit.repository.lfs.logger import create_lfs_logger

from outpostcli.lfs.comms import write_msg
@@ -12,10 +13,32 @@
complete_multipart_upload,
try_extracting_part_number,
)
from outpostcli.lfs.types import UploadedPartObject

_log = create_lfs_logger(__name__)


class FileUploadProgress:
def __init__(self, oid: str) -> None:
self.progress = 0
self.oid = oid
self.chunks = 0

def update(self, sz: int) -> None:
self.progress += sz
self.chunks += 1

_log.info({"oid": self.oid, "chunk_no": sz})
write_msg(
{
"event": "progress",
"oid": self.oid,
"bytesSoFar": self.progress,
"bytesSinceLast": sz,
}
)
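
FileUploadProgress reports cumulative progress back over the git-lfs custom transfer protocol via write_msg. The field names below come from the diff; the oid and byte counts are made up for illustration:

```python
# Shape of one "progress" event emitted by FileUploadProgress.update(sz)
progress_event = {
    "event": "progress",
    "oid": "9f86d081884c7d65...",  # hypothetical LFS object id
    "bytesSoFar": 25165824,        # cumulative bytes reported so far
    "bytesSinceLast": 8388608,     # bytes added by this update() call
}
```
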


class S3UploadActionDetails(TypedDict):
storage_provider: Literal["s3"]
chunk_size: str
@@ -48,55 +71,57 @@ def s3_multipart_upload(msg: Dict[str, Any]):
# if i can extract part number from url, no need for this.
# presigned_urls: List[str] = list(header.values())
# tbf the above can suffice as all the other headers are popped.

pre_signed_urls: List[Tuple[int, str]] = []
for k, v in header.items():
pNo = try_extracting_part_number(k)
if pNo:
pre_signed_urls.append((pNo, v))

sorted_urls = ["" for _ in range(len(pre_signed_urls))]

for u in pre_signed_urls:
sorted_urls[u[0] - 1] = u[1]

_log.info(
f"Starting multipart upload, oid={oid} num_parts={len(pre_signed_urls)}, chunk_size={chunk_size}"
f"Starting S3 multipart upload, oid={oid} num_parts={len(pre_signed_urls)}, chunk_size={chunk_size}"
)
parts = []
parts: List[UploadedPartObject] = []

cores = cpu_count()
_log.info({"cores": cores})
bytes_so_far = 0
progress_controller = FileUploadProgress(oid)
try:
for part_no, signed_url in pre_signed_urls:
part_info = PartInfo(filepath, part_no, chunk_size, signed_url)
resp = transfer_part(part_info)
if isinstance(resp, ProxyLFSException):
raise LFSException(
code=resp.code,
message=resp.message,
doc_url=resp.doc_url,
)
else:
bytes_so_far += chunk_size
# Not precise but that's ok.
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
}
)
parts.append(resp)
pass

# with multimap(cores) as pmap:
# for resp in pmap(
# transfer_part,
# (
# PartInfo(
# filepath,
# part_no,
# chunk_size,
# signed_url,
# )
# for (part_no, signed_url) in pre_signed_urls
# ),
# ):

# hf_transfer
# output = multipart_upload(
# file_path=filepath,
# parts_urls=sorted_urls,
# chunk_size=chunk_size,
# max_files=128,
# parallel_failures=127, # could be removed
# max_retries=5,
# callback=lambda sz: progress_controller.update(sz),
# )
# for _idx, header in enumerate(output):
# etag = header.get("etag")
# if etag is None or etag == "":
# raise ValueError(
# f"Invalid etag (`{etag}`) returned for part {_idx + 1}"
# )
# parts.append(UploadedPartObject(etag=etag, part_number=_idx + 1))
# write_msg(
# {
# "event": "progress",
# "oid": oid,
# "bytesSoFar": bytes_so_far,
# "bytesSinceLast": chunk_size,
# }
# )
# try:
# for part_no, signed_url in pre_signed_urls:
# part_info = PartInfo(filepath, part_no, chunk_size, signed_url)
# resp = transfer_part(part_info)
# if isinstance(resp, ProxyLFSException):
# raise LFSException(
# code=resp.code,
@@ -116,6 +141,40 @@ def s3_multipart_upload(msg: Dict[str, Any]):
# )
# parts.append(resp)
# pass

bytes_so_far = 0
with multimap(cores) as pmap:
for resp in pmap(
transfer_part,
(
PartInfo(
filepath,
part_no,
chunk_size,
signed_url,
)
for (part_no, signed_url) in pre_signed_urls
),
):
if isinstance(resp, ProxyLFSException):
raise LFSException(
code=resp.code,
message=resp.message,
doc_url=resp.doc_url,
)
else:
bytes_so_far += chunk_size
# Not precise but that's ok.
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
}
)
parts.append(resp)
pass
complete_resp = complete_multipart_upload(complete_url, parts)
if isinstance(complete_resp, LFSException):
abort_resp = abort_multipart_upload(abort_url)
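The multimap and map_wrap helpers are internal to outpostcli, and the commented-out hf_transfer path is an alternative left disabled. Both follow the same pattern as the new loop: map transfer_part over per-part descriptors and collect the results. A rough stand-in using multiprocessing directly, to show the shape of that loop under those assumptions (not the actual implementation):

```python
from multiprocessing import Pool, cpu_count
from typing import List, Tuple

from outpostcli.lfs.part import PartInfo, transfer_part


def upload_parts_in_parallel(
    filepath: str,
    chunk_size: int,
    pre_signed_urls: List[Tuple[int, str]],
) -> list:
    """Illustrative sketch only; the committed code uses outpostcli's multimap helper."""
    part_infos = [
        PartInfo(filepath, part_no, chunk_size, signed_url)
        for part_no, signed_url in pre_signed_urls
    ]
    with Pool(cpu_count()) as pool:
        # imap preserves input order, so results line up with part numbers; each result
        # is an UploadedPartObject on success or a ProxyLFSException on failure
        return list(pool.imap(transfer_part, part_infos))
```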
