Commit

#552 Fix Hive DDL for tables with long descriptions with '\n' characters.
yruslan committed Feb 17, 2025
1 parent 89f1bc3 commit 5780608
Showing 3 changed files with 57 additions and 5 deletions.
SparkUtils.scala

@@ -40,6 +40,7 @@ object SparkUtils {

val MAX_LENGTH_METADATA_KEY = "maxLength"
val COMMENT_METADATA_KEY = "comment"
+val MAX_COMMENT_LENGTH = 254

/** Get Spark StructType from a case class. */
def getStructType[T: TypeTag]: StructType = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
@@ -223,10 +224,21 @@ object SparkUtils {
*/
def transformSchemaForCatalog(schema: StructType): StructType = {
def transformField(field: StructField): StructField = {
+val metadata = if (field.metadata.contains("comment")) {
+  val comment = field.metadata.getString("comment")
+  val fixedComment = sanitizeCommentForHiveDDL(comment)
+  new MetadataBuilder()
+    .withMetadata(field.metadata)
+    .putString("comment", fixedComment)
+    .build()
+} else {
+  field.metadata
+}
+
field.dataType match {
-case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, field.metadata)
-case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, field.metadata)
-case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, field.metadata)
+case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, metadata)
+case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, metadata)
+case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, metadata)
}
}

@@ -277,6 +289,27 @@ object SparkUtils {
}
}

+/**
+  * Sanitizes a comment for Hive DDL. Ideally this should be done by Spark, but because there are many versions
+  * of Hive and other catalogs, it is sometimes hard to have a general solution.
+  *
+  * These transformations are tested for Hive 1.0.
+  *
+  * @param comment The comment (description) of a column or a table.
+  * @return The sanitized comment, safe to embed into Hive DDL.
+  */
+def sanitizeCommentForHiveDDL(comment: String): String = {
+  val escapedComment = comment
+    .replace("\n", " ")  // Literal newlines break the generated DDL
+    .replace("\\n", " ") // Escaped newline sequences break DBeaver
+
+  if (escapedComment.length > MAX_COMMENT_LENGTH) {
+    s"${escapedComment.take(MAX_COMMENT_LENGTH - 3)}..."
+  } else {
+    escapedComment
+  }
+}
+
/**
* Removes metadata of nested fields to make DDL compatible with some Hive-like catalogs.
* In addition, removes the nullability flag for all fields.
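For illustration only (not part of the commit), here is a standalone sketch that mirrors the sanitization logic above; the object name and the demo main method are hypothetical:

object CommentSanitizerSketch {
  val MaxCommentLength = 254 // mirrors MAX_COMMENT_LENGTH above

  // Re-implementation of sanitizeCommentForHiveDDL, for illustration.
  def sanitize(comment: String): String = {
    val escaped = comment
      .replace("\n", " ")  // literal newlines break the generated DDL
      .replace("\\n", " ") // escaped newline sequences break DBeaver
    if (escaped.length > MaxCommentLength)
      escaped.take(MaxCommentLength - 3) + "..." // 251 chars + "..." = 254
    else
      escaped
  }

  def main(args: Array[String]): Unit = {
    println(sanitize("line1\nline2"))    // prints: line1 line2
    println(sanitize("x" * 300).length)  // prints: 254
  }
}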
HiveHelperSql.scala

@@ -114,7 +114,7 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
queryExecutor.execute(sqlHiveRepair)
}

-private def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
+private[core] def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
val partitionColsLower = partitionBy.map(_.toLowerCase())

val nonPartitionFields = SparkUtils.transformSchemaForCatalog(schema)
@@ -128,7 +128,7 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
}
}

-private def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
+private[core] def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
if (partitionBy.isEmpty) {
""
} else {
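For context, private[core] is Scala's qualified access modifier: the methods become visible to any code under the enclosing core package, which is what lets the new regression test below call getTableDDL directly. A minimal sketch of the mechanism, with hypothetical package and object names:

package za.co.absa.pramen.core.utils.hive {
  object DdlBuilder {
    // Accessible from any class under za.co.absa.pramen.core, including test suites.
    private[core] def getDdl: String = "CREATE TABLE t (a STRING)"
  }
}

package za.co.absa.pramen.core.tests {
  object Caller {
    // Compiles because Caller is also inside the `core` package.
    def demo: String = za.co.absa.pramen.core.utils.hive.DdlBuilder.getDdl
  }
}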
HiveHelperSqlSuite.scala

@@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
@@ -156,6 +157,24 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
}
}

"getTableDDL()" should {
"work with columns with descriptions with line ending characters (regression)" in {
val schema = StructType(Seq(
StructField("a", StringType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build()),
StructField("b", IntegerType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build())
))

val expected = "`a` STRING COMMENT 'line1\"\\?\"\\nline2\\?'"

val qe = new QueryExecutorMock(tableExists = true)
val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true)

val actual = hiveHelper.getTableDDL(schema, Seq("b"))

compareText(actual, expected)
}
}

private def getParquetPath(tempBaseDir: String, partitionBy: Seq[String] = Nil): String = {
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, "file:///")

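Taken together, the three changes mean a schema whose column descriptions contain newlines now produces valid Hive DDL. A hedged end-to-end sketch, assuming SparkUtils lives at za.co.absa.pramen.core.utils.SparkUtils (the demo object is hypothetical):

object SanitizedSchemaDemo {
  import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
  import za.co.absa.pramen.core.utils.SparkUtils

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("description", StringType, nullable = true,
        new MetadataBuilder().putString("comment", "line1\nline2").build())
    ))

    val fixed = SparkUtils.transformSchemaForCatalog(schema)

    // The sanitized comment contains no newline characters.
    assert(!fixed.fields.head.metadata.getString("comment").contains("\n"))
  }
}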
