Skip to content

Commit

Permalink
Merge pull request #2 from outpostHQ/enhance/gitserver
Browse files Browse the repository at this point in the history
Enhance/gitserver
  • Loading branch information
aj-ya authored May 20, 2024
2 parents 81a9e7c + 6b096d4 commit 6f4e2c4
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 96 deletions.
89 changes: 55 additions & 34 deletions outpostcli/lfs/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
from outpostkit.repository.lfs.logger import create_lfs_logger

from outpostcli.constants import CLI_BINARY_NAME
from outpostcli.lfs.exc import LFSException, ProxyLFSException
from outpostcli.lfs.parallel import multimap
from outpostcli.lfs.part import PartInfo, transfer_part
from outpostcli.lfs.utils import part_dict_list_to_xml
from outpostcli.lfs.utils import (
abort_multipart_upload,
complete_multipart_upload,
)
from outpostcli.utils import click_group


Expand Down Expand Up @@ -142,6 +146,7 @@ def on_progress(oid: str, uploaded_bytes: int):
completion_url = msg["action"]["href"]
header: Dict[str, str] = msg["action"]["header"]
chunk_size = int(header.pop("chunk_size"))
abort_url = str(header.pop("abort_url"))
presigned_urls: List[str] = list(header.values())

# if i can extract part number from url, no need for this.
Expand All @@ -154,43 +159,59 @@ def on_progress(oid: str, uploaded_bytes: int):
cores = cpu_count()
_log.info({"cores": cores})
bytes_so_far = 0
with multimap(cores) as pmap:
for resp in pmap(
transfer_part,
(
PartInfo(filepath, i + 1, chunk_size, part)
for (i, part) in enumerate(presigned_urls)
),
):
bytes_so_far += chunk_size
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
try:
with multimap(cores) as pmap:
for resp in pmap(
transfer_part,
(
PartInfo(filepath, i + 1, chunk_size, part)
for (i, part) in enumerate(presigned_urls)
),
):
if isinstance(resp, ProxyLFSException):
raise LFSException(
code=resp.code,
message=resp.message,
doc_url=resp.doc_url,
)
else:
bytes_so_far += chunk_size
# Not precise but that's ok.
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
}
)
parts.append(resp)
pass
complete_multipart_upload(completion_url, parts)
write_msg({"event": "complete", "oid": oid})
except LFSException as e:
abort_multipart_upload(abort_url)
write_msg({"error": {"code": e.code, "message": e.message}})
except requests.HTTPError as e:
abort_multipart_upload(abort_url)
_log.error(e, exc_info=True)
write_msg(
{
"error": {
"code": e.response.status_code,
"message": e.response.text,
}
)
parts.append(resp)
pass
# for i, presigned_url in enumerate(presigned_urls):

# Not precise but that's ok.
_log.info(parts)
r = requests.post(
completion_url,
data=part_dict_list_to_xml(parts),
)
r.raise_for_status()
write_msg({"event": "complete", "oid": oid})
except requests.HTTPError as e:
_log.error(e, exc_info=True)
write_msg(
{"error": {"code": e.response.status_code, "message": e.response.text}}
)
}
)
except Exception as e:
abort_multipart_upload(abort_url)
_log.error(e, exc_info=True)
raise
except Exception as e:
_log.error(e, exc_info=True)
raise

# _log.info({"parts": parts})

if __name__ == "__main__":
lfs()
49 changes: 49 additions & 0 deletions outpostcli/lfs/exc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from dataclasses import dataclass
from functools import wraps
from typing import Optional

import requests
from outpostkit.repository.lfs.logger import create_lfs_logger

_log = create_lfs_logger(__name__)


class LFSException(Exception):
    """Error raised when an LFS transfer step fails.

    Carries an HTTP-style status ``code``, a human-readable ``message``,
    and an optional ``doc_url`` pointing at further documentation.
    """

    def __init__(
        self, code: int, message: str, doc_url: Optional[str] = None, *args: object
    ) -> None:
        # Stash the structured error details before delegating to Exception.
        self.code = code
        self.message = message
        self.doc_url = doc_url
        super().__init__(*args)


@dataclass
class ProxyLFSException:
    """Error details carried as a plain return value rather than an exception.

    Returned (not raised) by decorated/worker calls so the parent can detect
    the failure by type check and decide how to surface it.
    """

    message: str  # human-readable description of the failure
    code: int  # HTTP-style status code
    doc_url: Optional[str] = None  # optional link to relevant documentation


