[GLUTEN-7394][CH] Reduce the number of listFiles calls when executing a query on the parquet file format (#7417)

* [GLUTEN-7394][CH] Reduce the number of listFiles calls when executing a query on the mergetree file format

Closes #7394.
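In outline: instead of recovering partition info by listing input files and parsing "key=value" path segments (and falling back to vanilla Spark when paths lacked them), each split now carries its partition values taken from Spark's own PartitionedFile. A minimal Scala sketch of the idea — partitionMap is a hypothetical helper, and the per-type formatting is condensed to toString where the diff below handles binary, date, decimal, and timestamp values explicitly:

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType

// Hypothetical helper: build one split's partition map from the values Spark
// already attached to the file, so no extra listFiles call is needed.
def partitionMap(file: PartitionedFile, partitionSchema: StructType): Map[String, String] =
  partitionSchema.fields.zipWithIndex.map {
    case (field, i) =>
      val value =
        if (file.partitionValues.isNullAt(i)) ExternalCatalogUtils.DEFAULT_PARTITION_NAME
        else file.partitionValues.get(i, field.dataType).toString // simplified
      field.name -> value
  }.toMap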
zzcclp authored Oct 8, 2024
1 parent 3ca5f77 commit 3dceeb8
Showing 14 changed files with 59 additions and 139 deletions.
@@ -145,18 +145,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {

def validateFilePath: Boolean = {
// Fallback to vanilla spark when the input path
// does not contain the partition info.
if (partTable && !paths.forall(_.contains("="))) {
return false
}
true
}
rootPaths: Seq[String]): ValidationResult = {

// Validate if all types are supported.
def hasComplexType: Boolean = {
@@ -176,12 +165,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
!unsupportedDataTypes.isEmpty
}
format match {
case ParquetReadFormat =>
if (validateFilePath) {
ValidationResult.succeeded
} else {
ValidationResult.failed("Validate file path failed.")
}
case ParquetReadFormat => ValidationResult.succeeded
case OrcReadFormat => ValidationResult.succeeded
case MergeTreeReadFormat => ValidationResult.succeeded
case TextReadFormat =>
@@ -343,8 +327,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {

override def transformCheckOverflow: Boolean = false

override def requiredInputFilePaths(): Boolean = true

override def requireBloomFilterAggMightContainJointFallback(): Boolean = false

def maxShuffleReadRows(): Long = {
@@ -34,16 +34,20 @@ import org.apache.spark.affinity.CHAffinity
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.CHColumnarShuffleWriter
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.lang.{Long => JLong}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.ZoneOffset
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}

import scala.collection.JavaConverters._
@@ -156,14 +160,41 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val fileSizes = new JArrayList[JLong]()
val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
val metadataColumns = new JArrayList[JMap[String, String]]
f.files.foreach {
file =>
paths.add(new URI(file.filePath.toString()).toASCIIString)
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
// TODO: Support custom partition location
val metadataColumn =
SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
} else {
val pn = file.partitionValues.get(i, partitionSchema.fields(i).dataType)
partitionSchema.fields(i).dataType match {
case _: BinaryType =>
new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
case _: DateType =>
DateFormatter.apply().format(pn.asInstanceOf[Integer])
case _: DecimalType =>
pn.asInstanceOf[Decimal].toJavaBigInteger.toString
case _: TimestampType =>
TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
.format(pn.asInstanceOf[java.lang.Long])
case _ => pn.toString
}
}
partitionColumn.put(
ConverterUtils.normalizeColName(partitionSchema.names(i)),
partitionColumnValue)
}
partitionColumns.add(partitionColumn)

val (fileSize, modificationTime) =
SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
(fileSize, modificationTime) match {
@@ -185,7 +216,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
fileSizes,
modificationTimes,
partitionColumns,
new JArrayList[JMap[String, String]](),
metadataColumns,
fileFormat,
preferredLocations.toList.asJava,
mapAsJavaMap(properties)
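For reference, a rough illustration of the two formatter branches above, assuming Spark's catalyst formatters and hypothetical input values — DateType partition values arrive as days since the epoch, TimestampType values as microseconds since the epoch, rendered here in UTC:

import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

// Days since 1970-01-01 rendered as an ISO date:
val d = DateFormatter.apply().format(19000) // expected "2022-01-08"
// Microseconds since the epoch, fractional digits emitted only when present:
val t = TimestampFormatter
  .getFractionFormatter(ZoneOffset.UTC)
  .format(1700000000000000L) // expected "2023-11-14 22:13:20"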
@@ -93,9 +93,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {
rootPaths: Seq[String]): ValidationResult = {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (
filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
34 changes: 3 additions & 31 deletions cpp-ch/local-engine/Common/GlutenStringUtils.cpp
@@ -22,48 +22,20 @@

namespace local_engine
{
PartitionValues GlutenStringUtils::parsePartitionTablePath(const std::string & file)
{
PartitionValues result;
Poco::StringTokenizer path(file, "/");
for (const auto & item : path)
{
auto pos = item.find('=');
if (pos != std::string::npos)
{
auto key = boost::to_lower_copy(item.substr(0, pos));
auto value = item.substr(pos + 1);

std::string unescaped_key;
std::string unescaped_value;
Poco::URI::decode(key, unescaped_key);
Poco::URI::decode(value, unescaped_value);
result.emplace_back(std::move(unescaped_key), std::move(unescaped_value));
}
}
return result;
}

bool GlutenStringUtils::isNullPartitionValue(const std::string & value)
{
return value == "__HIVE_DEFAULT_PARTITION__";
}

std::string GlutenStringUtils::dumpPartitionValue(const PartitionValue & value)
{
return value.first + "=" + value.second;
}

std::string GlutenStringUtils::dumpPartitionValues(const PartitionValues & values)
std::string GlutenStringUtils::dumpPartitionValues(const std::map<std::string, std::string> & values)
{
std::string res;
res += "[";

for (size_t i = 0; i < values.size(); ++i)
for (const auto & [key, value] : values)
{
if (i)
res += ", ";
res += dumpPartitionValue(values[i]);
res += key + "=" + value + ", ";
}

res += "]";
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Common/GlutenStringUtils.h
@@ -17,6 +17,7 @@
#pragma once
#include <string>
#include <vector>
#include <map>

namespace local_engine
{
@@ -26,10 +27,8 @@ using PartitionValues = std::vector<PartitionValue>;
class GlutenStringUtils
{
public:
static PartitionValues parsePartitionTablePath(const std::string & file);
static bool isNullPartitionValue(const std::string & value);

static std::string dumpPartitionValue(const PartitionValue & value);
static std::string dumpPartitionValues(const PartitionValues & values);
static std::string dumpPartitionValues(const std::map<std::string, std::string> & values);
};
}
19 changes: 13 additions & 6 deletions cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -51,12 +51,19 @@ FormatFile::FormatFile(
const ReadBufferBuilderPtr & read_buffer_builder_)
: context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_)
{
PartitionValues part_vals = GlutenStringUtils::parsePartitionTablePath(file_info.uri_file());
for (size_t i = 0; i < part_vals.size(); ++i)
if (file_info.partition_columns_size())
{
const auto & part = part_vals[i];
partition_keys.push_back(part.first);
partition_values[part.first] = part.second;
for (size_t i = 0; i < file_info.partition_columns_size(); ++i)
{
const auto & partition_column = file_info.partition_columns(i);
std::string unescaped_key;
std::string unescaped_value;
Poco::URI::decode(partition_column.key(), unescaped_key);
Poco::URI::decode(partition_column.value(), unescaped_value);
auto key = std::move(unescaped_key);
partition_keys.push_back(key);
partition_values[key] = std::move(unescaped_value);
}
}

LOG_INFO(
Expand All @@ -66,7 +73,7 @@ FormatFile::FormatFile(
file_info.file_format_case(),
std::to_string(file_info.start()) + "-" + std::to_string(file_info.start() + file_info.length()),
file_info.partition_index(),
GlutenStringUtils::dumpPartitionValues(part_vals));
GlutenStringUtils::dumpPartitionValues(partition_values));
}

FormatFilePtr FormatFileUtil::createFile(
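The constructor above URI-decodes each serialized key and value, and isNullPartitionValue (kept in GlutenStringUtils) still recognizes the Hive null sentinel. For context, a small sketch of the Spark-side conventions this mirrors, using ExternalCatalogUtils; the output comments are what these calls are expected to print:

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// The sentinel Spark writes for null partition values:
println(ExternalCatalogUtils.DEFAULT_PARTITION_NAME) // __HIVE_DEFAULT_PARTITION__
// Hive-style escaping of special characters in partition path segments,
// which the Poco::URI::decode calls above effectively reverse:
println(ExternalCatalogUtils.escapePathName("a/b=c")) // a%2Fb%3Dc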
31 changes: 0 additions & 31 deletions cpp-ch/local-engine/tests/gtest_utils.cpp

This file was deleted.

@@ -53,8 +53,6 @@ case class IcebergScanTransformer(

override def getDataSchema: StructType = new StructType()

override def getInputFilePathsInternal: Seq[String] = Seq.empty

// TODO: get root paths from table.
override def getRootPathsInternal: Seq[String] = Seq.empty

@@ -33,14 +33,14 @@ trait BackendSettingsApi {
def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = ValidationResult.succeeded
rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded

def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = ValidationResult.succeeded

def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportNativeRowIndexColumn(): Boolean = false
@@ -112,8 +112,6 @@ trait BackendSettingsApi {

def staticPartitionWriteOnly(): Boolean = false

def requiredInputFilePaths(): Boolean = false

// TODO: Move this to test settings as used in UT only.
def requireBloomFilterAggMightContainJointFallback(): Boolean = true

@@ -30,8 +30,5 @@ trait BaseDataSource {
/** Returns the partitions generated by this data source scan. */
def getPartitions: Seq[InputPartition]

/** Returns the input file paths, used to validate the partition column path */
def getInputFilePathsInternal: Seq[String]

def getRootPathsInternal: Seq[String]
}
@@ -50,16 +50,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
/** This can be used to report FileFormat for a file based scan operator. */
val fileFormat: ReadFileFormat

// TODO: Remove this expensive call when CH support scan custom partition location.
def getInputFilePaths: Seq[String] = {
// This is a heavy operation, and only the required backend executes the corresponding logic.
if (BackendsApiManager.getSettings.requiredInputFilePaths()) {
getInputFilePathsInternal
} else {
Seq.empty
}
}

def getRootFilePaths: Seq[String] = {
if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
getRootPathsInternal
@@ -101,12 +91,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
}

val validationResult = BackendsApiManager.getSettings
.validateScan(
fileFormat,
fields,
getPartitionSchema.nonEmpty,
getRootFilePaths,
getInputFilePaths)
.validateScan(fileFormat, fields, getRootFilePaths)
if (!validationResult.ok()) {
return validationResult
}
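With getInputFilePaths removed, scan validation reduces to the shape below. This is a hypothetical, self-contained restatement — every name is a stand-in rather than Gluten API — showing that only root paths, gated by the scheme-validation flag, ever reach the backend:

// Stand-in result type; Gluten's real ValidationResult lives elsewhere.
final case class ValidationResultSketch(ok: Boolean, reason: String = "")

def validateScanSketch(
    schemeValidationEnabled: Boolean,
    rootPathsFromIndex: => Seq[String], // cheap: root directories, not a recursive listing
    validate: Seq[String] => ValidationResultSketch): ValidationResultSketch = {
  val rootPaths = if (schemeValidationEnabled) rootPathsFromIndex else Seq.empty
  validate(rootPaths)
}

// Example: a backend that only rejects an unsupported URI scheme.
val result = validateScanSketch(
  schemeValidationEnabled = true,
  rootPathsFromIndex = Seq("hdfs://nn:8020/warehouse/t"),
  paths => ValidationResultSketch(paths.forall(p => !p.startsWith("unsupported://"))))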
@@ -125,13 +125,6 @@ abstract class BatchScanExecTransformerBase(
case _ => new StructType()
}

override def getInputFilePathsInternal: Seq[String] = {
scan match {
case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq
case _ => Seq.empty
}
}

override def getRootPathsInternal: Seq[String] = {
scan match {
case fileScan: FileScan =>
@@ -123,10 +123,6 @@ abstract class FileSourceScanExecTransformerBase(

override def getDataSchema: StructType = relation.dataSchema

override def getInputFilePathsInternal: Seq[String] = {
relation.location.inputFiles.toSeq
}

override def getRootPathsInternal: Seq[String] = {
FileIndexUtil.getRootPath(relation.location)
}
@@ -71,11 +71,6 @@ case class HiveTableScanExecTransformer(

override def getDataSchema: StructType = relation.tableMeta.dataSchema

override def getInputFilePathsInternal: Seq[String] = {
// FIXME how does a hive table expose file paths?
Seq.empty
}

// TODO: get root paths from hive table.
override def getRootPathsInternal: Seq[String] = Seq.empty

