[BUG] GPU Parquet scan filter pushdown fails with timestamp/INT96 column #11622

Open
tgravescs opened this issue Oct 17, 2024 · 1 comment
Labels: ? - Needs Triage, bug


Describe the bug
We are seeing an issue on Databricks 10.4 with RAPIDS plugin 24.02 where a query fails when predicate pushdown filtering is applied to a timestamp column stored as INT96. The exception is:

 java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3(GpuParquetScan.scala:1252)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3$adapted(GpuParquetScan.scala:1252)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.buildBaseColumnarReaderForCoalescing(GpuParquetScan.scala:1252)
	at com.nvidia.spark.rapids.MultiFilePartitionReaderFactoryBase.createColumnarReader(GpuMultiFileReader.scala:281)
	at com.nvidia.spark.rapids.shims.GpuDataSourceRDD.compute(GpuDataSourceRDD.scala:59)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:161)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
	at org.apache.parquet.filter2.predicate.ValidTypeMap.assertTypeValid(ValidTypeMap.java:125)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:179)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
	at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:120)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
	at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:306)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
	at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
	at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
	at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:930)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:735)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$9(GpuParquetScan.scala:758)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$1(GpuParquetScan.scala:753)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.filterBlocks(GpuParquetScan.scala:689)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.com$nvidia$spark$rapids$GpuParquetMultiFilePartitionReaderFactory$$filterBlocksForCoalescingReader(GpuParquetScan.scala:1179)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.$anonfun$call$1(GpuParquetScan.scala:1216)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1215)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1204)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
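The validation failure itself comes from parquet-mr's SchemaCompatibilityValidator. A hedged sketch of the mismatch, assuming the pushed-down predicate is built from Spark's read schema (where TimestampType maps to INT64/java.lang.Long) rather than from each file's footer; the column name is taken from the trace and the value is illustrative:

```scala
import org.apache.parquet.filter2.predicate.FilterApi

// Hypothetical reconstruction of the rejected predicate: the filter is built
// against a long column because the read schema says the column is INT64.
val pred = FilterApi.notEq(FilterApi.longColumn("clientTs"), java.lang.Long.valueOf(0L))

// ParquetFileReader validates the predicate against each file footer before
// filtering row groups. A footer that declares clientTs as INT96 only accepts
// org.apache.parquet.io.api.Binary for filter values, so the validator throws
// the IllegalArgumentException shown above.
```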

It happens with
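For reference, a minimal end-to-end sketch of the failing pattern; the paths, data, and session setup are illustrative, not taken from the report:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("int96-pushdown-repro").getOrCreate()
import spark.implicits._

// Write the timestamp column using the legacy INT96 physical type.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
Seq(Timestamp.valueOf("2024-10-17 20:10:53"))
  .toDF("clientTs")
  .write.mode("overwrite").parquet("/tmp/int96_ts")

// With spark.sql.parquet.filterPushdown enabled (the default), the not-equal
// predicate is validated against each file footer before any data is read.
spark.read.parquet("/tmp/int96_ts")
  .filter($"clientTs" =!= Timestamp.valueOf("1970-01-01 00:00:00"))
  .count()
```

Setting spark.sql.parquet.filterPushdown=false should sidestep the footer-time validation as a temporary workaround, at the cost of reading row groups the filter could otherwise skip.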

@tgravescs added the ? - Needs Triage and bug labels on Oct 17, 2024

tgravescs commented Oct 17, 2024

I think different files might have different Parquet schemas for this column. I'm not sure whether this is part of the problem, but I noticed it:

24/10/17 20:10:53 INFO MultiFileCloudParquetPartitionReader: File schema for the next file XXX.snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
  optional int64 clientTs (TIMESTAMP(MICROS,true));
 ...
}
) doesn't match current YYYY.snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
  optional int96 clientTs;
  ...
}
)
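If mixed encodings turn out to be a factor, a hedged sketch of how a single table path can end up with both physical types, reusing the session and imports from the repro sketch above (directory name is illustrative):

```scala
val df = Seq(Timestamp.valueOf("2024-10-17 20:10:53")).toDF("clientTs")

// File A: int64 TIMESTAMP(MICROS,true), matching the first schema in the log.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
df.write.mode("append").parquet("/tmp/mixed_ts")

// File B: int96, matching the second schema in the log.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
df.write.mode("append").parquet("/tmp/mixed_ts")

// A pushed-down filter over this directory must validate against both footers,
// so a predicate that is valid for the int64 file is rejected by the int96 one.
spark.read.parquet("/tmp/mixed_ts")
  .filter($"clientTs" =!= Timestamp.valueOf("1970-01-01 00:00:00"))
  .count()
```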
