Full Fidelity Change Feed Deletes JSON Change (#43881)
* Make id nullable

* Add tests

* Fix test

* dummy commit to rerun pipeline

* revert dummy commit to rerun pipeline
tvaron3 authored Feb 1, 2025
1 parent e745564 commit 507b1e2
Showing 3 changed files with 129 additions and 4 deletions.
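For context, a minimal sketch of consuming the full-fidelity (all versions and deletes) change feed, mirroring the options the new test below uses. It is not part of the commit: endpoint, key, database and container values are placeholders, an active SparkSession named `spark` is assumed, and the "operationType" column name is an assumption based on the connector's default schema.

import org.apache.spark.sql.functions.col

// Read the change feed in full-fidelity mode with schema inference disabled.
val readCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>",
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.mode" -> "FullFidelity",
  "spark.cosmos.changeFeed.startFrom" -> "NOW"
)

val changeFeedDF = spark
  .readStream
  .format("cosmos.oltp.changeFeed")
  .options(readCfg)
  .load()

// With this change, delete events carry the deleted item's id (recovered from the
// change feed metadata) instead of a null id, and the etag column is nullable.
val deletesDF = changeFeedDF
  .filter(col("operationType") === "delete") // column name assumed, not verified here
  .select("id", "operationType")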
@@ -38,7 +38,7 @@ private[spark] object ChangeFeedTable {
StructField(RawJsonBodyAttributeName, StringType, nullable=true),
StructField(IdAttributeName, StringType, nullable=false),
StructField(TimestampAttributeName, LongType, nullable=false),
StructField(ETagAttributeName, StringType, nullable=false),
StructField(ETagAttributeName, StringType, nullable=true),
StructField(LsnAttributeName, LongType, nullable=false),
StructField(MetadataJsonBodyAttributeName, StringType, nullable=false),
StructField(PreviousRawJsonBodyAttributeName, StringType, nullable=true),
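Why the nullability matters: a delete event has no current body and no etag, so those columns must accept nulls for the row to be materialized at all. A minimal sketch, not part of the diff, assuming an active SparkSession named `spark`; the string field names are stand-ins for the connector's *AttributeName constants and may not match the exact values.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Approximation of the relevant columns after this change.
val schemaSketch = StructType(Seq(
  StructField("_rawBody", StringType, nullable = true), // absent for deletes
  StructField("id", StringType, nullable = false),
  StructField("_etag", StringType, nullable = true)     // now nullable per this hunk
))

// A delete event row: no body, no etag, but the id is still present.
val deleteRow = Row(null, "item-1", null)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(deleteRow)), schemaSketch)
df.show(false)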
@@ -712,7 +712,15 @@ private[cosmos] class CosmosRowConverterBase(
case _ => null
}
} else {
null
objectNode.get(MetadataJsonBodyAttributeName) match {
case metadataNode: JsonNode =>
metadataNode.get(IdAttributeName) match {
case valueNode: JsonNode =>
Option(valueNode).fold(null: String)(v => v.asText(null))
case _ => null
}
case _ => null
}
}
}

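The converter change above falls back to the change feed metadata document when the current JSON body is absent, as it is for delete events, so the id can still be surfaced. A standalone sketch of that lookup, not part of the diff, using Jackson directly; the field names "_metadata" and "id" are placeholders for the connector's MetadataJsonBodyAttributeName and IdAttributeName constants.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

object MetadataIdFallbackSketch {
  private val mapper = new ObjectMapper()

  // When the item body is missing (delete events), read the id from the
  // metadata node instead of returning null outright.
  def idFromMetadata(changeNode: ObjectNode): String =
    Option(changeNode.get("_metadata"))
      .flatMap(metadata => Option(metadata.get("id")))
      .map(_.asText(null))
      .orNull

  def main(args: Array[String]): Unit = {
    val delete = mapper
      .readTree("""{"_metadata":{"operationType":"delete","id":"item-1"}}""")
      .asInstanceOf[ObjectNode]
    println(idFromMetadata(delete)) // prints item-1
  }
}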
@@ -5,18 +5,22 @@ package com.azure.cosmos.spark
import com.azure.cosmos.SparkBridgeInternal
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.models.{ChangeFeedPolicy, CosmosContainerProperties, PartitionKey}
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue}
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, concat, lit}
import org.apache.spark.sql.types._

import java.io.{BufferedReader, InputStreamReader}
import java.nio.file.Paths
import java.time.Duration
import java.util.UUID
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.asScalaBufferConverter

class SparkE2EChangeFeedITest
extends IntegrationSpec
@@ -522,7 +526,7 @@ class SparkE2EChangeFeedITest
}

// wait for the log store to get these changes
Thread.sleep(2000)
Thread.sleep(1000)

val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
@@ -543,6 +547,119 @@
})
}

"spark change feed query streaming (full fidelity)" can "use default schema" in {

val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
val cosmosContainerName = s"${UUID.randomUUID().toString}"
val properties: CosmosContainerProperties =
new CosmosContainerProperties(cosmosContainerName, "/pk")
properties.setChangeFeedPolicy(
ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)))
cosmosClient
.getDatabase(cosmosDatabase)
.createContainer(properties)
.block
val sinkContainerName = cosmosClient
.getDatabase(cosmosDatabase)
.createContainer(s"sink-${UUID.randomUUID().toString}", "/pk")
.block
.getProperties
.getId

val readCfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainerName,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.mode" -> "FullFidelity",
"spark.cosmos.changeFeed.startFrom" -> "NOW",
)

val writeCfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> sinkContainerName,
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "true"
)

val changeFeedDF = spark
.readStream
.format("cosmos.oltp.changeFeed")
.options(readCfg)
.load()

val modifiedChangeFeedDF = changeFeedDF.withColumn("_rawBody", concat(lit("{\"id\":\""), col("id"), lit("\"}")))

val microBatchQuery = modifiedChangeFeedDF
.writeStream
.format("cosmos.oltp")
.options(writeCfg)
.option("checkpointLocation", "/tmp/" + UUID.randomUUID().toString)
.outputMode("append")
.start()

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainerName)

val createdObjectIds = new mutable.HashMap[String, String]()
val replacedObjectIds = new mutable.HashMap[String, String]()
val deletedObjectIds = new mutable.HashMap[String, String]()
// Perform operations for change feed to capture
for (sequenceNumber <- 1 to 20) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
val pk = UUID.randomUUID().toString
objectNode.put("pk", pk)
objectNode.put("age", 20)
objectNode.put("sequenceNumber", sequenceNumber)
val id = UUID.randomUUID().toString
objectNode.put("id", id)
createdObjectIds.put(id, pk)
if (sequenceNumber % 2 == 0) {
replacedObjectIds.put(id, pk)
}
if (sequenceNumber % 3 == 0) {
deletedObjectIds.put(id, pk)
}
container.createItem(objectNode).block()
}

for (id <- replacedObjectIds.keys) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "dog")
objectNode.put("age", 25)
objectNode.put("id", id)
objectNode.put("pk", replacedObjectIds(id))
container.replaceItem(objectNode, id, new PartitionKey(replacedObjectIds(id))).block()
}

for (id <- deletedObjectIds.keys) {
container.deleteItem(id, new PartitionKey(deletedObjectIds(id))).block()
}
// wait for the log store to get these changes
Thread.sleep(1000)
changeFeedDF.schema.equals(
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
microBatchQuery.processAllAvailable()
microBatchQuery.stop()

val sinkContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sinkContainerName)
val feedResponses = sinkContainer.queryItems("SELECT * FROM c", classOf[ObjectNode]).byPage().collectList().block()

var numResults = 0
for (feedResponse <- feedResponses.asScala) {
numResults += feedResponse.getResults.size()
}

// Basic validation that something was written from bulk
numResults should be > 0
}

"spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
