[VL] Explicit Arrow transitions, Part 1: Add LoadArrowDataExec / OffloadArrowDataExec
zhztheplayer committed Sep 25, 2024
1 parent 59e02c1 commit 738ac30
Showing 22 changed files with 390 additions and 144 deletions.
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.columnarbatch

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}

import org.apache.spark.sql.execution.SparkPlan
@@ -34,7 +34,29 @@ object VeloxBatch extends Convention.BatchType {
VeloxColumnarToRowExec(plan)
})

// Velox batch is considered one-way compatible with Arrow batch.
// This is practically achieved by utilizing C++ API VeloxColumnarBatch::from at runtime.
fromBatch(ArrowBatch, TransitionDef.empty)
// TODO: Add explicit transitions between Arrow native batch and Velox batch.
// See https://github.com/apache/incubator-gluten/issues/7313.

fromBatch(
ArrowBatches.ArrowJavaBatch,
() =>
(plan: SparkPlan) => {
OffloadArrowDataExec(plan)
})

toBatch(
ArrowBatches.ArrowJavaBatch,
() =>
(plan: SparkPlan) => {
LoadArrowDataExec(plan)
})

fromBatch(
ArrowBatches.ArrowNativeBatch,
() =>
(plan: SparkPlan) => {
LoadArrowDataExec(plan)
})

toBatch(ArrowBatches.ArrowNativeBatch, TransitionDef.empty)
}
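
For orientation, here is a minimal, self-contained Scala sketch of what such registrations express: a batch type records, for each other batch type, which operator (if any) wraps a plan to convert to or from it. BatchType, Wrap, and the registry maps below are simplified stand-ins for illustration, not Gluten's actual Convention.BatchType / TransitionDef API; TransitionDef.empty is modeled as None.

// Hypothetical, simplified model of the transition registrations above.
object TransitionRegistryDemo {
  sealed trait BatchType
  case object ArrowJavaBatch extends BatchType
  case object ArrowNativeBatch extends BatchType

  // A "transition" wraps a plan (modeled as a String) in a conversion operator;
  // None plays the role of TransitionDef.empty, i.e. no conversion operator is needed.
  type Wrap = String => String
  private val fromOthers = scala.collection.mutable.Map[BatchType, Option[Wrap]]()
  private val toOthers = scala.collection.mutable.Map[BatchType, Option[Wrap]]()

  def fromBatch(other: BatchType, wrap: Option[Wrap]): Unit = fromOthers(other) = wrap
  def toBatch(other: BatchType, wrap: Option[Wrap]): Unit = toOthers(other) = wrap

  def main(args: Array[String]): Unit = {
    // Mirrors the registrations in VeloxBatch above.
    fromBatch(ArrowJavaBatch, Some(p => s"OffloadArrowDataExec($p)"))
    toBatch(ArrowJavaBatch, Some(p => s"LoadArrowDataExec($p)"))
    fromBatch(ArrowNativeBatch, Some(p => s"LoadArrowDataExec($p)"))
    toBatch(ArrowNativeBatch, None) // no operator registered from Velox to Arrow native
    println(toOthers(ArrowJavaBatch).get("VeloxScan")) // LoadArrowDataExec(VeloxScan)
  }
}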
@@ -315,7 +315,7 @@ object ArrowCSVFileFormat {
batchSize
)
veloxBatch
.map(v => ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), v))
.map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), v))
}

private def toAttribute(field: StructField): AttributeReference =
@@ -35,6 +35,8 @@ import scala.collection.JavaConverters._
/**
* An operator to resize input batches by appending the later batches to the one that comes earlier,
* or splitting one batch to smaller ones.
*
* FIXME: Code duplication with ColumnarToColumnarExec.
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.datasource.v2

import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

@@ -34,7 +34,7 @@ case class ArrowBatchScanExec(original: BatchScanExec)
@transient lazy val batch: Batch = original.batch

override protected def batchType0(): Convention.BatchType = {
ArrowBatch
ArrowBatches.ArrowJavaBatch
}

override lazy val readerFactory: PartitionReaderFactory = original.readerFactory
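
With the scan now declaring ArrowJavaBatch (rather than a catch-all Arrow batch type) as its output, a transition rule can look up exactly which operator to insert between it and a parent that requires a different batch type. The following toy Scala illustration uses hypothetical names (Plan, enforce) and only two batch types; it is not Gluten's planner.

object ToyTransitions {
  sealed trait BatchType
  case object ArrowJavaBatch extends BatchType
  case object VeloxBatch extends BatchType

  final case class Plan(name: String, output: BatchType, child: Option[Plan] = None)

  // Conversion operators keyed by (child output -> required input).
  private val conversions: Map[(BatchType, BatchType), Plan => Plan] = Map(
    (ArrowJavaBatch, VeloxBatch) -> ((p: Plan) => Plan("OffloadArrowDataExec", VeloxBatch, Some(p))),
    (VeloxBatch, ArrowJavaBatch) -> ((p: Plan) => Plan("LoadArrowDataExec", ArrowJavaBatch, Some(p)))
  )

  def enforce(child: Plan, required: BatchType): Plan =
    if (child.output == required) child
    else conversions((child.output, required))(child)

  def main(args: Array[String]): Unit = {
    val scan = Plan("ArrowBatchScanExec", ArrowJavaBatch)
    println(enforce(scan, VeloxBatch)) // wraps the scan in OffloadArrowDataExec
  }
}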
@@ -16,9 +16,12 @@
*/
package org.apache.spark.api.python

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxBatch}
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.PullOutProjectHelper
@@ -44,6 +47,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.immutable.Seq
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -209,9 +213,20 @@ case class ColumnarArrowEvalPythonExec(
child: SparkPlan,
evalType: Int)
extends EvalPythonExec
with GlutenPlan {
with GlutenPlan
with KnownChildrenConventions {
override def supportsColumnar: Boolean = true

override protected def batchType0(): Convention.BatchType = ArrowJavaBatch

// FIXME: Make this accept ArrowJavaBatch as input. Before doing that, a weight-based
// shortest-path algorithm should be added to the transition factory, so that the factory
// can find row->velox->arrow-native->arrow-java as a viable transition. Otherwise, with
// the current solution, any input (even one already in Arrow Java format) will be
// converted into Velox format and then into Arrow Java format before entering the Python runner.
override def requiredChildrenConventions(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(VeloxBatch)))

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
@@ -334,17 +349,17 @@ case class ColumnarArrowEvalPythonExec(
val inputBatchIter = contextAwareIterator.map {
inputCb =>
start_time = System.nanoTime()
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
ColumnarBatches.retain(inputCb)
val loaded = ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), inputCb)
ColumnarBatches.retain(loaded)
// 0. cache input for later merge
inputCbCache += inputCb
numInputRows += inputCb.numRows
inputCbCache += loaded
numInputRows += loaded.numRows
// We only need to pass the referred cols data to python worker for evaluation.
var colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
colsForEval += inputCb.column(i)
colsForEval += loaded.column(i)
}
new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
new ColumnarBatch(colsForEval.toArray, loaded.numRows())
}

val outputColumnarBatchIterator =
@@ -366,11 +381,9 @@
numOutputBatches += 1
numOutputRows += numRows
val batch = new ColumnarBatch(joinedVectors, numRows)
val offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
ColumnarBatches.release(outputCb)
ColumnarBatches.checkLoaded(batch)
procTime += (System.nanoTime() - start_time) / 1000000
offloaded
batch
}
Iterators
.wrap(res)
@@ -390,13 +403,13 @@
if (from > to) {
do {
vector.close()
} while (vector.refCnt() == to)
} while (vector.refCnt() != to)
return
}
// from < to
do {
vector.retain()
} while (vector.refCnt() == to)
} while (vector.refCnt() != to)
}
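
The loop-condition change above (== to becomes != to) fixes the retain/release loop: the old condition kept adjusting while the count already equaled the target and stopped while it did not, so it never converged on the target reliably. A tiny self-contained sketch of the corrected logic follows, using a toy ref-counted object (illustrative only, not Gluten's Arrow column vectors).

// Toy illustration of the corrected ref-count matching (not Gluten code).
final class ToyRefCounted {
  private var cnt = 1
  def refCnt(): Int = cnt
  def retain(): Unit = cnt += 1
  def close(): Unit = { require(cnt > 0, "already fully released"); cnt -= 1 }
}

object RefCntMatch {
  // Bring `vector` from its current count to `to` by releasing or retaining,
  // looping until the count reaches the target (the fixed `!= to` condition).
  def matchRefCnt(vector: ToyRefCounted, to: Int): Unit = {
    val from = vector.refCnt()
    if (from > to) {
      while (vector.refCnt() != to) vector.close()
    } else if (from < to) {
      while (vector.refCnt() != to) vector.retain()
    }
  }

  def main(args: Array[String]): Unit = {
    val v = new ToyRefCounted          // starts at 1
    matchRefCnt(v, 4); assert(v.refCnt() == 4)
    matchRefCnt(v, 1); assert(v.refCnt() == 1)
  }
}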

override protected def withNewChildInternal(newChild: SparkPlan): ColumnarArrowEvalPythonExec =
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

@@ -42,7 +42,7 @@ case class ArrowFileSourceScanExec(original: FileSourceScanExec)
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()

override protected def batchType0(): Convention.BatchType = {
ArrowBatch
ArrowBatches.ArrowJavaBatch
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -95,7 +95,7 @@ class VeloxColumnarWriteFilesRDD(
// Currently, the cb contains three columns: row, fragments, and context.
// The first row in the row column contains the number of written numRows.
// The fragments column contains detailed information about the file writes.
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
val loadedCb = ColumnarBatches.load(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

@@ -82,9 +82,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
// the operation will find a zero column batch from a task-local pool
ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
} else {
val offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
ColumnarBatches.getNativeHandle(offloaded)
ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.getNativeHandle(batch)
}
}
datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
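
Here the writer no longer converts its input on the fly (ensureOffloaded); it asserts that the batch already arrived offloaded (checkOffloaded) and takes its native handle, on the assumption that the explicit transitions added by this change guarantee the right format by plan time. A toy Scala sketch of that contract follows; HeavyBatch/LightBatch and the helper names are hypothetical stand-ins for what ColumnarBatches distinguishes, not its real API.

// Illustrative "heavy vs. light" batch contract (hypothetical types, not Gluten's).
sealed trait ToyBatch
final case class HeavyBatch(rows: Seq[Int]) extends ToyBatch     // Arrow data materialized in the JVM
final case class LightBatch(nativeHandle: Long) extends ToyBatch // data owned by native code

object ToyColumnarBatches {
  // Fail fast if the planner did not insert the offload transition we rely on.
  def checkOffloaded(b: ToyBatch): Unit =
    require(b.isInstanceOf[LightBatch], "expected an offloaded (native-backed) batch")

  def getNativeHandle(b: ToyBatch): Long = b match {
    case LightBatch(handle) => handle
    case _: HeavyBatch      => throw new IllegalStateException("heavy batch has no native handle")
  }

  def main(args: Array[String]): Unit = {
    val batch: ToyBatch = LightBatch(nativeHandle = 42L)
    checkOffloaded(batch)
    println(getNativeHandle(batch)) // 42
  }
}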
@@ -145,7 +145,7 @@ object ExecUtil {
val pid = rangePartitioner.get.getPartition(partitionKeyExtractor(row))
pidVec.putInt(i, pid)
}
val pidBatch = ColumnarBatches.ensureOffloaded(
val pidBatch = ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
val newHandle = ColumnarBatches.compose(pidBatch, cb)
@@ -44,10 +44,10 @@ public void testOffloadAndLoad() {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
@@ -69,7 +69,7 @@ public void testCreateByHandle() {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
final long handle = ColumnarBatches.getNativeHandle(offloaded);
final ColumnarBatch created = ColumnarBatches.create(handle);
@@ -110,10 +110,10 @@ public void testOffloadAndLoadReadRow() {
col1.putNull(numRows - 1);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(