Commit: resolve conflict
zouyunhe committed Sep 19, 2024
1 parent bb1def6 commit 4c202a6
Showing 3 changed files with 17 additions and 9 deletions.
File 1 of 3
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
 import org.apache.spark.sql.execution.window.{WindowExec, WindowGroupLimitExecShim}
-import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveTableScanNestedColumnPruning}
 
 /**
  * Converts a vanilla Spark plan node into a Gluten plan node. Gluten plan is supposed to be executed
@@ -275,7 +275,11 @@ object OffloadOthers {
       case plan: ProjectExec =>
         val columnarChild = plan.child
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-        ProjectExecTransformer(plan.projectList, columnarChild)
+        if (HiveTableScanNestedColumnPruning.supportNestedColumnPruning(plan)) {
+          HiveTableScanNestedColumnPruning.apply(plan)
+        } else {
+          ProjectExecTransformer(plan.projectList, columnarChild)
+        }
       case plan: SortAggregateExec =>
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         HashAggregateExecBaseTransformer.from(plan)
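
The hunk above routes a ProjectExec through the nested-column-pruning path only when the backend reports support for it, and otherwise keeps the original ProjectExecTransformer. A minimal, self-contained sketch of this guard-then-fallback dispatch, using toy plan types rather than Gluten's real classes:

object OffloadSketch {
  sealed trait Plan
  final case class Scan(allColumns: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class PrunedScan(columns: Seq[String]) extends Plan

  object NestedColumnPruning {
    // Pruning only applies when the projection sits directly on a scan.
    def supports(p: Project): Boolean = p.child.isInstanceOf[Scan]
    // Collapse the projection into the scan so only needed columns are read.
    def apply(p: Project): Plan = PrunedScan(p.columns)
  }

  def offload(plan: Plan): Plan = plan match {
    case p: Project if NestedColumnPruning.supports(p) => NestedColumnPruning(p)
    case p: Project => p // fall back to the ordinary project-transformer path
    case other => other
  }

  def main(args: Array[String]): Unit =
    // Prints PrunedScan(List(a.b)): the project collapsed into the scan.
    println(offload(Project(Seq("a.b"), Scan(Seq("a", "c")))))
}
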
@@ -415,6 +419,7 @@ object OffloadOthers {
         ScanTransformerFactory.createBatchScanTransformer(plan)
       case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
         // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
+        logInfo("Hive table scan exec transformer....")
         val hiveTableScanExecTransformer =
           BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
         val validateResult = hiveTableScanExecTransformer.doValidate()
File 2 of 3
@@ -223,17 +223,21 @@ object HiveTableScanExecTransformer {
       projectAttrs: Seq[Attribute] = Seq.empty): HiveTableScanExecTransformer = {
     plan match {
       case hiveTableScanTrans: HiveTableScanExecTransformer =>
-        hiveTableScanTrans
-      case hiveTableScan: HiveTableScanExec =>
-        var projectOutputAttrs = hiveTableScan.requestedAttributes
+        var projectOutputAttrs = hiveTableScanTrans.requestedAttributes
         if (projectAttrs.nonEmpty) {
           projectOutputAttrs = projectAttrs
         }
+        new HiveTableScanExecTransformer(
+          hiveTableScanTrans.requestedAttributes,
+          hiveTableScanTrans.relation,
+          hiveTableScanTrans.partitionPruningPred,
+          projectOutputAttrs)(hiveTableScanTrans.session)
+      case hiveTableScan: HiveTableScanExec =>
         new HiveTableScanExecTransformer(
           hiveTableScan.requestedAttributes,
           hiveTableScan.relation,
           hiveTableScan.partitionPruningPred,
-          projectOutputAttrs)(hiveTableScan.session)
+          Seq.empty[Attribute])(hiveTableScan.session)
       case _ =>
         throw new UnsupportedOperationException(
           s"Can't transform HiveTableScanExecTransformer from ${plan.getClass.getSimpleName}")
File 3 of 3
@@ -23,7 +23,6 @@
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, GetArrayItem, GetMapValue, GetStructField, NamedExpression}
 import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer.TEXT_INPUT_FORMAT_CLASS
-import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -32,7 +31,7 @@ object HiveTableScanNestedColumnPruning extends Logging {
   def supportNestedColumnPruning(projectExec: ProjectExec): Boolean = {
     if (BackendsApiManager.getSparkPlanExecApiInstance.supportHiveTableScanNestedColumnPruning()) {
       projectExec.child match {
-        case HiveTableScanExec(_, relation, _) =>
+        case HiveTableScanExecTransformer(_, relation, _, _) =>
           // Only supported for the Hive JSON format. ORC and Parquet are already handled by `FileSourceScanExec`, and the Hive text format currently falls back to vanilla Spark for nested fields.
           relation.tableMeta.storage.inputFormat match {
             case Some(inputFormat)
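
supportNestedColumnPruning now matches HiveTableScanExecTransformer directly (the match on HiveTableScanExec was replaced, so its import above is dropped) and, per the comment, accepts only tables whose input format is the text format carrying JSON. A hedged sketch of this input-format gate; the constant mirrors Hadoop's TextInputFormat class name, and the SerDe check is an illustrative assumption, not Gluten's exact condition:

object FormatGateSketch {
  // Assumed value of the pruned-scan text-format constant, for illustration.
  val TextInputFormatClass = "org.apache.hadoop.mapred.TextInputFormat"

  def supportsPruning(inputFormat: Option[String], serde: Option[String]): Boolean =
    inputFormat.contains(TextInputFormatClass) &&
      serde.exists(_.toLowerCase.contains("json")) // hypothetical JSON check

  def main(args: Array[String]): Unit = {
    println(supportsPruning(Some(TextInputFormatClass), Some("JsonSerDe")))  // true
    println(supportsPruning(Some("OrcInputFormat"), None))                   // false
  }
}
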
@@ -175,7 +174,7 @@
         val rootAttr = convertExpression(child, outputs, getRoot = true)
         val newChild = convertExpression(child, Seq.apply(rootAttr.asInstanceOf[Attribute]))
         val newAlias = alias.withNewChildren(Seq.apply(newChild)).asInstanceOf[Alias]
-        logInfo("The new generated project expression:" + newAlias.toJSON)
+        logDebug("The new generated project expression:" + newAlias.toJSON)
         newProjectList :+= newAlias
       case _ =>
         newProjectList :+= expr
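
Logging level aside, this block rewrites each Alias so that its child references the attribute produced by the pruned scan: convertExpression first resolves the root attribute, then rebuilds the child expression against it. A toy sketch of rewriting a nested-field access against flattened output, using illustrative expression types rather than Spark's:

object RewriteSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Field(child: Expr, name: String) extends Expr

  // Once the scan emits `a.b` as its own column, a projection such as
  // Field(Attr("a"), "b") can read the flattened attribute directly.
  def rewrite(e: Expr): Expr = e match {
    case Field(Attr(root), f) => Attr(s"$root.$f")
    case other => other
  }

  def main(args: Array[String]): Unit =
    println(rewrite(Field(Attr("a"), "b"))) // Attr(a.b)
}
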