map and struct type write format (#146)
gnehil authored Oct 8, 2023
1 parent 5410651 commit 155a6a3
Showing 2 changed files with 43 additions and 11 deletions.
SchemaUtils.scala
@@ -176,17 +176,42 @@ private[spark] object SchemaUtils {
         val mapData = row.getMap(ordinal)
         val keys = mapData.keyArray()
         val values = mapData.valueArray()
+        val sb = StringBuilder.newBuilder
+        sb.append("{")
         var i = 0
-        val map = mutable.Map[Any, Any]()
         while (i < keys.numElements()) {
-          map += rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType)
+          sb.append(quoteData(rowColumnValue(keys, i, mt.keyType), mt.keyType))
+            .append(":").append(quoteData(rowColumnValue(values, i, mt.valueType), mt.valueType))
+            .append(",")
           i += 1
         }
-        map.toMap.asJava
-      case st: StructType => row.getStruct(ordinal, st.length)
+        if (i > 0) sb.setLength(sb.length - 1) // drop the trailing comma
+        sb.append("}").toString
+      case st: StructType =>
+        val structData = row.getStruct(ordinal, st.length)
+        val sb = StringBuilder.newBuilder
+        sb.append("{")
+        var i = 0
+        while (i < structData.numFields) {
+          val field = st.fields(i)
+          sb.append(s""""${field.name}":""")
+            .append(quoteData(rowColumnValue(structData, i, field.dataType), field.dataType))
+            .append(",")
+          i += 1
+        }
+        if (i > 0) sb.setLength(sb.length - 1) // drop the trailing comma
+        sb.append("}").toString
       case _ => throw new DorisException(s"Unsupported spark type: ${dataType.typeName}")
     }
 
   }
 
+  private def quoteData(value: Any, dataType: DataType): Any = {
+    dataType match {
+      case StringType | TimestampType | DateType => s""""$value""""
+      case _ => value
+    }
+  }
+
 }
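
The change above makes rowColumnValue render MapType and StructType columns as JSON-style object strings instead of returning a java.util.Map or a nested row, so map and struct values reach Doris in a load-friendly text format. Below is a minimal, self-contained sketch of the same formatting approach using plain Scala collections in place of Spark's MapData and InternalRow; the object and helper names are illustrative, not the connector's API.

import java.sql.{Date, Timestamp}

object WriteFormatSketch {

  // Counterpart of the new quoteData helper: string, date, and timestamp
  // values are wrapped in double quotes; everything else is emitted as-is.
  private def quote(value: Any): Any = value match {
    case _: String | _: Date | _: Timestamp => s""""$value""""
    case other => other
  }

  // Mirrors the MapType branch: render entries as {"k1":v1,"k2":v2}.
  def formatMap(entries: Iterable[(Any, Any)]): String = {
    val sb = new StringBuilder("{")
    entries.foreach { case (k, v) =>
      sb.append(quote(k)).append(":").append(quote(v)).append(",")
    }
    if (entries.nonEmpty) sb.setLength(sb.length - 1) // drop the trailing comma
    sb.append("}").toString
  }

  // Mirrors the StructType branch: render (fieldName, value) pairs
  // as {"name":value,...}, always quoting the field name.
  def formatStruct(fields: Seq[(String, Any)]): String = {
    val sb = new StringBuilder("{")
    fields.foreach { case (name, v) =>
      sb.append(s""""$name":""").append(quote(v)).append(",")
    }
    if (fields.nonEmpty) sb.setLength(sb.length - 1) // drop the trailing comma
    sb.append("}").toString
  }

  def main(args: Array[String]): Unit = {
    println(formatMap(Map("a" -> "1")))              // {"a":"1"}
    println(formatStruct(Seq("a" -> "a", "b" -> 1))) // {"a":"a","b":1}
  }
}

The printed values match the assertions in the updated test below.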
SchemaUtilsTest.scala
@@ -17,11 +17,11 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
 import org.junit.{Assert, Ignore, Test}
 
 import java.sql.{Date, Timestamp}
-import scala.collection.JavaConverters._
 
 @Ignore
 class SchemaUtilsTest {
@@ -31,9 +31,16 @@ class SchemaUtilsTest {
 
     val spark = SparkSession.builder().master("local").getOrCreate()
 
-    val df = spark.createDataFrame(Seq(
-      (1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), Map[String, String]("a" -> "1"))
-    )).toDF("c1", "c2", "c3", "c4", "c5")
+    val rdd = spark.sparkContext.parallelize(Seq(
+      Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3),
+        Map[String, String]("a" -> "1"), Row("a", 1))
+    ))
+    val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
+      .add("c2", DateType)
+      .add("c3", TimestampType)
+      .add("c4", ArrayType.apply(IntegerType))
+      .add("c5", MapType.apply(StringType, StringType))
+      .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType)))))
 
     val schema = df.schema
 
@@ -44,8 +51,8 @@
       Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType))
       Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
       Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType))
-      println(SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
-      Assert.assertEquals(Map("a" -> "1").asJava, SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
+      Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
+      Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))
 
     })

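The test now builds its DataFrame from an RDD[Row] plus an explicit StructType rather than from a Seq of tuples: a tuple would encode the nested c6 column with field names _1 and _2, while Row("a", 1) with a declared schema lets the test name the fields "a" and "b". A minimal sketch of the same pattern, with illustrative column names:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object StructColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // An explicit Row plus StructType carries a nested struct column
    // with the exact field names the assertions expect.
    val rdd = spark.sparkContext.parallelize(Seq(Row(1, Row("a", 1))))
    val schema = new StructType()
      .add("id", IntegerType)
      .add("s", new StructType().add("a", StringType).add("b", IntegerType))

    val df = spark.createDataFrame(rdd, schema)
    df.printSchema() // id: integer, s: struct<a: string, b: integer>
    spark.stop()
  }
}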