The Macrometa Collections Databricks Connector integrates Apache Spark with Macrometa collections, allowing you to read data from and write data to Macrometa collections using Apache Spark.
- Databricks Runtime 11.3 LTS (with Apache Spark 3.3.0)
- Scala 2.12 or later
- Macrometa account with access to collections
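
The Scala snippets in the steps below assume the following Spark imports are already in scope; this is a minimal sketch, so adjust it to your own project or notebook setup:

```scala
// Spark imports used by the snippets in this guide.
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.types.StructType
```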
- Set up your source options:

  ```scala
  val sourceOptions = Map(
    "regionUrl" -> "<REGION_URL>",
    "apiKey" -> "apikey <API_KEY>",
    "fabric" -> "<FABRIC>",
    "collection" -> "<COLLECTION>",
    "batchSize" -> "<BATCH_SIZE>",
    "query" -> "<QUERY>"
  )
  ```
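
  Purely as an illustration, a filled-in source configuration might look like the following; the region URL, fabric, collection name, and query are hypothetical placeholders and should be replaced with your own values:

  ```scala
  // Hypothetical example values -- substitute your own region, API key,
  // fabric, collection, batch size, and query.
  val exampleSourceOptions = Map(
    "regionUrl" -> "api-gdn.paas.macrometa.io",
    "apiKey" -> "apikey xxxx.yyyy.zzzz", // note the "apikey " prefix
    "fabric" -> "_system",
    "collection" -> "SourceCollection",
    "batchSize" -> "100",
    "query" -> "FOR doc IN SourceCollection RETURN doc"
  )
  ```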
- Create a Spark session:

  ```scala
  val spark = SparkSession.builder()
    .appName("MacrometaCollectionApp")
    .master("local[*]")
    .getOrCreate()
  ```
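
  If the application runs on a Databricks cluster instead of locally, the master URL is normally managed by the runtime, so a sketch under that assumption can omit it:

  ```scala
  // On a Databricks cluster the master is provided by the runtime;
  // getOrCreate() reuses an existing session if one is already active.
  val spark = SparkSession.builder()
    .appName("MacrometaCollectionApp")
    .getOrCreate()
  ```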
- Read from the Macrometa collection:
  - Auto-infer schema:

    ```scala
    val inputDF = spark
      .read
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(sourceOptions)
      .load()
    ```

  - User-defined schema:

    ```scala
    val userSchema = new StructType().add("value", "string")

    val inputDF = spark
      .read
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(sourceOptions)
      .schema(userSchema)
      .load()
    ```
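
  A user-defined schema can also declare several fields; the field names and types below are hypothetical and should mirror the properties of your source documents:

  ```scala
  // Hypothetical multi-field schema; align names and types with your documents.
  val multiFieldSchema = new StructType()
    .add("value", "string")
    .add("category", "string")
    .add("updatedAt", "long")
  ```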
- Show the read results (only the first 20 rows):

  ```scala
  inputDF.show()
  ```
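
  For a closer look at the data, `printSchema` prints the inferred or supplied schema, and `show` accepts a row count and a truncation flag; these are standard DataFrame methods, shown here as an optional check:

  ```scala
  // Optional inspection: print the schema and up to 50 untruncated rows.
  inputDF.printSchema()
  inputDF.show(50, truncate = false)
  ```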
- Perform transformations on the DataFrame (this assumes that the source collection has a 'value' property on each document):

  ```scala
  val modifiedDF = inputDF
    .select("value")
    .withColumnRenamed("value", "number")
    .withColumn("randomNumber", rand())
  ```
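
  Any other DataFrame operations can be chained in the same way; for example, a hypothetical follow-up filter on the generated column:

  ```scala
  import org.apache.spark.sql.functions.col

  // Hypothetical extra transformation: keep only rows whose random number
  // exceeds 0.5.
  val filteredDF = modifiedDF.filter(col("randomNumber") > 0.5)
  ```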
- Set up your target options:

  ```scala
  val targetOptions = Map(
    "regionUrl" -> "<REGION_URL>",
    "apiKey" -> "apikey <API_KEY>",
    "fabric" -> "<FABRIC>",
    "collection" -> "<COLLECTION>",
    "batchSize" -> "<BATCH_SIZE>",
    "primaryKey" -> "<PRIMARY_KEY>"
  )
  ```
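
  Again as an illustration only, a filled-in target configuration might look like this; the values are hypothetical, and the sketch assumes that `primaryKey` names a column of the DataFrame being written (here the `number` column produced above):

  ```scala
  // Hypothetical example values for the write side.
  val exampleTargetOptions = Map(
    "regionUrl" -> "api-gdn.paas.macrometa.io",
    "apiKey" -> "apikey xxxx.yyyy.zzzz",
    "fabric" -> "_system",
    "collection" -> "TargetCollection",
    "batchSize" -> "100",
    "primaryKey" -> "number" // assumed to reference a DataFrame column
  )
  ```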
- Write to the Macrometa collection:

  ```scala
  modifiedDF
    .write
    .format("com.macrometa.spark.collection.MacrometaTableProvider")
    .options(targetOptions)
    .mode(SaveMode.Append)
    .save()
  ```
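
  To spot-check the result, the target collection can be read back with the same read API used earlier; this sketch simply reuses the source-option keys against the target collection:

  ```scala
  // Hypothetical read-back of the target collection after the write.
  val verifyOptions = Map(
    "regionUrl" -> "<REGION_URL>",
    "apiKey" -> "apikey <API_KEY>",
    "fabric" -> "<FABRIC>",
    "collection" -> "<TARGET_COLLECTION>",
    "batchSize" -> "<BATCH_SIZE>",
    "query" -> "<QUERY>"
  )

  val verifyDF = spark
    .read
    .format("com.macrometa.spark.collection.MacrometaTableProvider")
    .options(verifyOptions)
    .load()

  verifyDF.show()
  ```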
- Close the SparkSession:

  ```scala
  spark.close()
  ```