diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 725e90e2adfb..59d912d8e75d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -145,18 +145,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = { - - def validateFilePath: Boolean = { - // Fallback to vanilla spark when the input path - // does not contain the partition info. - if (partTable && !paths.forall(_.contains("="))) { - return false - } - true - } + rootPaths: Seq[String]): ValidationResult = { // Validate if all types are supported. def hasComplexType: Boolean = { @@ -176,12 +165,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { !unsupportedDataTypes.isEmpty } format match { - case ParquetReadFormat => - if (validateFilePath) { - ValidationResult.succeeded - } else { - ValidationResult.failed("Validate file path failed.") - } + case ParquetReadFormat => ValidationResult.succeeded case OrcReadFormat => ValidationResult.succeeded case MergeTreeReadFormat => ValidationResult.succeeded case TextReadFormat => @@ -343,8 +327,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def transformCheckOverflow: Boolean = false - override def requiredInputFilePaths(): Boolean = true - override def requireBloomFilterAggMightContainJointFallback(): Boolean = false def maxShuffleReadRows(): Long = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 0a3dbc3f5a37..3c834b7ca847 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -34,16 +34,20 @@ import org.apache.spark.affinity.CHAffinity import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.shuffle.CHColumnarShuffleWriter +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import java.lang.{Long => JLong} import java.net.URI +import java.nio.charset.StandardCharsets +import java.time.ZoneOffset import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ @@ -156,14 +160,41 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val fileSizes = new JArrayList[JLong]() val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] + val metadataColumns = new JArrayList[JMap[String, String]] f.files.foreach { file => paths.add(new URI(file.filePath.toString()).toASCIIString) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) - // TODO: Support custom partition location + val metadataColumn = + SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames) + metadataColumns.add(metadataColumn) val partitionColumn = new JHashMap[String, String]() + for (i <- 0 until file.partitionValues.numFields) { + val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME + } else { + val pn = file.partitionValues.get(i, partitionSchema.fields(i).dataType) + partitionSchema.fields(i).dataType match { + case _: BinaryType => + new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) + case _: DateType => + DateFormatter.apply().format(pn.asInstanceOf[Integer]) + case _: DecimalType => + pn.asInstanceOf[Decimal].toJavaBigInteger.toString + case _: TimestampType => + TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(pn.asInstanceOf[java.lang.Long]) + case _ => pn.toString + } + } + partitionColumn.put( + ConverterUtils.normalizeColName(partitionSchema.names(i)), + partitionColumnValue) + } partitionColumns.add(partitionColumn) + val (fileSize, modificationTime) = SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file) (fileSize, modificationTime) match { @@ -185,7 +216,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileSizes, modificationTimes, partitionColumns, - new JArrayList[JMap[String, String]](), + metadataColumns, fileFormat, preferredLocations.toList.asJava, mapAsJavaMap(properties) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 56dc92f420a3..939fc7f04fd6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -93,9 +93,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = { + rootPaths: Seq[String]): ValidationResult = { val filteredRootPaths = distinctRootPaths(rootPaths) if ( filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp index 4a18f4ceda02..858099fff920 100644 --- a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp +++ b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp @@ -22,48 +22,20 @@ namespace local_engine { -PartitionValues GlutenStringUtils::parsePartitionTablePath(const std::string & file) -{ - PartitionValues result; - Poco::StringTokenizer path(file, "/"); - for (const auto & item : path) - { - auto pos = item.find('='); - if (pos != std::string::npos) - { - auto key = boost::to_lower_copy(item.substr(0, pos)); - auto value = item.substr(pos + 1); - - std::string unescaped_key; - std::string unescaped_value; - Poco::URI::decode(key, unescaped_key); - Poco::URI::decode(value, unescaped_value); - result.emplace_back(std::move(unescaped_key), std::move(unescaped_value)); - } - } - return result; -} bool GlutenStringUtils::isNullPartitionValue(const std::string & value) { return value == "__HIVE_DEFAULT_PARTITION__"; } -std::string GlutenStringUtils::dumpPartitionValue(const PartitionValue & value) -{ - return value.first + "=" + value.second; -} - -std::string GlutenStringUtils::dumpPartitionValues(const PartitionValues & values) +std::string GlutenStringUtils::dumpPartitionValues(const std::map & values) { std::string res; res += "["; - for (size_t i = 0; i < values.size(); ++i) + for (const auto & [key, value] : values) { - if (i) - res += ", "; - res += dumpPartitionValue(values[i]); + res += key + "=" + value + ", "; } res += "]"; diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.h b/cpp-ch/local-engine/Common/GlutenStringUtils.h index dd044135320f..0d980f228f18 100644 --- a/cpp-ch/local-engine/Common/GlutenStringUtils.h +++ b/cpp-ch/local-engine/Common/GlutenStringUtils.h @@ -17,6 +17,7 @@ #pragma once #include #include +#include namespace local_engine { @@ -26,10 +27,8 @@ using PartitionValues = std::vector; class GlutenStringUtils { public: - static PartitionValues parsePartitionTablePath(const std::string & file); static bool isNullPartitionValue(const std::string & value); - static std::string dumpPartitionValue(const PartitionValue & value); - static std::string dumpPartitionValues(const PartitionValues & values); + static std::string dumpPartitionValues(const std::map & values); }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp index 1097abe6e698..fc5acc533d59 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp @@ -51,12 +51,19 @@ FormatFile::FormatFile( const ReadBufferBuilderPtr & read_buffer_builder_) : context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_) { - PartitionValues part_vals = GlutenStringUtils::parsePartitionTablePath(file_info.uri_file()); - for (size_t i = 0; i < part_vals.size(); ++i) + if (file_info.partition_columns_size()) { - const auto & part = part_vals[i]; - partition_keys.push_back(part.first); - partition_values[part.first] = part.second; + for (size_t i = 0; i < file_info.partition_columns_size(); ++i) + { + const auto & partition_column = file_info.partition_columns(i); + std::string unescaped_key; + std::string unescaped_value; + Poco::URI::decode(partition_column.key(), unescaped_key); + Poco::URI::decode(partition_column.value(), unescaped_value); + auto key = std::move(unescaped_key); + partition_keys.push_back(key); + partition_values[key] = std::move(unescaped_value); + } } LOG_INFO( @@ -66,7 +73,7 @@ FormatFile::FormatFile( file_info.file_format_case(), std::to_string(file_info.start()) + "-" + std::to_string(file_info.start() + file_info.length()), file_info.partition_index(), - GlutenStringUtils::dumpPartitionValues(part_vals)); + GlutenStringUtils::dumpPartitionValues(partition_values)); } FormatFilePtr FormatFileUtil::createFile( diff --git a/cpp-ch/local-engine/tests/gtest_utils.cpp b/cpp-ch/local-engine/tests/gtest_utils.cpp deleted file mode 100644 index 4ea713921f6a..000000000000 --- a/cpp-ch/local-engine/tests/gtest_utils.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include - -using namespace local_engine; - -TEST(TestStringUtils, TestExtractPartitionValues) -{ - std::string path = "/tmp/col1=1/col2=test/a.parquet"; - auto values = GlutenStringUtils::parsePartitionTablePath(path); - ASSERT_EQ(2, values.size()); - ASSERT_EQ("col1", values[0].first); - ASSERT_EQ("1", values[0].second); - ASSERT_EQ("col2", values[1].first); - ASSERT_EQ("test", values[1].second); -} diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 60f8a60064c6..1cbeb52a9213 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -53,8 +53,6 @@ case class IcebergScanTransformer( override def getDataSchema: StructType = new StructType() - override def getInputFilePathsInternal: Seq[String] = Seq.empty - // TODO: get root paths from table. override def getRootPathsInternal: Seq[String] = Seq.empty diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 7d07431a87d4..f1f46dd87e17 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -33,14 +33,14 @@ trait BackendSettingsApi { def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = ValidationResult.succeeded + rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded + def supportWriteFilesExec( format: FileFormat, fields: Array[StructField], bucketSpec: Option[BucketSpec], options: Map[String, String]): ValidationResult = ValidationResult.succeeded + def supportNativeWrite(fields: Array[StructField]): Boolean = true def supportNativeMetadataColumns(): Boolean = false def supportNativeRowIndexColumn(): Boolean = false @@ -112,8 +112,6 @@ trait BackendSettingsApi { def staticPartitionWriteOnly(): Boolean = false - def requiredInputFilePaths(): Boolean = false - // TODO: Move this to test settings as used in UT only. def requireBloomFilterAggMightContainJointFallback(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala index 1a0ff3f84567..e0621a20de9a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala @@ -30,8 +30,5 @@ trait BaseDataSource { /** Returns the partitions generated by this data source scan. */ def getPartitions: Seq[InputPartition] - /** Returns the input file paths, used to validate the partition column path */ - def getInputFilePathsInternal: Seq[String] - def getRootPathsInternal: Seq[String] } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 419c22d6c326..912b93079f4a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -50,16 +50,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource /** This can be used to report FileFormat for a file based scan operator. */ val fileFormat: ReadFileFormat - // TODO: Remove this expensive call when CH support scan custom partition location. - def getInputFilePaths: Seq[String] = { - // This is a heavy operation, and only the required backend executes the corresponding logic. - if (BackendsApiManager.getSettings.requiredInputFilePaths()) { - getInputFilePathsInternal - } else { - Seq.empty - } - } - def getRootFilePaths: Seq[String] = { if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) { getRootPathsInternal @@ -101,12 +91,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource } val validationResult = BackendsApiManager.getSettings - .validateScan( - fileFormat, - fields, - getPartitionSchema.nonEmpty, - getRootFilePaths, - getInputFilePaths) + .validateScan(fileFormat, fields, getRootFilePaths) if (!validationResult.ok()) { return validationResult } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index 553c7c4e0e7a..e1a1be8e29b5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -125,13 +125,6 @@ abstract class BatchScanExecTransformerBase( case _ => new StructType() } - override def getInputFilePathsInternal: Seq[String] = { - scan match { - case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq - case _ => Seq.empty - } - } - override def getRootPathsInternal: Seq[String] = { scan match { case fileScan: FileScan => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index af49cfd1ba02..d64c5ae016c5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -123,10 +123,6 @@ abstract class FileSourceScanExecTransformerBase( override def getDataSchema: StructType = relation.dataSchema - override def getInputFilePathsInternal: Seq[String] = { - relation.location.inputFiles.toSeq - } - override def getRootPathsInternal: Seq[String] = { FileIndexUtil.getRootPath(relation.location) } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index 938bac2b1b2c..85432350d4a2 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -71,11 +71,6 @@ case class HiveTableScanExecTransformer( override def getDataSchema: StructType = relation.tableMeta.dataSchema - override def getInputFilePathsInternal: Seq[String] = { - // FIXME how does a hive table expose file paths? - Seq.empty - } - // TODO: get root paths from hive table. override def getRootPathsInternal: Seq[String] = Seq.empty