def handle_request_errors(function):
    """Decorator converting ``requests`` transport errors into return values.

    Instead of letting HTTP / connection / timeout errors propagate, the
    wrapped callable returns a :class:`ProxyLFSException` describing the
    failure, so callers can pass errors back as ordinary values (e.g. from
    worker processes).

    Returns whatever ``function`` returns on success.
    """

    @wraps(function)
    def wrapper(*args, **kwargs):
        # BUGFIX: the wrapper previously accepted no arguments, so any
        # decorated function called with parameters failed with a TypeError
        # before the request was even attempted.
        try:
            return function(*args, **kwargs)
        except requests.exceptions.HTTPError as errh:
            _log.error(errh)
            # str(err) carries the useful detail; ``.strerror`` is usually
            # None on requests exceptions.
            return ProxyLFSException(
                code=errh.response.status_code, message=str(errh)
            )
        except requests.exceptions.ConnectionError as errc:
            _log.error(errc)
            return ProxyLFSException(code=500, message=f"Connection Error: {errc}")
        except requests.exceptions.Timeout as errt:
            _log.error(errt)
            return ProxyLFSException(
                code=500, message=f"Connection Timed Out: {errt}"
            )

    return wrapper
3 changes: 0 additions & 3 deletions outpostcli/lfs/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
from functools import wraps
from multiprocessing.pool import IMapIterator

from outpostkit.repository.lfs.logger import create_lfs_logger

_log = create_lfs_logger(__name__)

@contextlib.contextmanager
def multimap(cores=None):
Expand Down
20 changes: 16 additions & 4 deletions outpostcli/lfs/part.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from dataclasses import dataclass

import requests
from outpostkit.repository.lfs.logger import create_lfs_logger
from tenacity import retry, stop_after_attempt, wait_exponential

from outpostcli.lfs.exc import ProxyLFSException, handle_request_errors
from outpostcli.lfs.file_slice import FileSlice
from outpostcli.lfs.parallel import map_wrap
from outpostcli.lfs.types import UploadedPartObject
Expand All @@ -17,6 +20,17 @@ class PartInfo:
url: str


@handle_request_errors
@retry(
    stop=stop_after_attempt(4),  # Maximum number of retries
    wait=wait_exponential(multiplier=1, min=1, max=60),  # Exponential backoff
)
def retriable_upload_part(url: str, data: FileSlice):
    """PUT one file slice to a presigned part URL, retrying transient failures.

    Up to 4 attempts with exponential backoff (1s..60s); once retries are
    exhausted, the outer ``handle_request_errors`` decorator converts the
    final requests error into a returned ``ProxyLFSException`` instead of
    raising. Returns the successful ``requests.Response``.

    NOTE(review): a retry re-sends the same ``data`` stream without
    rewinding its read position — confirm FileSlice seeks back between
    attempts, otherwise retried parts may upload truncated bodies.
    """
    r = requests.put(url, data=data)
    r.raise_for_status()
    return r


@map_wrap
def transfer_part(part: PartInfo):
with FileSlice(
Expand All @@ -25,9 +39,7 @@ def transfer_part(part: PartInfo):
read_limit=part.chunk_size,
) as data:
try:
r = requests.put(part.url, data=data)
r.raise_for_status()

