diff --git a/google/auth/_helpers.py b/google/auth/_helpers.py index ad2c095f2..f321bc834 100644 --- a/google/auth/_helpers.py +++ b/google/auth/_helpers.py @@ -17,6 +17,7 @@ import base64 import calendar import datetime +from email.message import Message import sys import urllib @@ -63,6 +64,28 @@ def decorator(method): return decorator +def parse_content_type(header_value): + """Parse a 'content-type' header value to get just the plain media-type (without parameters). + + This is done using the class Message from email.message as suggested in PEP 594 + (because the cgi is now deprecated and will be removed in python 3.13, + see https://peps.python.org/pep-0594/#cgi). + + Args: + header_value (str): The value of a 'content-type' header as a string. + + Returns: + str: A string with just the lowercase media-type from the parsed 'content-type' header. + If the provided content-type is not parsable, returns 'text/plain', + the default value for textual files. + """ + m = Message() + m["content-type"] = header_value + return ( + m.get_content_type() + ) # Despite the name, actually returns just the media-type + + def utcnow(): """Returns the current UTC datetime. diff --git a/google/auth/compute_engine/_metadata.py b/google/auth/compute_engine/_metadata.py index 04abe178f..1b2f5161a 100644 --- a/google/auth/compute_engine/_metadata.py +++ b/google/auth/compute_engine/_metadata.py @@ -218,7 +218,10 @@ def get( if response.status == http_client.OK: content = _helpers.from_bytes(response.data) - if response.headers["content-type"] == "application/json": + if ( + _helpers.parse_content_type(response.headers["content-type"]) + == "application/json" + ): try: return json.loads(content) except ValueError as caught_exc: diff --git a/tests/compute_engine/test__metadata.py b/tests/compute_engine/test__metadata.py index a940feb25..f0e432979 100644 --- a/tests/compute_engine/test__metadata.py +++ b/tests/compute_engine/test__metadata.py @@ -176,6 +176,24 @@ def test_get_success_json(): assert result[key] == value +def test_get_success_json_content_type_charset(): + key, value = "foo", "bar" + + data = json.dumps({key: value}) + request = make_request( + data, headers={"content-type": "application/json; charset=UTF-8"} + ) + + result = _metadata.get(request, PATH) + + request.assert_called_once_with( + method="GET", + url=_metadata._METADATA_ROOT + PATH, + headers=_metadata._METADATA_HEADERS, + ) + assert result[key] == value + + def test_get_success_retry(): key, value = "foo", "bar" diff --git a/tests/test__helpers.py b/tests/test__helpers.py index c1f1d812e..c9a3847ac 100644 --- a/tests/test__helpers.py +++ b/tests/test__helpers.py @@ -51,6 +51,32 @@ def func2(): # pragma: NO COVER _helpers.copy_docstring(SourceClass)(func2) +def test_parse_content_type_plain(): + assert _helpers.parse_content_type("text/html") == "text/html" + assert _helpers.parse_content_type("application/xml") == "application/xml" + assert _helpers.parse_content_type("application/json") == "application/json" + + +def test_parse_content_type_with_parameters(): + content_type_html = "text/html; charset=UTF-8" + content_type_xml = "application/xml; charset=UTF-16; version=1.0" + content_type_json = "application/json; charset=UTF-8; indent=2" + assert _helpers.parse_content_type(content_type_html) == "text/html" + assert _helpers.parse_content_type(content_type_xml) == "application/xml" + assert _helpers.parse_content_type(content_type_json) == "application/json" + + +def test_parse_content_type_missing_or_broken(): + content_type_foo = None + content_type_bar = "" + content_type_baz = "1234" + content_type_qux = " ; charset=UTF-8" + assert _helpers.parse_content_type(content_type_foo) == "text/plain" + assert _helpers.parse_content_type(content_type_bar) == "text/plain" + assert _helpers.parse_content_type(content_type_baz) == "text/plain" + assert _helpers.parse_content_type(content_type_qux) == "text/plain" + + def test_utcnow(): assert isinstance(_helpers.utcnow(), datetime.datetime)