-
Hello everybody. Thanks for maintaining this amazing project! :D On our team we use some custom properties to track user/app information. Something like:

I think I saw an example that used a post-processing filter to add this information. But now I see that most of what I need could be done either with a dispatcher or by overriding a simple method:

Source: #35 (comment)

I also noticed that declarative configuration of properties will be added in an upcoming release. What's the preferred way? (Is anybody using post-processing filters for that?)
-
**LineageDispatcher**

The goal of the dispatcher is to send the lineage data to the server. Currently we support a Kafka dispatcher, an HTTP dispatcher, and some others for debugging and testing, like the logging dispatcher and the console dispatcher. Typically you would implement your own dispatcher when you need another method of sending the data, like RabbitMQ, or to store the data in S3 for some reason.

**PostProcessingFilter**

The goal of the filter is to change the lineage data before it is sent to the dispatcher. This might mean filtering out some sensitive data or adding additional information. Filters have access to the original Spark `LogicalPlan` and the `SparkSession` as a means of extracting metadata.

**UserExtraMetaDataProvider**

**Declarative way for adding user extra metadata**

This is planned, but it may take some time before it's ready. Currently, a custom `PostProcessingFilter` can be used:

```scala
import org.apache.commons.configuration.Configuration

import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import za.co.absa.spline.producer.model.v1_1._

class MyExtraAppendingPostProcessingFilter(conf: Configuration) extends PostProcessingFilter {

  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
    event.withAddedExtra(Map("foo" -> "bar"))

  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan =
    plan.withAddedExtra(Map("foo" -> "bar"))

  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation =
    op.withAddedExtra(Map("foo" -> "bar"))
}
```
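For the agent to pick the filter up, it also needs to be registered in the agent configuration (e.g. in `spline.properties`, or as `spark.spline.*`-prefixed properties on the Spark conf). A minimal sketch, assuming the named-component convention from the spline-spark-agent README; the filter name `myFilter` and the package `com.example` are placeholders:

```properties
# Register a filter under an arbitrary name...
spline.postProcessingFilter=myFilter
# ...and point that name at the implementing class (fully qualified)
spline.postProcessingFilter.myFilter.className=com.example.MyExtraAppendingPostProcessingFilter
```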
-
Perfect answer. Thanks so much!
-
To be able to use it I had to add the constructor. Full code:

```scala
package za.co.absa.spline.harvester.postprocessing

import org.apache.commons.configuration.Configuration

import scala.util.matching.Regex

import za.co.absa.commons.CaptureGroupReplacer
import za.co.absa.commons.config.ConfigurationImplicits.ConfigurationRequiredWrapper
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.v1_1._

class CustomFilter extends PostProcessingFilter {

  // Auxiliary constructor: the agent instantiates the filter with a
  // Configuration argument, so this overload must exist
  def this(conf: Configuration) = this()

  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
    event.withAddedExtra(Map("foo" -> "bar"))

  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan =
    plan.withAddedExtra(Map("foo" -> "bar"))

  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation =
    op.withAddedExtra(Map("foo" -> "bar"))
}
```
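Rather than discarding the `Configuration`, the constructor can also be used to make the appended metadata configurable. A sketch under the assumption that Spline and Commons Configuration are on the classpath; the class name `ConfigurableExtraFilter` and the property name `myFilter.extra.team` are made up for illustration:

```scala
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import za.co.absa.spline.producer.model.v1_1._

// Hypothetical filter that reads the value to append from the agent
// configuration instead of hard-coding it
class ConfigurableExtraFilter(conf: Configuration) extends PostProcessingFilter {

  // getString(key, default) is Commons Configuration API;
  // "myFilter.extra.team" is an illustrative property name
  private val team: String = conf.getString("myFilter.extra.team", "unknown")

  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
    event.withAddedExtra(Map("team" -> team))

  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan =
    plan.withAddedExtra(Map("team" -> team))

  // Pass the remaining entities through unchanged
  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation = op

  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation = op

  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation = op
}
```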
-
I used to put it in the config props of the cluster. What do I need to put in order to also capture the Databricks notebook name and workspace URL?

```python
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)
```
-
As was said before, there is no out-of-the-box support for capturing the Databricks notebook name or workspace URL. You still need to obtain it yourself and pass it into Spline in one of the available ways, depending on your convenience (e.g. using a custom `PostProcessingFilter` as shown above).
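One possible way to do that yourself, sketched below: read the notebook path and workspace URL from the Databricks notebook context and stash them in the Spark conf, where a custom filter (or any other mechanism) can pick them up. This only runs inside a Databricks notebook (where `dbutils` is injected by the runtime), and the `spline.databricks.*` property names are made up for illustration:

```python
# Databricks-only sketch: fetch the current notebook path
# from the notebook context via dbutils
notebook_path = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
)

# The workspace URL is exposed as a Spark conf property on Databricks clusters
workspace_url = spark.conf.get("spark.databricks.workspaceUrl")

# Stash both values in the Spark conf under illustrative property names,
# so a custom PostProcessingFilter can read and attach them as extras
spark.conf.set("spline.databricks.notebookPath", notebook_path)
spark.conf.set("spline.databricks.workspaceUrl", workspace_url)
```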