diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 32c8edca7..53b8a52e1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -133,7 +133,7 @@ repos: name: mypy-storage additional_dependencies: - aiohttp - - gcloud-aio-auth + - gcloud-aio-auth>=5.3.0 - rsa - types-aiofiles - types-requests diff --git a/auth/gcloud/aio/auth/session.py b/auth/gcloud/aio/auth/session.py index 646739dcc..2cb621b07 100644 --- a/auth/gcloud/aio/auth/session.py +++ b/auth/gcloud/aio/auth/session.py @@ -344,7 +344,7 @@ async def get( if not auto_decompress and not stream: warnings.warn( 'the requests library always decompresses responses when ' - 'outside of streaming mode; when audo_decompress is ' + 'outside of streaming mode; when auto_decompress is ' 'False, stream = True must also be set', UserWarning, ) diff --git a/storage/gcloud/aio/storage/blob.py b/storage/gcloud/aio/storage/blob.py index 93510cb2c..6f3ca5c37 100644 --- a/storage/gcloud/aio/storage/blob.py +++ b/storage/gcloud/aio/storage/blob.py @@ -99,12 +99,15 @@ def chunk_size(self) -> int: async def download( self, timeout: int = DEFAULT_TIMEOUT, session: Optional[Session] = None, + auto_decompress: bool = True, ) -> Any: + headers = None if auto_decompress else {'accept-encoding': 'gzip'} return await self.bucket.storage.download( self.bucket.name, self.name, timeout=timeout, session=session, + headers=headers, ) async def upload( diff --git a/storage/gcloud/aio/storage/storage.py b/storage/gcloud/aio/storage/storage.py index b9fbe8fd6..492dae456 100644 --- a/storage/gcloud/aio/storage/storage.py +++ b/storage/gcloud/aio/storage/storage.py @@ -1,5 +1,6 @@ import binascii import enum +import gzip import io import json import logging @@ -415,12 +416,20 @@ async def upload( metadata: Optional[Dict[str, Any]] = None, session: Optional[Session] = None, force_resumable_upload: Optional[bool] = None, + zipped: bool = False, timeout: int = 30, ) -> Dict[str, Any]: url = 
f'{self._api_root_write}/{bucket}/o' - stream = self._preprocess_data(file_data) + parameters = parameters or {} + if zipped: + parameters['contentEncoding'] = 'gzip' + # Here we load the file-like object data into memory in chunks and + # re-write it compressed. This is implemented like this so we don't + # load the whole file into memory at once. + stream = self._compress_file_in_chunks(input_stream=stream) + if BUILD_GCLOUD_REST and isinstance(stream, io.StringIO): # HACK: `requests` library does not accept `str` as `data` in `put` # HTTP request. @@ -431,8 +440,6 @@ # mime detection method same as in aiohttp 3.4.4 content_type = content_type or mimetypes.guess_type(object_name)[0] - parameters = parameters or {} - headers = headers or {} headers.update(await self._headers()) headers.update({ @@ -498,6 +505,34 @@ def _preprocess_data(data: Any) -> IO[Any]: raise TypeError(f'unsupported upload type: "{type(data)}"') + @staticmethod + def _compress_file_in_chunks(input_stream: IO[AnyStr], + chunk_size: int = 8192) -> IO[bytes]: + """ + Reads the contents of input_stream and returns it gzip-compressed in a + new in-memory stream, processed in chunks. The chunk size is 8 KiB by + default, which is a standard filesystem block size. + """ + compressed_stream = io.BytesIO() + + with gzip.open(compressed_stream, 'wb') as gzipped_file: + chunk_bytes: bytes + while True: + chunk = input_stream.read(chunk_size) + if not chunk: + break + if isinstance(chunk, str): + chunk_bytes = chunk.encode('utf-8') + else: + chunk_bytes = chunk + + gzipped_file.write(chunk_bytes) + + # After finishing writing, reset the buffer position so it can be read + compressed_stream.seek(0) + + return compressed_stream + @staticmethod + def _decide_upload_type( + force_resumable_upload: Optional[bool], @@ -553,19 +588,35 @@ async def _download( headers = headers or {} headers.update(await self._headers()) + # aiohttp and requests automatically decompress the body if this + # argument is not passed. 
We assume that if the Accept-Encoding header + # is present, then the client will handle the decompression + auto_decompress = 'accept-encoding' not in {k.lower() for k in headers} + s = AioSession(session) if session else self.session - response = await s.get( - url, headers=headers, params=params or {}, - timeout=timeout, - ) - # N.B. the GCS API sometimes returns 'application/octet-stream' when a - # string was uploaded. To avoid potential weirdness, always return a - # bytes object. - try: - data: bytes = await response.read() - except (AttributeError, TypeError): - data = response.content # type: ignore[assignment] + data: bytes + if not auto_decompress and BUILD_GCLOUD_REST: + # Requests lib has a different way of reading compressed data. We + # must pass the stream=True argument and read the response using + # the 'raw' property. + response = await s.get( + url, headers=headers, params=params or {}, + timeout=timeout, stream=True, + ) + data = response.raw.read() # type: ignore[attr-defined] + else: + response = await s.get( + url, headers=headers, params=params or {}, + timeout=timeout, auto_decompress=auto_decompress, + ) + # N.B. the GCS API sometimes returns 'application/octet-stream' + # when a string was uploaded. To avoid potential weirdness, always + # return a bytes object. 
+ try: + data = await response.read() + except (AttributeError, TypeError): + data = response.content # type: ignore[assignment] return data diff --git a/storage/poetry.lock b/storage/poetry.lock index 2752a090b..e87f4b794 100644 --- a/storage/poetry.lock +++ b/storage/poetry.lock @@ -404,7 +404,7 @@ files = [ [[package]] name = "gcloud-aio-auth" -version = "5.2.2" +version = "5.3.0" description = "Python Client for Google Cloud Auth" optional = false python-versions = ">= 3.8, < 4.0" @@ -810,4 +810,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = ">= 3.8, < 4.0" -content-hash = "e731feff697102e91cc35f6b4d03743ecf99e122a16d1fa6c4e1b2deaf702443" +content-hash = "5e920330cf906327c4ab16795a38767cf0af739de8ff0560f03b1c8d336cec9c" diff --git a/storage/poetry.rest.lock b/storage/poetry.rest.lock index 18dd64b87..faf3d22b7 100644 --- a/storage/poetry.rest.lock +++ b/storage/poetry.rest.lock @@ -277,7 +277,7 @@ test = ["pytest (>=6)"] [[package]] name = "gcloud-rest-auth" -version = "5.2.1" +version = "5.3.0" description = "Python Client for Google Cloud Auth" optional = false python-versions = ">= 3.8, < 4.0" @@ -345,38 +345,38 @@ testing = ["pytest", "pytest-benchmark"] [[package]] name = "pyasn1" -version = "0.5.1" +version = "0.6.0" description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" +python-versions = ">=3.8" files = [ - {file = "pyasn1-0.5.1-py2.py3-none-any.whl", hash = "sha256:4439847c58d40b1d0a573d07e3856e95333f1976294494c325775aeca506eb58"}, - {file = "pyasn1-0.5.1.tar.gz", hash = "sha256:6d391a96e59b23130a5cfa74d6fd7f388dbbe26cc8f1edf39fdddf08d9d6676c"}, + {file = "pyasn1-0.6.0-py2.py3-none-any.whl", hash = "sha256:cca4bb0f2df5504f02f6f8a775b6e416ff9b0b3b16f7ee80b5a3153d9b804473"}, + {file = "pyasn1-0.6.0.tar.gz", hash = "sha256:3a35ab2c4b5ef98e17dfdec8ab074046fbda76e281c5a706ccd82328cfc8f64c"}, ] [[package]] 
name = "pyasn1-modules" -version = "0.3.0" +version = "0.4.0" description = "A collection of ASN.1-based protocols modules" optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" +python-versions = ">=3.8" files = [ - {file = "pyasn1_modules-0.3.0-py2.py3-none-any.whl", hash = "sha256:d3ccd6ed470d9ffbc716be08bd90efbd44d0734bc9303818f7336070984a162d"}, - {file = "pyasn1_modules-0.3.0.tar.gz", hash = "sha256:5bd01446b736eb9d31512a30d46c1ac3395d676c6f3cafa4c03eb54b9925631c"}, + {file = "pyasn1_modules-0.4.0-py3-none-any.whl", hash = "sha256:be04f15b66c206eed667e0bb5ab27e2b1855ea54a842e5037738099e8ca4ae0b"}, + {file = "pyasn1_modules-0.4.0.tar.gz", hash = "sha256:831dbcea1b177b28c9baddf4c6d1013c24c3accd14a1873fffaa6a2e905f17b6"}, ] [package.dependencies] -pyasn1 = ">=0.4.6,<0.6.0" +pyasn1 = ">=0.4.6,<0.7.0" [[package]] name = "pycparser" -version = "2.21" +version = "2.22" description = "C parser in Python" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.8" files = [ - {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"}, - {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, + {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, + {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, ] [[package]] @@ -420,17 +420,17 @@ testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygm [[package]] name = "pytest-mock" -version = "3.12.0" +version = "3.14.0" description = "Thin-wrapper around the mock package for easier use with pytest" optional = false python-versions = ">=3.8" files = [ - {file = "pytest-mock-3.12.0.tar.gz", hash = 
"sha256:31a40f038c22cad32287bb43932054451ff5583ff094bca6f675df2f8bc1a6e9"}, - {file = "pytest_mock-3.12.0-py3-none-any.whl", hash = "sha256:0972719a7263072da3a21c7f4773069bcc7486027d7e8e1f81d98a47e701bc4f"}, + {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"}, + {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"}, ] [package.dependencies] -pytest = ">=5.0" +pytest = ">=6.2.5" [package.extras] dev = ["pre-commit", "pytest-asyncio", "tox"] @@ -501,4 +501,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = ">= 3.8, < 4.0" -content-hash = "2216a97eb9dfd593e72cb604ba23caafdd20c7ad74ac2ce1e75b5ebc70e73fe2" +content-hash = "78c6b9c52bc39c96527df352de8d28f2c2af27c073ef9ee6dfcef5adc399dcf9" diff --git a/storage/pyproject.rest.toml b/storage/pyproject.rest.toml index 623f86e5c..cb5f65235 100644 --- a/storage/pyproject.rest.toml +++ b/storage/pyproject.rest.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcloud-rest-storage" -version = "9.2.0" +version = "9.3.0" description = "Python Client for Google Cloud Storage" readme = "README.rst" @@ -22,15 +22,15 @@ classifiers = [ [tool.poetry.dependencies] python = ">= 3.8, < 4.0" # aiofiles = ">= 0.6.0, < 24.0.0" -gcloud-rest-auth = ">= 3.6.0, < 6.0.0" -pyasn1-modules = ">= 0.2.1, < 0.4.0" +gcloud-rest-auth = ">= 5.3.0, < 6.0.0" +pyasn1-modules = ">= 0.2.1, < 0.4.1" rsa = ">= 3.1.4, < 5.0.0" [tool.poetry.group.dev.dependencies] gcloud-rest-auth = { path = "../auth" } pytest = "8.1.1" -# pytest-asyncio = "0.23.5" -pytest-mock = "3.12.0" +# pytest-asyncio = "0.23.6" +pytest-mock = "3.14.0" [[tool.poetry.source]] name = "pypi" diff --git a/storage/pyproject.toml b/storage/pyproject.toml index 06e0ba76b..0b9db8ad4 100644 --- a/storage/pyproject.toml +++ b/storage/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcloud-aio-storage" -version = "9.2.0" +version 
= "9.3.0" description = "Python Client for Google Cloud Storage" readme = "README.rst" @@ -22,7 +22,7 @@ classifiers = [ [tool.poetry.dependencies] python = ">= 3.8, < 4.0" aiofiles = ">= 0.6.0, < 24.0.0" -gcloud-aio-auth = ">= 3.6.0, < 6.0.0" +gcloud-aio-auth = ">= 5.3.0, < 6.0.0" pyasn1-modules = ">= 0.2.1, < 0.4.1" rsa = ">= 3.1.4, < 5.0.0" diff --git a/storage/tests/integration/smoke_test.py b/storage/tests/integration/smoke_test.py index c02565196..06aa06e0a 100644 --- a/storage/tests/integration/smoke_test.py +++ b/storage/tests/integration/smoke_test.py @@ -1,3 +1,4 @@ +import gzip import json import uuid @@ -40,10 +41,10 @@ async def test_object_life_cycle( bucket = storage.get_bucket(bucket_name) blob = await bucket.get_blob(object_name) constructed_result = await blob.download() - assert constructed_result == expected_data + _assert_expected_data(expected_data, constructed_result) direct_result = await storage.download(bucket_name, object_name) - assert direct_result == expected_data + _assert_expected_data(expected_data, direct_result) await storage.copy( bucket_name, object_name, bucket_name, @@ -51,7 +52,7 @@ async def test_object_life_cycle( ) direct_result = await storage.download(bucket_name, copied_object_name) - assert direct_result == expected_data + _assert_expected_data(expected_data, direct_result) await storage.delete(bucket_name, object_name) await storage.delete(bucket_name, copied_object_name) @@ -61,3 +62,53 @@ async def test_object_life_cycle( with pytest.raises(ResponseError): await storage.download(bucket_name, copied_object_name) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'uploaded_data,expected_data,file_extension', [ + ('test', b'test', 'txt'), + (b'test', b'test', 'bin'), + ( + json.dumps({'data': 1}), json.dumps( + {'data': 1}, + ).encode('utf-8'), 'json', + ), + ], +) +async def test_zipped_upload( + bucket_name, creds, uploaded_data, + expected_data, file_extension, +): + object_name = 
f'{uuid.uuid4().hex}/{uuid.uuid4().hex}.{file_extension}' + + async with Session() as session: + storage = Storage(service_file=creds, session=session) + await storage.upload(bucket_name, object_name, uploaded_data, + zipped=True) + + bucket = storage.get_bucket(bucket_name) + blob = await bucket.get_blob(object_name) + + # Download file from GCS without the Accept-Encoding: gzip. The + # data will be served uncompressed by GCS + constructed_result = await blob.download() + _assert_expected_data(expected_data, constructed_result) + + # Specify that the file should be downloaded compressed + constructed_result = await blob.download(auto_decompress=False) + _assert_expected_data(expected_data, constructed_result, + compressed=True) + + # Do the same but using the storage directly + direct_result = await storage.download(bucket_name, object_name) + _assert_expected_data(expected_data, direct_result) + + direct_result = await storage.download( + bucket_name, object_name, headers={'Accept-Encoding': 'gzip'}) + _assert_expected_data(expected_data, direct_result, compressed=True) + + +def _assert_expected_data(expected_data, actual_data, compressed=False): + actual_data = gzip.decompress(actual_data) if compressed else actual_data + assert expected_data == actual_data