From f6117ed5dd75877d8c6d1d6be62dc20724384488 Mon Sep 17 00:00:00 2001 From: Ajeya Bhat Date: Mon, 20 May 2024 14:56:34 +0530 Subject: [PATCH 1/3] added proxies, retries, error handling --- outpostcli/lfs/commands.py | 35 +++++++++++++++------------ outpostcli/lfs/exc.py | 49 ++++++++++++++++++++++++++++++++++++++ outpostcli/lfs/parallel.py | 3 --- outpostcli/lfs/part.py | 22 +++++++++++++---- outpostcli/lfs/utils.py | 14 +++++++++++ pyproject.toml | 1 + 6 files changed, 102 insertions(+), 22 deletions(-) create mode 100644 outpostcli/lfs/exc.py diff --git a/outpostcli/lfs/commands.py b/outpostcli/lfs/commands.py index dbc64c4..e3e085f 100644 --- a/outpostcli/lfs/commands.py +++ b/outpostcli/lfs/commands.py @@ -11,6 +11,7 @@ from outpostkit.repository.lfs.logger import create_lfs_logger from outpostcli.constants import CLI_BINARY_NAME +from outpostcli.lfs.exc import LFSException, ProxyLFSException from outpostcli.lfs.parallel import multimap from outpostcli.lfs.part import PartInfo, transfer_part from outpostcli.lfs.utils import part_dict_list_to_xml @@ -162,21 +163,25 @@ def on_progress(oid: str, uploaded_bytes: int): for (i, part) in enumerate(presigned_urls) ), ): - bytes_so_far += chunk_size - write_msg( - { - "event": "progress", - "oid": oid, - "bytesSoFar": bytes_so_far, - "bytesSinceLast": chunk_size, - } - ) - parts.append(resp) - pass - # for i, presigned_url in enumerate(presigned_urls): - - # Not precise but that's ok. - _log.info(parts) + if isinstance(resp, ProxyLFSException): + raise LFSException( + code=resp.code, message=resp.message, doc_url=resp.doc_url + ) + else: + bytes_so_far += chunk_size + # Not precise but that's ok. + write_msg( + { + "event": "progress", + "oid": oid, + "bytesSoFar": bytes_so_far, + "bytesSinceLast": chunk_size, + } + ) + parts.append(resp) + pass + + # _log.info({"parts": parts}) r = requests.post( completion_url, data=part_dict_list_to_xml(parts), diff --git a/outpostcli/lfs/exc.py b/outpostcli/lfs/exc.py new file mode 100644 index 0000000..5d9bae0 --- /dev/null +++ b/outpostcli/lfs/exc.py @@ -0,0 +1,49 @@ +from dataclasses import dataclass +from functools import wraps +from typing import Optional + +import requests +from outpostkit.repository.lfs.logger import create_lfs_logger + +_log = create_lfs_logger(__name__) + + +class LFSException(Exception): + def __init__( + self, code: int, message: str, doc_url: Optional[str] = None, *args: object + ) -> None: + self.message = message + self.code = code + self.doc_url = doc_url + super().__init__(*args) + + +@dataclass +class ProxyLFSException: + message: str + code: int + doc_url: Optional[str] = None + + +def handle_request_errors(function): + @wraps(function) + def wrapper(): + try: + return function() + except requests.exceptions.HTTPError as errh: + _log.error(errh) + return ProxyLFSException( + code=errh.response.status_code, message=errh.strerror + ) + except requests.exceptions.ConnectionError as errc: + _log.error(errc) + return ProxyLFSException( + code=500, message=f"Connection Error: {errc.strerror}" + ) + except requests.exceptions.Timeout as errt: + _log.error(errt) + return ProxyLFSException( + code=500, message=f"Connection Timed Out: {errt.strerror}" + ) + + return wrapper diff --git a/outpostcli/lfs/parallel.py b/outpostcli/lfs/parallel.py index 9375c2d..43431c9 100644 --- a/outpostcli/lfs/parallel.py +++ b/outpostcli/lfs/parallel.py @@ -3,9 +3,6 @@ from functools import wraps from multiprocessing.pool import IMapIterator -from outpostkit.repository.lfs.logger import create_lfs_logger - -_log = create_lfs_logger(__name__) @contextlib.contextmanager def multimap(cores=None): diff --git a/outpostcli/lfs/part.py b/outpostcli/lfs/part.py index fd46160..a91cdbc 100644 --- a/outpostcli/lfs/part.py +++ b/outpostcli/lfs/part.py @@ -1,7 +1,12 @@ from dataclasses import dataclass + import requests from outpostkit.repository.lfs.logger import create_lfs_logger +from requests.adapters import HTTPAdapter +from tenacity import retry, stop_after_attempt, wait_exponential +from urllib3 import Retry +from outpostcli.lfs.exc import ProxyLFSException, handle_request_errors from outpostcli.lfs.file_slice import FileSlice from outpostcli.lfs.parallel import map_wrap from outpostcli.lfs.types import UploadedPartObject @@ -17,6 +22,17 @@ class PartInfo: url: str +@handle_request_errors +@retry( + stop=stop_after_attempt(4), # Maximum number of retries + wait=wait_exponential(multiplier=1, min=1, max=60), # Exponential backoff +) +def retyable_upload_part(url: str, data: FileSlice): + r = requests.put(url, data=data) + r.raise_for_status() + return r + + @map_wrap def transfer_part(part: PartInfo): with FileSlice( @@ -25,9 +41,7 @@ def transfer_part(part: PartInfo): read_limit=part.chunk_size, ) as data: try: - r = requests.put(part.url, data=data) - r.raise_for_status() - + r = retyable_upload_part(part.url, data) return UploadedPartObject( { "etag": str(r.headers.get("etag")), @@ -36,4 +50,4 @@ def transfer_part(part: PartInfo): ) except Exception as e: _log.error(e) - raise + return ProxyLFSException(code=500, message=f"Unhandled Error: {e}") diff --git a/outpostcli/lfs/utils.py b/outpostcli/lfs/utils.py index cf1a298..09b7186 100644 --- a/outpostcli/lfs/utils.py +++ b/outpostcli/lfs/utils.py @@ -11,7 +11,10 @@ ) import httpx +import requests +from tenacity import retry, stop_after_attempt, wait_exponential +from outpostcli.lfs.exc import handle_request_errors from outpostcli.lfs.types import UploadedPartObject @@ -191,3 +194,14 @@ def part_dict_list_to_xml(multi_parts: List[UploadedPartObject]): s += " \n" s += "" return s + + +@handle_request_errors +@retry( + stop=stop_after_attempt(4), # Maximum number of retries + wait=wait_exponential(multiplier=1, min=1, max=60), # Exponential backoff +) +def complete_multipart_upload(url: str, data: List[UploadedPartObject]): + r = requests.post(url, data=part_dict_list_to_xml(data)) + r.raise_for_status() + return r diff --git a/pyproject.toml b/pyproject.toml index 6dc2946..0d4c3aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "click", "rich", "requests", + "tenacity" ] optional-dependencies = { dev = [ "ruff", From 3e234017b9195e712b96d0a5b83152739a968023 Mon Sep 17 00:00:00 2001 From: Ajeya Bhat Date: Mon, 20 May 2024 16:13:29 +0530 Subject: [PATCH 2/3] continue --- outpostcli/lfs/commands.py | 8 +-- outpostcli/lfs/utils.py | 111 ++++++++++++++++++------------------- 2 files changed, 56 insertions(+), 63 deletions(-) diff --git a/outpostcli/lfs/commands.py b/outpostcli/lfs/commands.py index e3e085f..cca3160 100644 --- a/outpostcli/lfs/commands.py +++ b/outpostcli/lfs/commands.py @@ -14,7 +14,7 @@ from outpostcli.lfs.exc import LFSException, ProxyLFSException from outpostcli.lfs.parallel import multimap from outpostcli.lfs.part import PartInfo, transfer_part -from outpostcli.lfs.utils import part_dict_list_to_xml +from outpostcli.lfs.utils import complete_multipart_upload, part_dict_list_to_xml from outpostcli.utils import click_group @@ -182,11 +182,7 @@ def on_progress(oid: str, uploaded_bytes: int): pass # _log.info({"parts": parts}) - r = requests.post( - completion_url, - data=part_dict_list_to_xml(parts), - ) - r.raise_for_status() + complete_multipart_upload(completion_url, parts) write_msg({"event": "complete", "oid": oid}) except requests.HTTPError as e: _log.error(e, exc_info=True) diff --git a/outpostcli/lfs/utils.py b/outpostcli/lfs/utils.py index 09b7186..3066cbd 100644 --- a/outpostcli/lfs/utils.py +++ b/outpostcli/lfs/utils.py @@ -1,16 +1,13 @@ import io import os -from contextlib import AbstractContextManager -from json import JSONDecodeError import re +from contextlib import AbstractContextManager from typing import ( BinaryIO, - Dict, List, Optional, ) -import httpx import requests from tenacity import retry, stop_after_attempt, wait_exponential @@ -63,7 +60,7 @@ class SliceFileObj(AbstractContextManager): ``` """ - def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int): + def __init__(self, fileobj: BinaryIO, seek_from: int, read_limit: int) -> None: self.fileobj = fileobj self.seek_from = seek_from self.read_limit = read_limit @@ -79,7 +76,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.fileobj.seek(self._previous_position, io.SEEK_SET) - def read(self, n: int = -1): + def read(self, n: int = -1) -> bytes: pos = self.tell() if pos >= self._len: return b"" @@ -127,55 +124,55 @@ def __str__(self) -> str: return f"status: {self.status_code}, message: {self.code + ' - '+ self.message if self.code else self.message}" -def _raise_for_status(resp: httpx.Response): - if 400 <= resp.status_code < 600: - content_type, _, _ = resp.headers["content-type"].partition(";") - # if content_type != "text/event-stream": - # raise ValueError( - # "Expected response Content-Type to be 'text/event-stream', " - # f"got {content_type!r}" - # ) - try: - if content_type == "application/json": - try: - data = resp.json() - if isinstance(data, dict): - raise HTTPException( - status_code=resp.status_code, - message=data.get("message") - or "Request failed without message.", - code=data.get("code"), - ) from None - else: - raise HTTPException( - status_code=resp.status_code, - message=( - data - if isinstance(data, str) - else getattr( - data, "message", "Request failed without message." - ) - ), - ) from None - except JSONDecodeError as e: - raise HTTPException( - message="Failed to decode json body.", status_code=500 - ) from e - elif content_type == "text/plain": - raise HTTPException( - status_code=resp.status_code, message=resp.text - ) - elif content_type == "text/html": - raise HTTPException( - status_code=resp.status_code, message=resp.text - ) - else: - raise HTTPException( - status_code=resp.status_code, - message=f"Request failed. Unhandled Content Type: {content_type}", - ) - except Exception: - raise +# def _raise_for_status(resp: httpx.Response): +# if 400 <= resp.status_code < 600: +# content_type, _, _ = resp.headers["content-type"].partition(";") +# # if content_type != "text/event-stream": +# # raise ValueError( +# # "Expected response Content-Type to be 'text/event-stream', " +# # f"got {content_type!r}" +# # ) +# try: +# if content_type == "application/json": +# try: +# data = resp.json() +# if isinstance(data, dict): +# raise HTTPException( +# status_code=resp.status_code, +# message=data.get("message") +# or "Request failed without message.", +# code=data.get("code"), +# ) from None +# else: +# raise HTTPException( +# status_code=resp.status_code, +# message=( +# data +# if isinstance(data, str) +# else getattr( +# data, "message", "Request failed without message." +# ) +# ), +# ) from None +# except JSONDecodeError as e: +# raise HTTPException( +# message="Failed to decode json body.", status_code=500 +# ) from e +# elif content_type == "text/plain": +# raise HTTPException( +# status_code=resp.status_code, message=resp.text +# ) +# elif content_type == "text/html": +# raise HTTPException( +# status_code=resp.status_code, message=resp.text +# ) +# else: +# raise HTTPException( +# status_code=resp.status_code, +# message=f"Request failed. Unhandled Content Type: {content_type}", +# ) +# except Exception: +# raise def try_extracting_part_number(s: str): @@ -201,7 +198,7 @@ def part_dict_list_to_xml(multi_parts: List[UploadedPartObject]): stop=stop_after_attempt(4), # Maximum number of retries wait=wait_exponential(multiplier=1, min=1, max=60), # Exponential backoff ) -def complete_multipart_upload(url: str, data: List[UploadedPartObject]): - r = requests.post(url, data=part_dict_list_to_xml(data)) +def complete_multipart_upload(url: str, parts: List[UploadedPartObject]): + r = requests.post(url, data=part_dict_list_to_xml(parts)) r.raise_for_status() return r From 6b096d42909f5a60dd0550dbe9247b0ce32a57de Mon Sep 17 00:00:00 2001 From: Ajeya Bhat Date: Mon, 20 May 2024 17:40:49 +0530 Subject: [PATCH 3/3] handle abort --- outpostcli/lfs/commands.py | 90 +++++++++++++++++++++++--------------- outpostcli/lfs/part.py | 6 +-- outpostcli/lfs/utils.py | 6 +++ 3 files changed, 63 insertions(+), 39 deletions(-) diff --git a/outpostcli/lfs/commands.py b/outpostcli/lfs/commands.py index cca3160..905e3c8 100644 --- a/outpostcli/lfs/commands.py +++ b/outpostcli/lfs/commands.py @@ -14,7 +14,10 @@ from outpostcli.lfs.exc import LFSException, ProxyLFSException from outpostcli.lfs.parallel import multimap from outpostcli.lfs.part import PartInfo, transfer_part -from outpostcli.lfs.utils import complete_multipart_upload, part_dict_list_to_xml +from outpostcli.lfs.utils import ( + abort_multipart_upload, + complete_multipart_upload, +) from outpostcli.utils import click_group @@ -143,6 +146,7 @@ def on_progress(oid: str, uploaded_bytes: int): completion_url = msg["action"]["href"] header: Dict[str, str] = msg["action"]["header"] chunk_size = int(header.pop("chunk_size")) + abort_url = str(header.pop("abort_url")) presigned_urls: List[str] = list(header.values()) # if i can extract part number from url, no need for this. @@ -155,43 +159,59 @@ def on_progress(oid: str, uploaded_bytes: int): cores = cpu_count() _log.info({"cores": cores}) bytes_so_far = 0 - with multimap(cores) as pmap: - for resp in pmap( - transfer_part, - ( - PartInfo(filepath, i + 1, chunk_size, part) - for (i, part) in enumerate(presigned_urls) - ), - ): - if isinstance(resp, ProxyLFSException): - raise LFSException( - code=resp.code, message=resp.message, doc_url=resp.doc_url - ) - else: - bytes_so_far += chunk_size - # Not precise but that's ok. - write_msg( - { - "event": "progress", - "oid": oid, - "bytesSoFar": bytes_so_far, - "bytesSinceLast": chunk_size, - } - ) - parts.append(resp) - pass - - # _log.info({"parts": parts}) - complete_multipart_upload(completion_url, parts) - write_msg({"event": "complete", "oid": oid}) - except requests.HTTPError as e: - _log.error(e, exc_info=True) - write_msg( - {"error": {"code": e.response.status_code, "message": e.response.text}} - ) + try: + with multimap(cores) as pmap: + for resp in pmap( + transfer_part, + ( + PartInfo(filepath, i + 1, chunk_size, part) + for (i, part) in enumerate(presigned_urls) + ), + ): + if isinstance(resp, ProxyLFSException): + raise LFSException( + code=resp.code, + message=resp.message, + doc_url=resp.doc_url, + ) + else: + bytes_so_far += chunk_size + # Not precise but that's ok. + write_msg( + { + "event": "progress", + "oid": oid, + "bytesSoFar": bytes_so_far, + "bytesSinceLast": chunk_size, + } + ) + parts.append(resp) + pass + complete_multipart_upload(completion_url, parts) + write_msg({"event": "complete", "oid": oid}) + except LFSException as e: + abort_multipart_upload(abort_url) + write_msg({"error": {"code": e.code, "message": e.message}}) + except requests.HTTPError as e: + abort_multipart_upload(abort_url) + _log.error(e, exc_info=True) + write_msg( + { + "error": { + "code": e.response.status_code, + "message": e.response.text, + } + } + ) + except Exception as e: + abort_multipart_upload(abort_url) + _log.error(e, exc_info=True) + raise except Exception as e: _log.error(e, exc_info=True) raise + # _log.info({"parts": parts}) + if __name__ == "__main__": lfs() diff --git a/outpostcli/lfs/part.py b/outpostcli/lfs/part.py index a91cdbc..afe9461 100644 --- a/outpostcli/lfs/part.py +++ b/outpostcli/lfs/part.py @@ -2,9 +2,7 @@ import requests from outpostkit.repository.lfs.logger import create_lfs_logger -from requests.adapters import HTTPAdapter from tenacity import retry, stop_after_attempt, wait_exponential -from urllib3 import Retry from outpostcli.lfs.exc import ProxyLFSException, handle_request_errors from outpostcli.lfs.file_slice import FileSlice @@ -27,7 +25,7 @@ class PartInfo: stop=stop_after_attempt(4), # Maximum number of retries wait=wait_exponential(multiplier=1, min=1, max=60), # Exponential backoff ) -def retyable_upload_part(url: str, data: FileSlice): +def retriable_upload_part(url: str, data: FileSlice): r = requests.put(url, data=data) r.raise_for_status() return r @@ -41,7 +39,7 @@ def transfer_part(part: PartInfo): read_limit=part.chunk_size, ) as data: try: - r = retyable_upload_part(part.url, data) + r = retriable_upload_part(part.url, data) return UploadedPartObject( { "etag": str(r.headers.get("etag")), diff --git a/outpostcli/lfs/utils.py b/outpostcli/lfs/utils.py index 3066cbd..94f6939 100644 --- a/outpostcli/lfs/utils.py +++ b/outpostcli/lfs/utils.py @@ -202,3 +202,9 @@ def complete_multipart_upload(url: str, parts: List[UploadedPartObject]): r = requests.post(url, data=part_dict_list_to_xml(parts)) r.raise_for_status() return r + + +def abort_multipart_upload(url: str): + r = requests.delete(url) + r.raise_for_status() + return r