-
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession) not in each notebook, but in a single centralized location
%scala
import scala.util.parsing.json.JSON
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.ExecutionPlan
import za.co.absa.spline.producer.model.ExecutionEvent
import za.co.absa.spline.producer.model.ReadOperation
import za.co.absa.spline.producer.model.WriteOperation
import za.co.absa.spline.producer.model.DataOperation
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._
// Collect information about the current Databricks notebook so that it can be
// attached to the captured lineage as extra metadata.
val notebookInformationJson = dbutils.notebook.getContext.toJson

// Fail with an explicit message when the context cannot be parsed into a JSON object,
// instead of substituting 0 and hitting an unrelated ClassCastException on first use.
// The top-level values are nested objects ("tags", "extraContext"), hence Map[String, Any].
val outerMap: Map[String, Any] = JSON.parseFull(notebookInformationJson) match {
  case Some(parsed: Map[_, _]) => parsed.asInstanceOf[Map[String, Any]]
  case _ => throw new IllegalStateException("Notebook context could not be parsed as a JSON object")
}

// The entries read from these two maps below are all used as strings.
val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]

val notebookPath = extraContextMap("notebook_path").split("/")
val workspaceUrl = tagMap("browserHostName")

// NOTE(review): despite its name this value holds the notebook path, not a workspace name.
// Falls back to the path already read from the JSON context rather than calling `.get`
// on an empty Option — presumably both carry the same path; confirm.
val workspaceName = dbutils.notebook().getContext().notebookPath
  .getOrElse(extraContextMap("notebook_path"))

val notebookURL = workspaceUrl + "/?o=" + tagMap("orgId") + tagMap("browserHash")
val user = tagMap("user")

// Last path segment; `split` yields an empty array for a path such as "/",
// so avoid indexing and fall back to an empty name in that case.
val name = notebookPath.lastOption.getOrElse("")

val notebookInfo = Map(
  "notebookURL" -> notebookURL,
  "user" -> user,
  "workspaceName" -> workspaceName,
  "workspaceUrl" -> workspaceUrl,
  "name" -> name,
  // Paths of the entries listed under /FileStore/tables.
  "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
  "timestamp" -> System.currentTimeMillis)

// Wrapped as a JSONObject; this is the value CustomFilter adds to every execution plan.
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
/**
 * Post-processing filter that adds extra metadata to the lineage entities it is given.
 *
 * Execution plans receive the notebook information under the "notebookInfo" key;
 * execution events and read/write/data operations receive a placeholder "foo" -> "bar" entry.
 */
class CustomFilter extends PostProcessingFilter {

  // The configuration argument is ignored and construction is delegated to the no-arg constructor.
  // NOTE(review): presumably required when the filter is instantiated from configuration — confirm.
  def this(conf: Configuration) = this()

  // Placeholder metadata attached to everything except execution plans.
  private def placeholderExtra: Map[String, Any] = Map("foo" -> "bar")

  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent = {
    event.withAddedExtra(placeholderExtra)
  }

  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan = {
    val notebookExtra: Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    plan.withAddedExtra(notebookExtra)
  }

  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation = {
    op.withAddedExtra(placeholderExtra)
  }

  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation = {
    op.withAddedExtra(placeholderExtra)
  }

  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation = {
    op.withAddedExtra(placeholderExtra)
  }
}
// Build the agent configuration with the custom filter registered, then enable
// lineage tracking on the current Spark session.
val myInstance = new CustomFilter()
spark.enableLineageTracking(
  AgentConfig.builder()
    .postProcessingFilter(myInstance)
    .build()
)
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
|
Beta Was this translation helpful? Give feedback.
The short answer is yes, there is a feature called codeless initialization.
The longer answer is that it depends on your setup. For example, on Databricks it doesn't work straight out of the box because of the Databricks-specific library-loading approach. There are workarounds for Databricks, but you need to be able to modify the Spark driver init script.
See #587 (comment)
and #316 (reply in thread)
You can create an agent extension out of the code above, and install it as an additional library to your Spark cluster.
See https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example