
[SPARK-51739][PYTHON] Validate Arrow schema from mapInArrow & mapInPandas & DataSource #50531


Closed · wengh wants to merge 10 commits into master from validate-arrow-type

Conversation

@wengh wengh (Contributor) commented Apr 7, 2025

What changes were proposed in this pull request?

Check the actual Arrow batch schema against the declared schema in MapInBatchEvaluator, throwing an error if they don't match.

Also fix the Pandas-to-Arrow conversion in ArrowStreamPandasUDFSerializer to respect the nullability of the output schema fields.
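
For illustration, here is a minimal PyArrow-level sketch of the kind of check being added. The real validation lives in the Scala MapInBatchEvaluator (see the diff below) and uses Spark's sameType; the names strip_nullability and check_batch_schema are hypothetical, and only top-level fields are handled for brevity:

import pyarrow as pa

def strip_nullability(schema: pa.Schema) -> pa.Schema:
    # Drop the nullable flag from each top-level field (nested fields are not
    # handled here) so the comparison mirrors the "ignore nullability"
    # semantics of Spark's sameType.
    return pa.schema([pa.field(f.name, f.type) for f in schema])

def check_batch_schema(batch: pa.RecordBatch, expected: pa.Schema) -> None:
    # Field names, order, and types must all match; nullability is ignored.
    if strip_nullability(batch.schema) != strip_nullability(expected):
        raise ValueError(
            f"[ARROW_TYPE_MISMATCH] expected {expected}, got {batch.schema}"
        )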

Why are the changes needed?

To improve error messages and reject suspicious usage.

Does this PR introduce any user-facing change?

Yes.

Behaviour change

  1. Some schema mismatches that were previously suspicious but accepted are now rejected.

    This includes:

    • extraneous fields (previously ignored)
    • fields of the same type in the wrong order (previously matched by position, silently mislabeling the columns)
    • a field that is expected to be non-nullable but is actually nullable (previously ignored)

    Example:

    from pyspark.sql.datasource import DataSource, DataSourceReader
    from pyspark.sql.pandas.types import to_arrow_schema
    from pyspark.sql.types import StructType
    import pyarrow as pa
    
    expected = StructType.fromDDL("a int, b int")
    actual = StructType.fromDDL("b int, a int")  # wrong order of fields
    
    class TestDataSource(DataSource):
        def schema(self):
            return expected
        def reader(self, schema):
            return TestReader()
    
    class TestReader(DataSourceReader):
        def read(self, partition):
            schema = to_arrow_schema(actual)
            yield pa.record_batch([[1], [2]], schema=schema)
    
    spark.dataSource.register(TestDataSource)
    spark.read.format("TestDataSource").load().show()

    Before:

    +---+---+
    |  a|  b|
    +---+---+
    |  1|  2|
    +---+---+
    

    Now:

    org.apache.spark.SparkException: [ARROW_TYPE_MISMATCH] Invalid schema from pandas_udf(): expected StructType(StructField(a,IntegerType,true),StructField(b,IntegerType,true)), got StructType(StructField(b,LongType,true),StructField(a,LongType,true)). SQLSTATE: 42K0G
    
  2. For other schema mismatches, the error changed from an internal error to a clearer ARROW_TYPE_MISMATCH error.

    This includes:

    • wrong field types
    • fewer fields than expected

    Example:

    from pyspark.sql.pandas.types import to_arrow_schema
    from pyspark.sql.types import StructType, StructField, IntegerType
    import pyarrow as pa
    
    expected = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])
    actual = StructType([StructField("a", IntegerType())])  # missing a column
    
    def fun(iterator):
        for batch in iterator:
            schema = to_arrow_schema(actual)
            yield pa.record_batch([[1]], schema=schema)
    
    spark.range(2).mapInArrow(fun, expected).show()

    Before:

    java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
        at org.apache.spark.sql.vectorized.ArrowColumnVector.getChild(ArrowColumnVector.java:134)
        at org.apache.spark.sql.execution.python.MapInBatchEvaluatorFactory$MapInBatchEvaluator.$anonfun$eval$3(MapInBatchEvaluatorFactory.scala:82)
        ...
    

    Now:

    org.apache.spark.SparkException: [ARROW_TYPE_MISMATCH] Invalid schema from pandas_udf(): expected StructType(StructField(a,IntegerType,true),StructField(b,IntegerType,true)), got StructType(StructField(a,IntegerType,true)). SQLSTATE: 42K0G
    

How was this patch tested?

End-to-end tests in python/pyspark/sql/tests/arrow/test_arrow_map.py
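
For reference, a minimal sketch of the shape such a test can take, based on the mapInArrow example above (the test name, the pytest style, and the spark fixture are illustrative assumptions, not the actual test code):

import pytest
import pyarrow as pa
from pyspark.sql.pandas.types import to_arrow_schema
from pyspark.sql.types import StructType, StructField, IntegerType

def test_map_in_arrow_rejects_missing_column(spark):
    expected = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])
    actual = StructType([StructField("a", IntegerType())])  # one field short

    def fun(iterator):
        for _ in iterator:
            yield pa.record_batch([[1]], schema=to_arrow_schema(actual))

    # The mismatch should now surface as ARROW_TYPE_MISMATCH instead of an
    # ArrayIndexOutOfBoundsException.
    with pytest.raises(Exception, match="ARROW_TYPE_MISMATCH"):
        spark.range(2).mapInArrow(fun, expected).collect()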

Was this patch authored or co-authored using generative AI tooling?

No

@wengh wengh changed the title [WIP][PYTHON] Validate type in MapInBatchEvaluatorFactory [WIP][SPARK-51739][PYTHON] Validate Arrow schema from mapInArrow & mapInPandas & DataSource Apr 7, 2025
@wengh wengh changed the title [WIP][SPARK-51739][PYTHON] Validate Arrow schema from mapInArrow & mapInPandas & DataSource [SPARK-51739][PYTHON] Validate Arrow schema from mapInArrow & mapInPandas & DataSource Apr 7, 2025
@wengh wengh marked this pull request as ready for review April 7, 2025 22:02
@@ -75,6 +77,12 @@ class MapInBatchEvaluatorFactory(
      val unsafeProj = UnsafeProjection.create(output, output)

      columnarBatchIter.flatMap { batch =>
+       // Ensure the schema matches the expected schema
+       val actualSchema = batch.column(0).dataType()
+       if (!outputSchema.sameType(actualSchema)) { // Ignore nullability mismatch for now
Member

@HyukjinKwon Should ArrowEvalPythonExec also ignore nullability, and other similar Execs if any? Or should this also NOT ignore it?

@ueshin ueshin (Member) commented Apr 7, 2025

ah, but if this doesn't ignore nullability, it could introduce a breaking change?

Member

Hmmm .. yeah .. maybe let's just ignore it for now ..

@wengh wengh (Contributor, Author) commented Apr 8, 2025

Yeah, not ignoring nullability breaks many existing tests (e.g. where the expected field is non-nullable but the actual field is nullable yet contains no null values, so the actual schema is technically wrong but causes no problems).
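
An illustrative standalone PyArrow snippet of the situation described above (not from the PR):

import pyarrow as pa

# The field is declared nullable but the data contains no nulls: harmless in
# practice, yet a strict schema comparison would reject it on nullability alone.
actual = pa.schema([pa.field("a", pa.int64(), nullable=True)])
expected = pa.schema([pa.field("a", pa.int64(), nullable=False)])

assert actual != expected                                    # strict equality fails
assert actual.field("a").type == expected.field("a").type    # the types still match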

@ueshin ueshin (Member) left a comment

Otherwise, LGTM.

@HyukjinKwon (Member)

The test failure seems unrelated, but mind triggering again to make sure?

@zhengruifeng zhengruifeng (Contributor) left a comment

qq: do we have corresponding validation on the Python side?

@wengh wengh (Contributor, Author) commented Apr 8, 2025

do we have corresponding validation on the Python side?

@zhengruifeng
No. DataSource only checks that top-level columns have matching names; mapInArrow & mapInPandas don't check at all.
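
One plausible shape of such a name-only check, for illustration (hypothetical helper; the actual Spark check may differ in details such as ordering):

import pyarrow as pa

def column_names_match(expected: pa.Schema, batch: pa.RecordBatch) -> bool:
    # Catches missing or misnamed top-level columns, but not wrong field
    # types or nullability mismatches.
    return set(expected.names) == set(batch.schema.names)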

@wengh wengh force-pushed the validate-arrow-type branch from bf9e707 to 12b8fd0 on April 9, 2025 00:14
@@ -518,8 +528,7 @@ def _create_struct_array(self, df, arrow_struct_type, spark_type=None):
            for i, field in enumerate(arrow_struct_type)
        ]

-       struct_names = [field.name for field in arrow_struct_type]
-       return pa.StructArray.from_arrays(struct_arrs, struct_names)
+       return pa.StructArray.from_arrays(struct_arrs, fields=list(arrow_struct_type))
@wengh (Contributor, Author)

Correctly handle non-nullable fields required by the arrow_struct_type schema.
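
An illustrative standalone comparison of the two calls (assuming a non-nullable field in the target schema):

import pyarrow as pa

arrs = [pa.array([1, 2], type=pa.int64())]
struct_fields = [pa.field("a", pa.int64(), nullable=False)]

# Passing only names drops nullability: the resulting field is nullable.
by_name = pa.StructArray.from_arrays(arrs, ["a"])
assert by_name.type.field("a").nullable is True

# Passing the fields preserves the declared non-nullable flag.
by_field = pa.StructArray.from_arrays(arrs, fields=struct_fields)
assert by_field.type.field("a").nullable is False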

@HyukjinKwon (Member)

Merged to master.

@allisonwang-db (Contributor)

Thanks for the fix! But this is a breaking change. Can we document this in the migration guide?

@wengh wengh (Contributor, Author) commented Apr 25, 2025

Thanks for the fix! But this is a breaking change. Can we document this in the migration guide?

@allisonwang-db #50722
