[GLUTEN-7028][CH][Part-5] #7395

Draft: wants to merge 4 commits into base: main
22 changes: 22 additions & 0 deletions backends-clickhouse/pom.xml
@@ -253,6 +253,28 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- compile proto buffer files using copied protoc binary -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<executions>
<execution>
<id>compile-gluten-proto</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<protoSourceRoot>src/main/resources/org/apache/spark/sql/execution/datasources/v1</protoSourceRoot>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
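Reviewer note: a brief sketch of how the classes emitted by the protobuf-maven-plugin execution above get consumed, assuming a hypothetical message under the configured protoSourceRoot (the actual .proto files there are not shown in this diff). The `exe:${os.detected.classifier}` coordinate resolves a platform-specific protoc binary, which typically requires the os-maven-plugin build extension, and `clearOutputDirectory=false` keeps previously generated sources in place.

// Hypothetical write_params.proto under the protoSourceRoot above:
//   syntax = "proto3";
//   option java_package = "org.apache.spark.sql.execution.datasources.v1";
//   option java_outer_classname = "WriteParamsOuter";
//   message WriteParams { string task_id = 1; }
//
// After `mvn generate-sources`, the generated Java builder API is usable
// directly from Scala:
val params = WriteParamsOuter.WriteParams.newBuilder().setTaskId("attempt_0").build()
val payload: Array[Byte] = params.toByteArray // e.g. handed across a JNI boundary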
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

- import java.util.{Date, UUID}
+ import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@
description.tableSchema.toAttributes
)

- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
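Reviewer note: inferred from the new call site, nativeMergeMTParts becomes a static entry point that no longer needs the Substrait plan, a caller-generated UUID, or the task id. A hedged sketch of the assumed shape; the parameter and return types here are reconstructed from this diff and are assumptions, not the actual declaration in CHDatasourceJniWrapper:

// Sketch only: assumed static JNI signature matching the call
//   CHDatasourceJniWrapper.nativeMergeMTParts(splitInfo, partitionDir, bucketDir)
object CHDatasourceJniWrapperSketch {
  // splitInfo: serialized MergeTree split info; the return value is treated
  // as write metrics by the surrounding code (both types are assumptions).
  @native def nativeMergeMTParts(
      splitInfo: Array[Byte],
      partitionDir: String,
      bucketDir: String): String
}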
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
- import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
- import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+ import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath

- val extensionTableBC = sparkSession.sparkContext.broadcast(
-   ClickhouseMetaSerializer
-     .forWrite(deltaMetaReader, metadata.schema)
-     .toByteArray)
+ deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -76,19 +68,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
-   path,
-   metadata.schema,
-   context,
-   nativeConf,
-   database,
-   tableName,
-   extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
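Reviewer note: table identity no longer rides a Spark broadcast of the serialized ClickHouse meta; deltaMetaReader.updateToHadoopConf(conf) stashes it in the job's Hadoop Configuration, and each task can read it back from its TaskAttemptContext. That is also why require(path == deltaPath) and the extra createOutputWriter parameters (database, tableName, extensionTableBC.value) can go. A minimal sketch of the pattern, assuming hypothetical conf keys (the real keys are internal to DeltaMetaReader and not visible in this diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Driver side: stash table identity in the job conf (illustrative keys only).
def updateToHadoopConfSketch(conf: Configuration, db: String, table: String, path: String): Unit = {
  conf.set("gluten.mergetree.storage.db", db)
  conf.set("gluten.mergetree.storage.table", table)
  conf.set("gluten.mergetree.storage.path", path)
}

// Task side: the writer recovers the same values without a broadcast variable.
def readBackSketch(ctx: TaskAttemptContext): (String, String, String) = {
  val c = ctx.getConfiguration
  (c.get("gluten.mergetree.storage.db"),
    c.get("gluten.mergetree.storage.table"),
    c.get("gluten.mergetree.storage.path"))
}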
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

- import java.util.{Date, UUID}
+ import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@
description.tableSchema.toAttributes
)

- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -170,7 +164,7 @@
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
+ val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)

val sparkSession = SparkSession.getActiveSession.get

@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
- import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
- import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+ import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

+ // just for the sake of compatibility
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath

- val extensionTableBC = sparkSession.sparkContext.broadcast(
-   ClickhouseMetaSerializer
-     .forWrite(deltaMetaReader, metadata.schema)
-     .toByteArray)
+ deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +72,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
-   path,
-   metadata.schema,
-   context,
-   nativeConf,
-   database,
-   tableName,
-   extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

- import java.util.{Date, UUID}
+ import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -97,8 +97,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -117,13 +115,9 @@
DataTypeUtils.toAttributes(description.tableSchema)
)

- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -172,7 +166,7 @@
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
+ val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)

val sparkSession = SparkSession.getActiveSession.get

@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
- import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
- import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+ import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath

- val extensionTableBC = sparkSession.sparkContext.broadcast(
-   ClickhouseMetaSerializer
-     .forWrite(deltaMetaReader, metadata.schema)
-     .toByteArray)
+ deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +71,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
-   path,
-   metadata.schema,
-   context,
-   nativeConf,
-   database,
-   tableName,
-   extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}