r = retriable_upload_part(part.url, data)
return UploadedPartObject(
{
"etag": str(r.headers.get("etag")),
Expand All @@ -36,4 +48,4 @@ def transfer_part(part: PartInfo):
)
except Exception as e:
_log.error(e)
raise
return ProxyLFSException(code=500, message=f"Unhandled Error: {e}")
127 changes: 72 additions & 55 deletions outpostcli/lfs/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import io
import os
from contextlib import AbstractContextManager
from json import JSONDecodeError
import re
from contextlib import AbstractContextManager
from typing import (
BinaryIO,
Dict,
List,
Optional,
)

import httpx
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

from outpostcli.lfs.exc import handle_request_errors
from outpostcli.lfs.types import UploadedPartObject


Expand Down Expand Up @@ -60,7 +60,7 @@ class SliceFileObj(AbstractContextManager):
```
"""

def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int):
def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int) -> None:
self.fileobj = fileobj
self.seek_from = seek_from
self.read_limit = read_limit
Expand All @@ -76,7 +76,7 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.fileobj.seek(self._previous_position, io.SEEK_SET)

def read(self, n: int = -1):
def read(self, n: int = -1) -> bytes:
pos = self.tell()
if pos >= self._len:
return b""
Expand Down Expand Up @@ -124,55 +124,55 @@ def __str__(self) -> str:
return f"status: {self.status_code}, message: {self.code + ' - '+ self.message if self.code else self.message}"


def _raise_for_status(resp: httpx.Response):
if 400 <= resp.status_code < 600:
content_type, _, _ = resp.headers["content-type"].partition(";")
# if content_type != "text/event-stream":
# raise ValueError(
# "Expected response Content-Type to be 'text/event-stream', "
# f"got {content_type!r}"
# )
try:
if content_type == "application/json":
try:
data = resp.json()
if isinstance(data, dict):
raise HTTPException(
status_code=resp.status_code,
message=data.get("message")
or "Request failed without message.",
code=data.get("code"),
) from None
else:
raise HTTPException(
status_code=resp.status_code,
message=(
data
if isinstance(data, str)
else getattr(
data, "message", "Request failed without message."
)
),
) from None
except JSONDecodeError as e:
raise HTTPException(
message="Failed to decode json body.", status_code=500
) from e
elif content_type == "text/plain":
raise HTTPException(
status_code=resp.status_code, message=resp.text
)
elif content_type == "text/html":
raise HTTPException(
status_code=resp.status_code, message=resp.text
)
else:
raise HTTPException(
status_code=resp.status_code,
message=f"Request failed. Unhandled Content Type: {content_type}",
)
except Exception:
raise
# def _raise_for_status(resp: httpx.Response):
# if 400 <= resp.status_code < 600:
# content_type, _, _ = resp.headers["content-type"].partition(";")
# # if content_type != "text/event-stream":
# # raise ValueError(
# # "Expected response Content-Type to be 'text/event-stream', "
# # f"got {content_type!r}"
# # )
# try:
# if content_type == "application/json":
# try:
# data = resp.json()
# if isinstance(data, dict):
# raise HTTPException(
# status_code=resp.status_code,
# message=data.get("message")
# or "Request failed without message.",
# code=data.get("code"),
# ) from None
# else:
# raise HTTPException(
# status_code=resp.status_code,
# message=(
# data
# if isinstance(data, str)
# else getattr(
# data, "message", "Request failed without message."
# )
# ),
# ) from None
# except JSONDecodeError as e:
# raise HTTPException(
# message="Failed to decode json body.", status_code=500
# ) from e
# elif content_type == "text/plain":
# raise HTTPException(
# status_code=resp.status_code, message=resp.text
# )
# elif content_type == "text/html":
# raise HTTPException(
# status_code=resp.status_code, message=resp.text
# )
# else:
# raise HTTPException(
# status_code=resp.status_code,
# message=f"Request failed. Unhandled Content Type: {content_type}",
# )
# except Exception:
# raise


def try_extracting_part_number(s: str):
Expand All @@ -191,3 +191,20 @@ def part_dict_list_to_xml(multi_parts: List[UploadedPartObject]):
s += " </Part>\n"
s += "</CompleteMultipartUpload>"
return s


@handle_request_errors
@retry(
    stop=stop_after_attempt(4),  # Maximum number of retries
    wait=wait_exponential(multiplier=1, min=1, max=60),  # Exponential backoff
)
def complete_multipart_upload(url: str, parts: List[UploadedPartObject]):
    """POST the CompleteMultipartUpload XML body listing all uploaded parts.

    The body is rebuilt by ``part_dict_list_to_xml`` on every attempt, so
    retries (up to 4, exponential backoff) are safe. After retries are
    exhausted, ``handle_request_errors`` turns the final requests error into
    a returned ``ProxyLFSException`` rather than raising.

    Returns the successful ``requests.Response``.
    """
    r = requests.post(url, data=part_dict_list_to_xml(parts))
    r.raise_for_status()
    return r


def abort_multipart_upload(url: str):
    """Abort an in-progress multipart upload by DELETE-ing its abort URL.

    Raises ``requests.HTTPError`` on a non-2xx status; otherwise returns
    the successful response.
    """
    response = requests.delete(url)
    response.raise_for_status()
    return response
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies = [
"click",
"rich",
"requests",
"tenacity"
]
optional-dependencies = { dev = [
"ruff",
Expand Down

0 comments on commit 6f4e2c4

Please sign in to comment.