Commit 434035c

#552 Fix Hive DDL for tables with long descriptions with '\n' characters.

yruslan committed Feb 17, 2025
1 parent 241c096 commit 434035c
Showing 4 changed files with 78 additions and 5 deletions.
SparkUtils.scala
@@ -41,6 +41,9 @@ object SparkUtils {
   val MAX_LENGTH_METADATA_KEY = "maxLength"
   val COMMENT_METADATA_KEY = "comment"
 
+  // This seems to be a limitation for multiple catalogs, like Glue and Hive.
+  val MAX_COMMENT_LENGTH = 255
+
   /** Get Spark StructType from a case class. */
   def getStructType[T: TypeTag]: StructType = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

@@ -223,10 +226,21 @@ object SparkUtils {
    */
   def transformSchemaForCatalog(schema: StructType): StructType = {
     def transformField(field: StructField): StructField = {
+      val metadata = if (field.metadata.contains("comment")) {
+        val comment = field.metadata.getString("comment")
+        val fixedComment = sanitizeCommentForHiveDDL(comment)
+        new MetadataBuilder()
+          .withMetadata(field.metadata)
+          .putString("comment", fixedComment)
+          .build()
+      } else {
+        field.metadata
+      }
+
       field.dataType match {
-        case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, field.metadata)
-        case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, field.metadata)
-        case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, field.metadata)
+        case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, metadata)
+        case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, metadata)
+        case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, metadata)
       }
     }

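The effect of this change: every field comment now passes through the new sanitizer before the schema is handed to the catalog. A minimal sketch of the behavior (the package path za.co.absa.pramen.core.utils is assumed from the project layout, and the schema is illustrative):

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
import za.co.absa.pramen.core.utils.SparkUtils

val schema = StructType(Seq(
  StructField("city", StringType, nullable = true,
    new MetadataBuilder().putString("comment", "Name of the city.\nFree-form text.").build())
))

// The embedded newline is flattened to a space, so the Hive DDL comment
// generated from this schema stays on a single line:
// COMMENT 'Name of the city. Free-form text.'
println(SparkUtils.transformSchemaForCatalog(schema).fields.head.metadata.getString("comment"))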
@@ -277,6 +291,27 @@ object SparkUtils {
     }
   }
 
+  /**
+   * Sanitizes a comment for Hive DDL. Ideally this would be done by Spark itself, but because there are many
+   * versions of Hive and other catalogs, it is sometimes hard to have a general solution.
+   *
+   * These transformations are tested for Hive 1.0.
+   *
+   * @param comment The comment (description) of a column or a table.
+   * @return The sanitized comment, safe to embed into Hive DDL.
+   */
+  def sanitizeCommentForHiveDDL(comment: String): String = {
+    val escapedComment = comment
+      .replace("\n", " ")  // Literal newlines break DBeaver (it shows no columns)
+      .replace("\\n", " ") // The two-character sequence "\n" breaks DBeaver as well
+
+    if (escapedComment.length > MAX_COMMENT_LENGTH) {
+      s"${escapedComment.take(MAX_COMMENT_LENGTH - 3)}..."
+    } else {
+      escapedComment
+    }
+  }
+
   /**
    * Removes metadata of nested fields to make DDL compatible with some Hive-like catalogs.
    * In addition, removes the nullability flag for all fields.
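A quick sketch of the new helper's contract, written as plain Scala against the patched SparkUtils (import path assumed as above):

import za.co.absa.pramen.core.utils.SparkUtils.{MAX_COMMENT_LENGTH, sanitizeCommentForHiveDDL}

// Both a literal newline and the two-character "\n" sequence become spaces.
assert(sanitizeCommentForHiveDDL("line1\nline2\\nline3") == "line1 line2 line3")

// Comments longer than MAX_COMMENT_LENGTH are cut to 252 characters plus "...",
// so the result is exactly 255 characters long.
assert(sanitizeCommentForHiveDDL("a" * 500).length == MAX_COMMENT_LENGTH)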
HiveHelperSql.scala
@@ -114,7 +114,7 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
     queryExecutor.execute(sqlHiveRepair)
   }
 
-  private def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
+  private[core] def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
     val partitionColsLower = partitionBy.map(_.toLowerCase())
 
     val nonPartitionFields = SparkUtils.transformSchemaForCatalog(schema)
@@ -128,7 +128,7 @@
     }
   }
 
-  private def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
+  private[core] def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
     if (partitionBy.isEmpty) {
       ""
     } else {
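Widening the two builders from private to private[core] is what makes the new regression test below possible: a package-qualified private member is visible anywhere under za.co.absa.pramen.core, including the test sources. A minimal, self-contained illustration (hypothetical names, not project code):

package za.co.absa.pramen.core.hive {
  class Helper {
    // Visible to everything in the za.co.absa.pramen.core package and below.
    private[core] def buildDdl(): String = "CREATE TABLE t (a STRING)"
  }
}

package za.co.absa.pramen.core.tests {
  object Caller {
    // Compiles because 'tests' sits inside 'core'; a plain 'private' would not.
    def use(h: za.co.absa.pramen.core.hive.Helper): String = h.buildDdl()
  }
}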
SparkUtilsSuite.scala
@@ -442,6 +442,25 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
     }
   }
 
+  "sanitizeCommentForHiveDDL" should {
+    "replace line ending characters" in {
+      val comment = "line1\nline2\\nline3"
+
+      val actual = sanitizeCommentForHiveDDL(comment)
+
+      assert(actual == "line1 line2 line3")
+    }
+
+    "truncate long comments" in {
+      val comment = "a" * 500
+
+      val actual = sanitizeCommentForHiveDDL(comment)
+
+      assert(actual.length == 255)
+      assert(actual.endsWith("a..."))
+    }
+  }
+
   "getLengthFromMetadata" should {
     "return length for long type" in {
       val metadata = new MetadataBuilder
HiveHelperSqlSuite.scala
@@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
 import org.scalatest.wordspec.AnyWordSpec
 import za.co.absa.pramen.core.base.SparkTestBase
 import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
@@ -156,6 +157,24 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
     }
   }
 
+  "getTableDDL()" should {
+    "work with columns with descriptions with line ending characters (regression)" in {
+      val schema = StructType(Seq(
+        StructField("a", StringType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build()),
+        StructField("b", IntegerType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build())
+      ))
+
+      val expected = "`a` STRING COMMENT 'line1\\'?\\' line2?'"
+
+      val qe = new QueryExecutorMock(tableExists = true)
+      val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true)
+
+      val actual = hiveHelper.getTableDDL(schema, Seq("b"))
+
+      compareText(actual, expected)
+    }
+  }
+
   private def getParquetPath(tempBaseDir: String, partitionBy: Seq[String] = Nil): String = {
     val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, "file:///")

