
fix: to_gbq uses default_type for ambiguous array types and struct field types #838

Merged (20 commits, Dec 19, 2024)
Commits
5c5a04b  fix: `to_gbq` uses `default_type` for ambiguous array types and struc… (tswast, Dec 12, 2024)
30c2d0c  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Dec 12, 2024)
fa6907b  fix arrow list(null) case too (tswast, Dec 12, 2024)
d9b0a10  Merge remote-tracking branch 'origin/issue836-to_gbq-with-schema' int… (tswast, Dec 12, 2024)
b5ccce2  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Dec 12, 2024)
77d8e9f  lint (tswast, Dec 12, 2024)
c289eb4  Merge remote-tracking branch 'origin/main' into issue836-to_gbq-with-… (tswast, Dec 12, 2024)
58a0e54  Merge remote-tracking branch 'origin/issue836-to_gbq-with-schema' int… (tswast, Dec 12, 2024)
ae17ea4  Update pandas_gbq/schema/pandas_to_bigquery.py (tswast, Dec 16, 2024)
45f1df2  Update pandas_gbq/schema/pandas_to_bigquery.py (tswast, Dec 16, 2024)
286fefd  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Dec 16, 2024)
619ec27  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Dec 16, 2024)
e1d486c  Merge branch 'issue836-to_gbq-with-schema' of https://github.com/goog… (gcf-owl-bot[bot], Dec 16, 2024)
d79e20d  Merge remote-tracking branch 'origin/main' into issue836-to_gbq-with-… (tswast, Dec 18, 2024)
1e48053  remove redundant string check (tswast, Dec 19, 2024)
854e9c4  Apply suggestions from code review (tswast, Dec 19, 2024)
c20449b  Merge remote-tracking branch 'origin/issue836-to_gbq-with-schema' int… (tswast, Dec 19, 2024)
82ac6db  add docstrings and a few more test cases (tswast, Dec 19, 2024)
a2dc91e  use python 3.10 for docs github action (tswast, Dec 19, 2024)
7e23e74  exclude docs from owlbot (tswast, Dec 19, 2024)
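In short, this change makes schema inference fall back to the caller's `default_type` instead of failing when a column's type is ambiguous (issue #836). A minimal sketch of the intended end-to-end behavior, assuming pandas-gbq at this revision and that `dataframe_to_bigquery_fields` defaults `default_type` to STRING; the column name is illustrative:

    import pandas

    from pandas_gbq.schema import pandas_to_bigquery

    # An all-None object column converts to a pyarrow null() type, which
    # previously yielded no schema field at all; it now falls back to the
    # default type.
    df = pandas.DataFrame({"all_null": [None, None]})
    (field,) = pandas_to_bigquery.dataframe_to_bigquery_fields(df, [])
    assert field.name == "all_null" and field.field_type == "STRING"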
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -12,7 +12,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.9"
python-version: "3.10"
- name: Install nox
run: |
python -m pip install --upgrade setuptools pip wheel
1 change: 1 addition & 0 deletions owlbot.py
@@ -57,6 +57,7 @@
"noxfile.py",
"README.rst",
# exclude this file as we have an alternate prerelease.cfg
".github/workflows/docs.yml",
".kokoro/presubmit/prerelease-deps.cfg",
".kokoro/presubmit/presubmit.cfg",
],
111 changes: 92 additions & 19 deletions pandas_gbq/schema/pandas_to_bigquery.py
@@ -4,7 +4,7 @@

import collections.abc
import datetime
from typing import Optional, Tuple
from typing import Any, Optional, Tuple
import warnings

import db_dtypes
@@ -28,14 +28,21 @@
# `docs/source/writing.rst`.
_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"boolean": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
"datetime64[us, UTC]": "TIMESTAMP",
"datetime64[ns]": "DATETIME",
"datetime64[us]": "DATETIME",
"float32": "FLOAT",
"float64": "FLOAT",
"int8": "INTEGER",
"int16": "INTEGER",
"int32": "INTEGER",
"int64": "INTEGER",
"Int8": "INTEGER",
"Int16": "INTEGER",
"Int32": "INTEGER",
"Int64": "INTEGER",
"uint8": "INTEGER",
"uint16": "INTEGER",
"uint32": "INTEGER",
@@ -103,7 +110,7 @@ def dataframe_to_bigquery_fields(

# Try to automatically determine the type based on a few rows of the data.
values = dataframe.reset_index()[column]
bq_field = values_to_bigquery_field(column, values)
bq_field = values_to_bigquery_field(column, values, default_type=default_type)

if bq_field:
bq_schema_out.append(bq_field)
@@ -114,7 +121,9 @@ def dataframe_to_bigquery_fields(
arrow_value = pyarrow.array(values)
bq_field = (
pandas_gbq.schema.pyarrow_to_bigquery.arrow_type_to_bigquery_field(
column, arrow_value.type
column,
arrow_value.type,
default_type=default_type,
)
)

@@ -151,6 +160,19 @@ def dataframe_to_bigquery_fields(


def dtype_to_bigquery_field(name, dtype) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a pandas dtype.

Args:
name (str):
Name of the column/field.
dtype:
A pandas / numpy dtype object.

Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is ambiguous like the object dtype.
"""
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)

if bq_type is not None:
@@ -164,9 +186,44 @@ def dtype_to_bigquery_field(name, dtype) -> Optional[schema.SchemaField]:
return None


def value_to_bigquery_field(name, value) -> Optional[schema.SchemaField]:
if isinstance(value, str):
return schema.SchemaField(name, "STRING")
def value_to_bigquery_field(
name: str, value: Any, default_type: Optional[str] = None
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a single value.

Args:
name:
The name of the field.
value:
The value to infer the type from. If None, the default type is used
if available.
default_type:
The default field type. Defaults to None.

Returns:
The schema field, or None if a type cannot be inferred.
"""

# Set the SchemaField datatype to the given default_type if the value
# being assessed is None.
if value is None:
return schema.SchemaField(name, default_type)

# Map from Python types to BigQuery types. This isn't super exhaustive
# because we rely more on pyarrow, which can check more than one value to
# determine the type.
type_mapping = {
str: "STRING",
}

# geopandas and shapely are optional dependencies, so only check if those
# are installed.
if _BaseGeometry is not None:
type_mapping[_BaseGeometry] = "GEOGRAPHY"

for type_, bq_type in type_mapping.items():
if isinstance(value, type_):
return schema.SchemaField(name, bq_type)

# For timezone-naive datetimes, the later pyarrow conversion to try and
# learn the type add a timezone to such datetimes, causing them to be
@@ -182,35 +239,51 @@ def value_to_bigquery_field(name, value) -> Optional[schema.SchemaField]:
else:
return schema.SchemaField(name, "DATETIME")

if _BaseGeometry is not None and isinstance(value, _BaseGeometry):
return schema.SchemaField(name, "GEOGRAPHY")

return None


def values_to_bigquery_field(name, values) -> Optional[schema.SchemaField]:
def values_to_bigquery_field(
name: str, values: Any, default_type: str = "STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a list of values.

This function iterates through the given values to determine the
corresponding schema field type.

Args:
name:
The name of the field.
values:
An iterable of values to infer the type from. If all the values
are None or the iterable is empty, the function returns None.
default_type:
The default field type to use if a specific type cannot be
determined from the values. Defaults to "STRING".

Returns:
The schema field, or None if a type cannot be inferred.
"""
value = pandas_gbq.core.pandas.first_valid(values)

# All NULL, type not determinable.
# All values came back as NULL, thus type not determinable by this method.
# Return None so we can try other methods.
if value is None:
return None

field = value_to_bigquery_field(name, value)
if field is not None:
field = value_to_bigquery_field(name, value, default_type=default_type)
if field:
return field

if isinstance(value, str):
return schema.SchemaField(name, "STRING")

# Check plain ARRAY values here. Let STRUCT get determined by pyarrow,
# which can examine more values to determine all keys.
# Check plain ARRAY values here. Exclude mapping types to let STRUCT get
# determined by pyarrow, which can examine more values to determine all
# keys.
if isinstance(value, collections.abc.Iterable) and not isinstance(
value, collections.abc.Mapping
):
# It could be that this value contains all None or is empty, so get the
# first non-None value we can find.
valid_item = pandas_gbq.core.pandas.first_array_valid(values)
field = value_to_bigquery_field(name, valid_item)
field = value_to_bigquery_field(name, valid_item, default_type=default_type)

if field is not None:
return schema.SchemaField(name, field.field_type, mode="REPEATED")
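A short sketch of the new `default_type` plumbing in this module, assuming the `first_valid`/`first_array_valid` helpers behave as the comments above describe; column names are illustrative:

    import pandas

    from pandas_gbq.schema import pandas_to_bigquery

    # Arrays whose items are all None are ambiguous: first_array_valid()
    # finds no representative item, so value_to_bigquery_field() receives
    # None and returns the default type, which the caller marks REPEATED.
    field = pandas_to_bigquery.values_to_bigquery_field(
        "tags", pandas.Series([[None], []]), default_type="STRING"
    )
    assert field.field_type == "STRING" and field.mode == "REPEATED"

    # Unambiguous dtypes still resolve directly via _PANDAS_DTYPE_TO_BQ,
    # including the nullable extension dtypes added to the mapping above.
    int_field = pandas_to_bigquery.dtype_to_bigquery_field(
        "n", pandas.Series([1, None], dtype="Int64").dtype
    )
    assert int_field.field_type == "INTEGER"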
61 changes: 56 additions & 5 deletions pandas_gbq/schema/pyarrow_to_bigquery.py
@@ -37,7 +37,31 @@
}


def arrow_type_to_bigquery_field(name, type_) -> Optional[schema.SchemaField]:
def arrow_type_to_bigquery_field(
name, type_, default_type="STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from an arrow type.

Args:
name (str):
Name of the column/field.
type_:
A pyarrow type object.

Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is a type that doesn't have a clear mapping in BigQuery.

null() are assumed to be the ``default_type``, since there are no
values that contradict that.
"""
# If a sub-field is the null type, then assume it's the default type, as
# that's the best we can do.
# https://github.com/googleapis/python-bigquery-pandas/issues/836
if pyarrow.types.is_null(type_):
return schema.SchemaField(name, default_type)

# Since both TIMESTAMP/DATETIME use pyarrow.timestamp(...), we need to use
# a special case to disambiguate them. See:
# https://github.com/googleapis/python-bigquery-pandas/issues/450
@@ -52,22 +76,49 @@ def arrow_type_to_bigquery_field(name, type_) -> Optional[schema.SchemaField]:
return schema.SchemaField(name, detected_type)

if pyarrow.types.is_list(type_):
return arrow_list_type_to_bigquery(name, type_)
return arrow_list_type_to_bigquery(name, type_, default_type=default_type)

if pyarrow.types.is_struct(type_):
inner_fields: list[pyarrow.Field] = []
struct_type = cast(pyarrow.StructType, type_)
for field_index in range(struct_type.num_fields):
field = struct_type[field_index]
inner_fields.append(arrow_type_to_bigquery_field(field.name, field.type))
inner_fields.append(
arrow_type_to_bigquery_field(
field.name, field.type, default_type=default_type
)
)

return schema.SchemaField(name, "RECORD", fields=inner_fields)

return None


def arrow_list_type_to_bigquery(name, type_) -> Optional[schema.SchemaField]:
inner_field = arrow_type_to_bigquery_field(name, type_.value_type)
def arrow_list_type_to_bigquery(
name, type_, default_type="STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from an arrow list type.

Args:
name (str):
Name of the column/field.
type_:
A pyarrow type object.

Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is a type that doesn't have a clear mapping in BigQuery.

null() are assumed to be the ``default_type``, since there are no
values that contradict that.
"""
inner_field = arrow_type_to_bigquery_field(
name, type_.value_type, default_type=default_type
)

# If this is None, it means we got some type that we can't cleanly map to
# a BigQuery type, so bubble that status up.
if inner_field is None:
return None

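The same fallback at the arrow level, mirroring the updated unit tests below; a sketch assuming pandas-gbq at this revision:

    import pyarrow
    from google.cloud import bigquery

    from pandas_gbq.schema import pyarrow_to_bigquery

    # null() carries no type information, so it now maps to default_type
    # rather than None; for list(null()), the fallback applies to the
    # element type and the resulting field is marked REPEATED.
    assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
        "x", pyarrow.null(), default_type="STRING"
    ) == bigquery.SchemaField("x", "STRING")
    assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
        "xs", pyarrow.list_(pyarrow.null()), default_type="STRING"
    ) == bigquery.SchemaField("xs", "STRING", mode="REPEATED")

Returning the default rather than None here is what lets struct fields and list elements of type null() inherit the user's `default_type`, which is the crux of issue #836.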
49 changes: 40 additions & 9 deletions tests/unit/schema/test_pandas_to_bigquery.py
@@ -21,13 +21,34 @@ def module_under_test():
def test_dataframe_to_bigquery_fields_w_named_index(module_under_test):
df_data = collections.OrderedDict(
[
("str_index", ["a", "b"]),
("str_column", ["hello", "world"]),
("int_column", [42, 8]),
("nullable_int_column", pandas.Series([42, None], dtype="Int64")),
("uint_column", pandas.Series([7, 13], dtype="uint8")),
("bool_column", [True, False]),
("boolean_column", pandas.Series([True, None], dtype="boolean")),
(
"datetime_column",
[
datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
datetime.datetime(2000, 1, 1, 0, 0, 0),
],
),
(
"timestamp_column",
[
datetime.datetime(
1999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
),
datetime.datetime(
2000, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
],
),
]
)
index = pandas.Index(["a", "b"], name="str_index")
dataframe = pandas.DataFrame(df_data, index=index)
dataframe = pandas.DataFrame(df_data).set_index("str_index", drop=True)

returned_schema = module_under_test.dataframe_to_bigquery_fields(
dataframe, [], index=True
@@ -37,27 +58,37 @@ def test_dataframe_to_bigquery_fields_w_named_index(module_under_test):
schema.SchemaField("str_index", "STRING", "NULLABLE"),
schema.SchemaField("str_column", "STRING", "NULLABLE"),
schema.SchemaField("int_column", "INTEGER", "NULLABLE"),
schema.SchemaField("nullable_int_column", "INTEGER", "NULLABLE"),
schema.SchemaField("uint_column", "INTEGER", "NULLABLE"),
schema.SchemaField("bool_column", "BOOLEAN", "NULLABLE"),
schema.SchemaField("boolean_column", "BOOLEAN", "NULLABLE"),
schema.SchemaField("datetime_column", "DATETIME", "NULLABLE"),
schema.SchemaField("timestamp_column", "TIMESTAMP", "NULLABLE"),
)
assert returned_schema == expected_schema


def test_dataframe_to_bigquery_fields_w_multiindex(module_under_test):
df_data = collections.OrderedDict(
[
("str_index", ["a", "a"]),
("int_index", [0, 0]),
(
"dt_index",
[
datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
datetime.datetime(2000, 1, 1, 0, 0, 0),
],
),
("str_column", ["hello", "world"]),
("int_column", [42, 8]),
("bool_column", [True, False]),
]
)
index = pandas.MultiIndex.from_tuples(
[
("a", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)),
("a", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)),
],
names=["str_index", "int_index", "dt_index"],
dataframe = pandas.DataFrame(df_data).set_index(
["str_index", "int_index", "dt_index"],
drop=True,
)
dataframe = pandas.DataFrame(df_data, index=index)

returned_schema = module_under_test.dataframe_to_bigquery_fields(
dataframe, [], index=True
18 changes: 8 additions & 10 deletions tests/unit/schema/test_pyarrow_to_bigquery.py
@@ -42,16 +42,14 @@ def test_arrow_type_to_bigquery_field_scalar_types(pyarrow_type, bigquery_type):


def test_arrow_type_to_bigquery_field_unknown():
assert (
pyarrow_to_bigquery.arrow_type_to_bigquery_field("test_name", pyarrow.null())
is None
)
assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name", pyarrow.null(), default_type="DEFAULT_TYPE"
) == bigquery.SchemaField("test_name", "DEFAULT_TYPE")


def test_arrow_type_to_bigquery_field_list_of_unknown():
assert (
pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name", pyarrow.list_(pyarrow.null())
)
is None
)
assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name",
pyarrow.list_(pyarrow.null()),
default_type="DEFAULT_TYPE",
) == bigquery.SchemaField("test_name", "DEFAULT_TYPE", mode="REPEATED")