diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala index 2f66f1f1cb73..474386806151 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala @@ -38,7 +38,7 @@ private[spark] object ChangeFeedTable { StructField(RawJsonBodyAttributeName, StringType, nullable=true), StructField(IdAttributeName, StringType, nullable=false), StructField(TimestampAttributeName, LongType, nullable=false), - StructField(ETagAttributeName, StringType, nullable=false), + StructField(ETagAttributeName, StringType, nullable=true), StructField(LsnAttributeName, LongType, nullable=false), StructField(MetadataJsonBodyAttributeName, StringType, nullable=false), StructField(PreviousRawJsonBodyAttributeName, StringType, nullable=true), diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverterBase.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverterBase.scala index 3bf04f0a51fb..c8fb0ca9af58 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverterBase.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverterBase.scala @@ -712,7 +712,15 @@ private[cosmos] class CosmosRowConverterBase( case _ => null } } else { - null + objectNode.get(MetadataJsonBodyAttributeName) match { + case metadataNode: JsonNode => + metadataNode.get(IdAttributeName) match { + case valueNode: JsonNode => + Option(valueNode).fold(null: String)(v => v.asText(null)) + case _ => null + } + case _ => null + } } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala index f6144f78f0aa..214db1c2f8af 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala @@ -5,18 +5,22 @@ package com.azure.cosmos.spark import com.azure.cosmos.SparkBridgeInternal import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState import com.azure.cosmos.implementation.{TestConfigurations, Utils} -import com.azure.cosmos.models.PartitionKey +import com.azure.cosmos.models.{ChangeFeedPolicy, CosmosContainerProperties, PartitionKey} import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue} +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.functions +import org.apache.spark.sql.functions.{col, concat, lit} import org.apache.spark.sql.types._ import java.io.{BufferedReader, InputStreamReader} import java.nio.file.Paths +import java.time.Duration import java.util.UUID import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters.asScalaBufferConverter class SparkE2EChangeFeedITest extends IntegrationSpec @@ -522,7 +526,7 @@ class SparkE2EChangeFeedITest } // 
wait for the log store to get these changes
-      Thread.sleep(2000)
+      Thread.sleep(1000)
       val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
       val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
@@ -543,6 +547,119 @@ class SparkE2EChangeFeedITest
     })
   }
 
+  "spark change feed query streaming (full fidelity)" can "use default schema" in {
+
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+    val cosmosContainerName = s"${UUID.randomUUID().toString}"
+    val properties: CosmosContainerProperties =
+      new CosmosContainerProperties(cosmosContainerName, "/pk")
+    properties.setChangeFeedPolicy(
+      ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)))
+    cosmosClient
+      .getDatabase(cosmosDatabase)
+      .createContainer(properties)
+      .block
+    val sinkContainerName = cosmosClient
+      .getDatabase(cosmosDatabase)
+      .createContainer(s"sink-${UUID.randomUUID().toString}", "/pk")
+      .block
+      .getProperties
+      .getId
+
+    val readCfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainerName,
+      "spark.cosmos.read.inferSchema.enabled" -> "false",
+      "spark.cosmos.changeFeed.mode" -> "FullFidelity",
+      "spark.cosmos.changeFeed.startFrom" -> "NOW",
+    )
+
+    val writeCfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> sinkContainerName,
+      "spark.cosmos.write.strategy" -> "ItemOverwrite",
+      "spark.cosmos.write.bulk.enabled" -> "true"
+    )
+
+    val changeFeedDF = spark
+      .readStream
+      .format("cosmos.oltp.changeFeed")
+      .options(readCfg)
+      .load()
+
+    val modifiedChangeFeedDF = changeFeedDF.withColumn("_rawBody", concat(lit("{\"id\":\""), col("id"), lit("\"}")))
+
+    val microBatchQuery = modifiedChangeFeedDF
+      .writeStream
+      .format("cosmos.oltp")
+      .options(writeCfg)
+      .option("checkpointLocation", "/tmp/" + UUID.randomUUID().toString)
+      .outputMode("append")
+      .start()
+
+    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainerName)
+
+    val createdObjectIds = new mutable.HashMap[String, String]()
+    val replacedObjectIds = new mutable.HashMap[String, String]()
+    val deletedObjectIds = new mutable.HashMap[String, String]()
+    // Create documents for the change feed to capture; mark some of them for a later replace/delete
+    for (sequenceNumber <- 1 to 20) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Schrodinger's cat")
+      objectNode.put("type", "cat")
+      val pk = UUID.randomUUID().toString
+      objectNode.put("pk", pk)
+      objectNode.put("age", 20)
+      objectNode.put("sequenceNumber", sequenceNumber)
+      val id = UUID.randomUUID().toString
+      objectNode.put("id", id)
+      createdObjectIds.put(id, pk)
+      if (sequenceNumber % 2 == 0) {
+        replacedObjectIds.put(id, pk)
+      }
+      if (sequenceNumber % 3 == 0) {
+        deletedObjectIds.put(id, pk)
+      }
+      container.createItem(objectNode).block()
+    }
+
+    for (id <- replacedObjectIds.keys) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Schrodinger's cat")
+      objectNode.put("type", "dog")
+      objectNode.put("age", 25)
+      objectNode.put("id", id)
+      objectNode.put("pk", replacedObjectIds(id))
+      container.replaceItem(objectNode, id, new PartitionKey(replacedObjectIds(id))).block()
+    }
+
+    for (id <- deletedObjectIds.keys) {
+      container.deleteItem(id, new PartitionKey(deletedObjectIds(id))).block()
+    }
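+
+    // At this point the source container holds a mix of creates, replaces and deletes.
+    // In full fidelity mode the delete events carry no current document body, which is
+    // why the id-from-metadata fallback in CosmosRowConverterBase and the now-nullable
+    // _etag column added above matter, and why _rawBody is rewritten from the id column
+    // before the rows are written to the sink.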
+
+    // wait for the log store to get these changes
+    Thread.sleep(1000)
+    changeFeedDF.schema.equals(
+      ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
+    microBatchQuery.processAllAvailable()
+    microBatchQuery.stop()
+
+    val sinkContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sinkContainerName)
+    val feedResponses = sinkContainer.queryItems("SELECT * FROM c", classOf[ObjectNode]).byPage().collectList().block()
+
+    var numResults = 0
+    for (feedResponse <- feedResponses.asScala) {
+      numResults += feedResponse.getResults.size()
+    }
+
+    // Basic validation that the micro-batch query wrote documents to the sink container via bulk ingestion
+    numResults should be > 0
+  }
+
   "spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
     val cosmosEndpoint = TestConfigurations.HOST
     val cosmosMasterKey = TestConfigurations.MASTER_KEY
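Note on the CosmosRowConverterBase hunk above: when the id cannot be resolved from the document body itself, as with delete events in the all-versions-and-deletes (full fidelity) change feed whose current body is empty, the converter now falls back to the id carried on the change feed metadata node. The snippet below is a minimal, self-contained sketch of that fallback pattern only; the IdFallbackSketch and resolveId names, the sample payload shape, and the literal "metadata"/"id" field names are assumptions for illustration, not the connector's actual wire format (the real code goes through the MetadataJsonBodyAttributeName and IdAttributeName constants shown in the diff).

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object IdFallbackSketch {
  private val mapper = new ObjectMapper()

  // Prefer the id on the document itself; fall back to the metadata node, which is the
  // only place a delete event still carries the id once the current body is gone.
  def resolveId(changeFeedItem: JsonNode, metadataFieldName: String): Option[String] = {
    Option(changeFeedItem.get("id"))
      .map(_.asText())
      .orElse(
        Option(changeFeedItem.get(metadataFieldName))
          .flatMap(metadata => Option(metadata.get("id")))
          .map(_.asText()))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical delete event: no top-level id, id only present on the metadata node.
    val deleteEvent = mapper.readTree(
      """{ "metadata": { "operationType": "delete", "id": "doc-1" } }""")
    println(resolveId(deleteEvent, "metadata")) // Some(doc-1)
  }
}

The same gap is why the streaming test rewrites _rawBody from the id column before writing to the sink: delete rows bring no current document body of their own, so an id-only body is synthesized so that every row can be persisted with the ItemOverwrite write strategy.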