Adopt JSONUtils.concatenateJsonStrings for concatenating JSON strings #11549

Merged · 19 commits · Oct 15, 2024
Changes from 7 commits
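In broad strokes, this PR replaces the Scala-side preprocessing that GpuJsonToStructs.cleanAndConcat used to do (strip whitespace, normalize null/empty/"null" rows, reject embedded newlines, join rows with '\n') with a single call into the spark-rapids-jni kernel JSONUtils.concatenateJsonStrings. A minimal sketch of the new call shape; the result's field names (data, delimiter, isNullOrEmpty) are inferred from how they are used in this diff, not from the JNI javadoc:

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.JSONUtils

// Sketch only: shows the result shape implied by this diff.
def concatenateForCudf(input: ColumnVector): Unit = {
  val concat = JSONUtils.concatenateJsonStrings(input)
  withResource(concat) { _ =>
    // concat.data          : DeviceMemoryBuffer with all rows joined into one buffer
    // concat.delimiter     : row delimiter chosen by the kernel (fed to cudf's reader)
    // concat.isNullOrEmpty : BOOL8 column flagging rows that were null or empty
    println(s"${concat.data.getLength} bytes, ${concat.isNullOrEmpty.getRowCount} rows")
  }
}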
8 changes: 8 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -1454,3 +1454,11 @@ def test_spark_from_json_invalid_json():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.createDataFrame(data, 'json STRING').select(f.col('json'), f.from_json(f.col('json'), schema)),
conf=_enable_all_types_conf)

@allow_non_gpu(*non_utc_allow)
def test_from_json_input_wrapped_in_whitespaces():
json_string_gen = StringGen(r'[ \r\n\t]{0,5}{"key": "[A-z]{0,5}"}[ \r\n\t]{0,5}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<key:string>')),
conf=_enable_all_types_conf)
@@ -318,14 +318,17 @@ object GpuJsonReadCommon {
}
}

def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions =
cudfJsonOptionBuilder(options).build()

def cudfJsonOptionBuilder(options: JSONOptions): ai.rapids.cudf.JSONOptions.Builder = {
// This is really ugly, but options.allowUnquotedControlChars is marked as private
// and this is the only way I know to get it without even uglier tricks
@scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
"Java enum Feature is deprecated")
val allowUnquotedControlChars =
options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
val allowUnquotedControlChars = options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)

ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(true)
@@ -338,6 +341,5 @@
.withUnquotedControlChars(allowUnquotedControlChars)
.withCudfPruneSchema(true)
.withExperimental(true)
.build()
}
}
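The point of splitting cudfJsonOptions into a separate cudfJsonOptionBuilder is that callers can now apply per-call settings before build(); GpuJsonToStructs below uses this to set the line delimiter reported by the concatenation kernel. A hedged sketch of the intended usage (the delimiter's JVM type here is an assumption; the diff just forwards the kernel's value):

import org.apache.spark.sql.catalyst.json.JSONOptions

// parsedOptions: Spark's parsed JSON options for the expression.
// delimiter: whatever JSONUtils.concatenateJsonStrings reported for this batch.
def buildCudfOptions(parsedOptions: JSONOptions, delimiter: Byte): ai.rapids.cudf.JSONOptions =
  GpuJsonReadCommon.cudfJsonOptionBuilder(parsedOptions)
    .withLineDelimiter(delimiter) // the per-batch setting the old build-and-return API could not express
    .build()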
@@ -17,11 +17,10 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar}
import ai.rapids.cudf.{Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.JSONUtils
import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.json.JSONOptions
@@ -34,8 +33,7 @@ import org.apache.spark.sql.types._
*/
class JsonParsingException(s: String, cause: Throwable) extends RuntimeException(s, cause) {}

class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
lazy val data: BaseDeviceMemoryBuffer = combined.getData
class JsonDeviceDataSource(data: DeviceMemoryBuffer) extends DataSource {
lazy val totalSize: Long = data.getLength
override def size(): Long = totalSize

@@ -64,11 +62,6 @@ class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
dest.copyFromDeviceBufferAsync(0, data, offset, length, stream)
length
}

override def close(): Unit = {
combined.close()
super.close()
}
}
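
Note the ownership change here: the data source now wraps the DeviceMemoryBuffer produced by the JNI concatenation instead of a ColumnVector, so the close() override that used to release the column is gone; the buffer's lifetime belongs to the caller's withResource block around the concatenation result. A sketch of the intended wiring, reusing names from this diff:

import ai.rapids.cudf
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.JSONUtils

// Sketch: the caller, not the data source, owns the concatenated buffer.
def parseBatch(input: cudf.ColumnVector, cudfSchema: cudf.Schema,
    opts: cudf.JSONOptions, numRows: Int): cudf.Table =
  withResource(JSONUtils.concatenateJsonStrings(input)) { concat =>
    withResource(new JsonDeviceDataSource(concat.data)) { ds =>
      cudf.Table.readJSON(cudfSchema, opts, ds, numRows) // ds only reads; it closes nothing
    }
  }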

case class GpuJsonToStructs(
@@ -80,92 +73,17 @@ case class GpuJsonToStructs(
with NullIntolerant {
import GpuJsonReadCommon._

private lazy val emptyRowStr = constructEmptyRow(schema)

private def constructEmptyRow(schema: DataType): String = {
schema match {
case struct: StructType if struct.fields.nonEmpty =>
s"""{"${StringEscapeUtils.escapeJson(struct.head.name)}":null}"""
case other =>
throw new IllegalArgumentException(s"$other is not supported as a top level type") }
}

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
}

withResource(stripped) { stripped =>
val isEmpty = withResource(stripped.getByteCount) { lengths =>
withResource(cudf.Scalar.fromInt(0)) { zero =>
lengths.lessOrEqualTo(zero)
}
}
val isNullOrEmptyInput = withResource(isEmpty) { _ =>
withResource(input.isNull) { isNull =>
isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(cudf.Scalar.fromString(emptyRowStr)) { emptyRow =>
// TODO is it worth checking if any are empty or null and then skipping this?
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { nullsReplaced =>
val isLiteralNull = withResource(Scalar.fromString("null")) { literalNull =>
nullsReplaced.equalTo(literalNull)
}
withResource(isLiteralNull) { _ =>
withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
checkForNewline(cleaned, "\n", "line separator")
checkForNewline(cleaned, "\r", "carriage return")

// add a newline to each JSON line
val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) {
newLineCol =>
ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol))
}
}

// We technically don't need to join the strings together as we just want the buffer
// which should be the same either way.
(isNullOrEmptyInput, withNewline)
}
}
}
}
}
}
}

private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = {
withResource(cudf.Scalar.fromString(newlineStr)) { newline =>
withResource(cleaned.stringContains(newline)) { hasNewline =>
withResource(hasNewline.any()) { anyNewline =>
if (anyNewline.isValid && anyNewline.getBoolean) {
throw new IllegalArgumentException(
s"We cannot currently support parsing JSON that contains a $name in it")
}
}
}
}
}

private lazy val parsedOptions = new JSONOptions(
options,
timeZoneId.get,
SQLConf.get.columnNameOfCorruptRecord)

private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
private lazy val jsonOptionBuilder =
GpuJsonReadCommon.cudfJsonOptionBuilder(parsedOptions)

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
case _: MapType =>
JSONUtils.extractRawMapFromJsonString(input.getBase)
case _: MapType => JSONUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
// if we ever need to support duplicate keys we need to keep track of the duplicates
// and make the first one null, but I don't think this will ever happen in practice
@@ -175,15 +93,15 @@
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: setup a datasource
val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
// Step 4: Have cudf parse the JSON data
// Step 1: Concat the data into a single buffer and record which rows are null or empty
val concatenateInput = JSONUtils.concatenateJsonStrings(input.getBase)
withResource(concatenateInput) { _ =>
// Step 2: setup a datasource from the concatenated JSON strings
val table = withResource(new JsonDeviceDataSource(concatenateInput.data)) { ds =>
// Step 3: Have cudf parse the JSON data
try {
cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
cudf.Table.readJSON(cudfSchema,
jsonOptionBuilder.withLineDelimiter(concatenateInput.delimiter).build, ds, numRows)
} catch {
case e : RuntimeException =>
throw new JsonParsingException("Currently some Json to Struct cases " +
@@ -192,22 +110,20 @@
}
}

// process duplicated field names in input struct schema

withResource(table) { _ =>
// Step 5: verify that the data looks correct
// Step 4: verify that the data looks correct
if (table.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
s"but got ${table.getRowCount}")
}

// Step 7: turn the data into a Struct
// Step 5: turn the data into a Struct
withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
// Step 6: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
concatenateInput.isNullOrEmpty.ifElse(nullVal, structData)
}
}
}
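
Two closing observations. First, delegating the delimiter choice to the kernel is what lets the old checkForNewline guards disappear: the Scala code no longer hard-codes '\n' as the row separator, so embedded newlines in the input are no longer fatal. Second, since the code still insists on table.getRowCount == numRows and lines isNullOrEmpty up against the parsed rows one-to-one, the kernel presumably substitutes a parseable placeholder for null/empty rows rather than dropping them; that is an inference from this diff, not a documented JNI guarantee. Under that reading, the null re-application step amounts to:

import ai.rapids.cudf
import com.nvidia.spark.rapids.GpuScalar
import com.nvidia.spark.rapids.Arm.withResource
import org.apache.spark.sql.types.StructType

// Sketch: rows flagged null-or-empty parsed only as placeholders, so whatever
// struct they produced is swapped back to a real null here.
def reapplyNulls(isNullOrEmpty: cudf.ColumnVector, structData: cudf.ColumnVector,
    struct: StructType): cudf.ColumnVector =
  withResource(GpuScalar.from(null, struct)) { nullVal =>
    isNullOrEmpty.ifElse(nullVal, structData)
  }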