# Macrometa Collections Databricks Connector

The Macrometa Collections Databricks Connector integrates Apache Spark with Macrometa collections, allowing you to read data from and write data to Macrometa collections using Apache Spark.

## Requirements

- Databricks Runtime 11.3 LTS (with Apache Spark 3.3.0)
- Scala 2.12 or later
- Macrometa account with access to collections
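
Before running the examples below, make sure the connector library itself is available to Spark, for example by attaching the connector JAR to your Databricks cluster as a cluster library.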

## Usage

### Reading from a Macrometa Collection

1. Set up your source options:

```scala
val sourceOptions = Map(
  "regionUrl" -> "<REGION_URL>",
  "apiKey" -> "apikey <API_KEY>",
  "fabric" -> "<FABRIC>",
  "collection" -> "<COLLECTION>",
  "batchSize" -> "<BATCH_SIZE>",
  "query" -> "<QUERY>"
)
```
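
For illustration, a filled-in configuration might look like the following. Every value here is a made-up placeholder (and the query assumes a C8QL-style syntax), so substitute your own region URL, API key, fabric, collection, and query:

```scala
// Hypothetical values for illustration only -- replace with your own settings.
val sourceOptions = Map(
  "regionUrl"  -> "gdn.paas.macrometa.io",
  "apiKey"     -> "apikey xxxxxxxxxxxxxxxx",
  "fabric"     -> "_system",
  "collection" -> "sourceCollection",
  "batchSize"  -> "100",
  "query"      -> "FOR doc IN sourceCollection RETURN doc"
)
```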
2. Create a Spark session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MacrometaCollectionApp")
  .master("local[*]")
  .getOrCreate()
```
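
In a Databricks notebook a SparkSession named `spark` is already provided, so this explicit builder (and the `local[*]` master, which runs Spark locally on all available cores) is mainly useful when testing outside Databricks.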
3. Read from the Macrometa collection.

   Auto-inferred schema:

```scala
val inputDF = spark
  .read
  .format("com.macrometa.spark.collection.MacrometaTableProvider")
  .options(sourceOptions)
  .load()
```

   User-defined schema:

```scala
import org.apache.spark.sql.types.StructType

val userSchema = new StructType().add("value", "string")
val inputDF = spark
  .read
  .format("com.macrometa.spark.collection.MacrometaTableProvider")
  .options(sourceOptions)
  .schema(userSchema)
  .load()
```
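
If your documents have more than one property, the user-defined schema can list each field explicitly. A minimal sketch, assuming hypothetical `value` (string) and `timestamp` (long) properties on each document:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical document shape: a string "value" plus a numeric "timestamp".
val userSchema = new StructType()
  .add("value", "string")
  .add("timestamp", "long")

val inputDF = spark
  .read
  .format("com.macrometa.spark.collection.MacrometaTableProvider")
  .options(sourceOptions)
  .schema(userSchema)
  .load()
```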
4. Show the read results (only the first 20 rows are displayed):

```scala
inputDF.show()
```
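
The usual DataFrame inspection methods also work on the result, for example:

```scala
inputDF.printSchema()    // print the inferred or user-defined schema
println(inputDF.count()) // number of documents read from the collection
```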
5. Perform transformations on the DataFrame (assuming the source collection has `value` as a property on each document):

```scala
import org.apache.spark.sql.functions.rand

val modifiedDF = inputDF
  .select("value")
  .withColumnRenamed("value", "number")
  .withColumn("randomNumber", rand())
```

### Writing to a Macrometa Collection

1. Set up your target options:

```scala
val targetOptions = Map(
  "regionUrl" -> "<REGION_URL>",
  "apiKey" -> "apikey <API_KEY>",
  "fabric" -> "<FABRIC>",
  "collection" -> "<COLLECTION>",
  "batchSize" -> "<BATCH_SIZE>",
  "primaryKey" -> "<PRIMARY_KEY>"
)
```
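
As with the source options, a hypothetical filled-in example might look like this. All values below are placeholders; `primaryKey` is set to the `number` column produced by the transformation above, on the assumption that it identifies each document:

```scala
// Hypothetical values for illustration only -- replace with your own settings.
val targetOptions = Map(
  "regionUrl"  -> "gdn.paas.macrometa.io",
  "apiKey"     -> "apikey xxxxxxxxxxxxxxxx",
  "fabric"     -> "_system",
  "collection" -> "targetCollection",
  "batchSize"  -> "100",
  "primaryKey" -> "number"
)
```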
2. Write to the Macrometa collection:

```scala
import org.apache.spark.sql.SaveMode

modifiedDF
  .write
  .format("com.macrometa.spark.collection.MacrometaTableProvider")
  .options(targetOptions)
  .mode(SaveMode.Append)
  .save()
```
3. Close the SparkSession:

```scala
spark.close()
```
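
### Complete Example

Putting the steps together, a complete (hypothetical) job that copies and transforms documents from one collection to another might look like the sketch below. All option values are placeholders, and the document property names follow the assumptions above.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.rand

object MacrometaCollectionCopyJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MacrometaCollectionApp")
      .master("local[*]")
      .getOrCreate()

    // Placeholder connection settings -- replace with your own values.
    val sourceOptions = Map(
      "regionUrl"  -> "<REGION_URL>",
      "apiKey"     -> "apikey <API_KEY>",
      "fabric"     -> "<FABRIC>",
      "collection" -> "<SOURCE_COLLECTION>",
      "batchSize"  -> "<BATCH_SIZE>",
      "query"      -> "<QUERY>"
    )
    val targetOptions = Map(
      "regionUrl"  -> "<REGION_URL>",
      "apiKey"     -> "apikey <API_KEY>",
      "fabric"     -> "<FABRIC>",
      "collection" -> "<TARGET_COLLECTION>",
      "batchSize"  -> "<BATCH_SIZE>",
      "primaryKey" -> "<PRIMARY_KEY>"
    )

    // Read documents, assuming each one has a "value" property.
    val inputDF = spark.read
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(sourceOptions)
      .load()

    // Rename "value" to "number" and attach a random number to each row.
    val modifiedDF = inputDF
      .select("value")
      .withColumnRenamed("value", "number")
      .withColumn("randomNumber", rand())

    // Append the transformed rows to the target collection.
    modifiedDF.write
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(targetOptions)
      .mode(SaveMode.Append)
      .save()

    spark.close()
  }
}
```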