Commit ffb8837

Merge branch 'main' into fix-memory-leak-avro-slow-generic-coder
kellen authored Jan 31, 2025
2 parents d62f69d + 9aeba0d commit ffb8837
Showing 9 changed files with 61 additions and 82 deletions.
9 changes: 8 additions & 1 deletion build.sbt
@@ -106,7 +106,7 @@ val munitVersion = "1.1.0"
val neo4jDriverVersion = "4.4.19"
val ndArrayVersion = "0.3.3"
val parquetExtraVersion = "0.4.3"
val parquetVersion = "1.14.4"
val parquetVersion = "1.15.0"
val pprintVersion = "0.9.0"
val protobufGenericVersion = "0.2.9"
val scalacheckVersion = "1.18.1"
@@ -487,6 +487,13 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
// Changes in avro SlowGenericRecordCoder
ProblemFilters.exclude[Problem](
"com.spotify.scio.coders.avro.SlowGenericRecordCoder*"
),
+  // tablerow json fix
+  ProblemFilters.exclude[DirectMissingMethodProblem](
+    "com.spotify.scio.bigquery.types.package#Json.apply"
+  ),
+  ProblemFilters.exclude[IncompatibleResultTypeProblem](
+    "com.spotify.scio.bigquery.types.package#Json.parse"
+  )
)
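These exclusions track the binary-incompatible Json changes made below (see the package object types hunk): Json.apply(row: TableRow) is removed and Json.parse now returns AnyRef instead of TableRow. A minimal sketch of the shapes MiMa flags, using hypothetical stand-in types:

    // Hypothetical stand-ins; only the signatures matter to MiMa.
    object JsonBefore {
      def apply(row: java.util.Map[String, AnyRef]): String = ??? // removed -> DirectMissingMethodProblem
      def parse(json: String): java.util.Map[String, AnyRef] = ???
    }
    object JsonAfter {
      def parse(json: String): AnyRef = ??? // widened result -> IncompatibleResultTypeProblem
    }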

@@ -81,12 +81,23 @@ object TypedBigQueryIT {
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
)
-implicit val arbJson: Arbitrary[Json] = Arbitrary(
-  for {
-    key <- Gen.alphaStr
-    value <- Gen.alphaStr
-  } yield Json(s"""{"$key":"$value"}""")
-)
+implicit val arbJson: Arbitrary[Json] = Arbitrary {
+  import Arbitrary._
+  import Gen._
+  Gen
+    .oneOf(
+      // json object
+      alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
+      // json array
+      alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
+      // json literals
+      alphaLowerStr.map(str => s""""$str""""),
+      arbInt.arbitrary.map(_.toString),
+      arbBool.arbitrary.map(_.toString),
+      Gen.const("null")
+    )
+    .map(wkt => Json(wkt))
+}
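For illustration, values the new generator can produce; the concrete samples below are hypothetical, but each JSON value shape is reachable:

    val samples: Seq[Json] = Seq(
      Json("""{"abc":42}"""), // object
      Json("""["xy",-7]"""),  // array
      Json("\"hello\""),      // string literal
      Json("-3"),             // number literal
      Json("true"),           // boolean literal
      Json("null")            // null literal
    )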

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
@@ -108,7 +119,7 @@ object TypedBigQueryIT {
private val tableRowTable = table("records_tablerow")
private val avroTable = table("records_avro")

-private val records = Gen.listOfN(100, recordGen).sample.get
+private val records = Gen.listOfN(5, recordGen).sample.get
private val options = PipelineOptionsFactory
.fromArgs(
"--project=data-integration-test",
@@ -145,6 +156,14 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toTableRow)
+  .map { row =>
+    // The TableRow BQ save API uses JSON loads.
+    // To disambiguate from a literal JSON string,
+    // the field MUST be converted to parsed JSON.
+    val jsonLoadRow = new TableRow()
+    jsonLoadRow.putAll(row.asInstanceOf[java.util.Map[String, _]]) // cast for 2.12
+    jsonLoadRow.set("json", Json.parse(row.getJson("json")))
+  }
.saveAsBigQueryTable(
tableRowTable,
schema = Record.schema,
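Why the added parse step matters, as a minimal sketch (the "json" field name comes from the test's Record schema; imports assumed from the surrounding file): saved as-is, the column would be loaded as an escaped string literal rather than a JSON value.

    import com.spotify.scio.bigquery._ // TableRow helpers and Json

    val row = TableRow("json" -> """{"a":1}""")
    // Loaded directly, the JSON column would hold the escaped string "{\"a\":1}".
    // Parsing first lets the load job serialize a real JSON value:
    row.set("json", Json.parse(row.getJson("json")))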
@@ -42,7 +42,7 @@ public class FutureHandlers {
public interface Base<F, V> {

default Duration getTimeout() {
-    return Duration.ofMinutes(1);
+    return Duration.ofMinutes(10);
}

void waitForFutures(Iterable<F> futures)
@@ -228,57 +228,6 @@ final private[client] class TableOps(client: Client) {
def exists(tableSpec: String): Boolean =
exists(bq.BigQueryHelpers.parseTableSpec(tableSpec))

-  /**
-   * This is annoying but the GCP BQ client v2 does not accept BQ json rows in the same format as BQ
-   * load. JSON column are expected as string instead of parsed json
-   */
-  private def normalizeRows(schema: TableSchema)(tableRow: TableRow): TableRow =
-    normalizeRows(schema.getFields.asScala.toList)(tableRow)
-
-  private def normalizeRows(fields: List[TableFieldSchema])(tableRow: TableRow): TableRow = {
-    import com.spotify.scio.bigquery._
-
-    fields.foldLeft(tableRow) { (row, f) =>
-      f.getType match {
-        case "JSON" =>
-          val name = f.getName
-          f.getMode match {
-            case "REQUIRED" =>
-              row.set(name, row.getJson(name).wkt)
-            case "NULLABLE" =>
-              row.getJsonOpt(name).fold(row) { json =>
-                row.set(name, json.wkt)
-              }
-            case "REPEATED" =>
-              row.set(name, row.getJsonList(name).map(_.wkt).asJava)
-          }
-        case "RECORD" | "STRUCT" =>
-          val name = f.getName
-          val netedFields = f.getFields.asScala.toList
-          f.getMode match {
-            case "REQUIRED" =>
-              row.set(name, normalizeRows(netedFields)(row.getRecord(name)))
-            case "NULLABLE" =>
-              row.getRecordOpt(name).fold(row) { nestedRow =>
-                row.set(name, normalizeRows(netedFields)(nestedRow))
-              }
-            case "REPEATED" =>
-              row.set(
-                name,
-                row
-                  .getRecordList(name)
-                  .map { nestedRow =>
-                    normalizeRows(netedFields)(nestedRow)
-                  }
-                  .asJava
-              )
-          }
-        case _ =>
-          row
-      }
-    }
-  }

/** Write rows to a table. */
def writeRows(
tableReference: TableReference,
@@ -313,7 +262,7 @@ final private[client] class TableOps(client: Client) {
case WriteDisposition.WRITE_APPEND =>
}

-    service.insertAll(tableReference, rows.map(normalizeRows(schema)).asJava)
+    service.insertAll(tableReference, rows.asJava)
}

/** Write rows to a table. */
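Why the normalization pass can go: with the converter change below, Json fields arrive in TableRows as raw JSON strings, which is already the representation the v2 streaming-insert client expects. A sketch under that assumption:

    import com.spotify.scio.bigquery._

    // A row as now produced by BigQueryType.toTableRow: the JSON cell is a plain string.
    val row = TableRow("json" -> """{"a":1}""")
    // Old flow: rows.map(normalizeRows(schema)) rewrote parsed JSON values back to strings.
    // New flow: there is nothing to rewrite, so rows pass straight to service.insertAll.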
@@ -121,10 +121,9 @@ object TableRowOps {
}

def json(value: AnyRef): Json = value match {
-    case x: Json     => x
-    case x: TableRow => Json(x)
-    case x: String   => Json(x)
-    case _           => throw new UnsupportedOperationException("Cannot convert to json: " + value)
+    case x: Json   => x
+    case x: String => Json(x)
+    case _         => throw new UnsupportedOperationException("Cannot convert to json: " + value)
}

def bignumeric(value: AnyRef): BigNumeric = value match {
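Illustrative behavior of the narrowed json extractor above (a TableRow input, which previously round-tripped through Json(x), now falls through to the error case):

    val fromJson: Json   = TableRowOps.json(Json("""{"a":1}""")) // passes through unchanged
    val fromString: Json = TableRowOps.json("""{"a":1}""")       // wraps the raw string
    // TableRowOps.json(new TableRow()) now throws UnsupportedOperationException.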
@@ -377,8 +377,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
-        // for TableRow/json, use parsed JSON to prevent escaping
-        q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
+        q"$tree.wkt"
case t if t =:= typeOf[BigNumeric] =>
// for TableRow/json, use string to avoid precision loss (like numeric)
q"$tree.wkt.toString"
@@ -72,8 +72,7 @@ package object types {
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

-    def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row))
-    def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow])
+    def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
}
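A quick illustration of the widened parse; the result types follow Jackson's default Object binding (assumed, not shown in the diff):

    val obj  = Json.parse(Json("""{"a":1}""")) // java.util.LinkedHashMap: {a=1}
    val arr  = Json.parse(Json("[1,2]"))       // java.util.ArrayList: [1, 2]
    val str  = Json.parse(Json("\"s\""))       // java.lang.String: s
    val bool = Json.parse(Json("true"))        // java.lang.Boolean: true
    val nul  = Json.parse(Json("null"))        // null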

/**
@@ -59,14 +59,24 @@ final class ConverterProviderSpec
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
)
-implicit val arbJson: Arbitrary[Json] = Arbitrary(
-  for {
-    // f is a key field from TableRow. It cannot be used as column name
-    // see https://github.com/apache/beam/issues/33531
-    key <- Gen.alphaStr.retryUntil(_ != "f")
-    value <- Gen.alphaStr
-  } yield Json(s"""{"$key":"$value"}""")
-)
+implicit val arbJson: Arbitrary[Json] = Arbitrary {
+  import Arbitrary._
+  import Gen._
+  Gen
+    .oneOf(
+      // json object
+      alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
+      // json array
+      alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
+      // json literals
+      alphaLowerStr.map(str => s""""$str""""),
+      arbInt.arbitrary.map(_.toString),
+      arbBool.arbitrary.map(_.toString),
+      Gen.const("null")
+    )
+    .map(wkt => Json(wkt))
+}

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
@@ -50,12 +50,9 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {

it should "handle required json type" in {
val wkt = """{"name":"Alice","age":30}"""
-    val parsed = new TableRow()
-      .set("name", "Alice")
-      .set("age", 30)

-    RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
-    BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed)
+    RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
+    BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
}

it should "handle required big numeric type" in {
