Skip to content

Commit

Permalink
#547 Shuffle around info available in notifications.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Feb 19, 2025
1 parent 06b6046 commit 2f63ac1
Show file tree
Hide file tree
Showing 29 changed files with 125 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ case class PipelineInfo(
runtimeInfo: RuntimeInfo,
startedAt: Instant,
finishedAt: Option[Instant],
warningFlag: Boolean,
sparkApplicationId: Option[String],
failureException: Option[Throwable],
pipelineNotificationFailures: Seq[PipelineNotificationFailure],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.core.runner.splitter
package za.co.absa.pramen.api

trait RunMode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ case class PipelineStateSnapshot(
pipelineInfo: PipelineInfo,
batchId: Long,
isFinished: Boolean,
warningFlag: Boolean,
exitedNormally: Boolean,
exitCode: Int,
customShutdownHookCanRun: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,19 @@

package za.co.absa.pramen.api.status

import za.co.absa.pramen.api.RunMode

import java.time.LocalDate

case class RuntimeInfo(
isDryRun: Boolean = false, // If true, the pipeline won't do any writes, just the list of jobs it would run
isUndercover: Boolean = false, // If true, no bookkeeping will be done for the job
minRps: Int = 0, // Configured records per second that is considered bad if the actual rps is lower, ignored if 0
goodRps: Int = 0 // Configured records per second that is considered very good if the actual rps is higher, ignored if 0
runDateFrom: LocalDate, // Specifies the run date or the beginning of the run period.
runDateTo: Option[LocalDate] = None, // If set, the pipeline runs for a period between `runDateFrom` and `runDateTo` inclusive.
historicalRunMode: Option[RunMode] = None, // If set, specifies which historical mode the pipeline runs for.
isRerun: Boolean = false, // If true, the pipeline runs in rerun mode.
isDryRun: Boolean = false, // If true, the pipeline won't do any writes, just the list of jobs it would run.
isUndercover: Boolean = false, // If true, no bookkeeping will be done for the job.
isNewOnly: Boolean = false, // If true, the pipeline runs without catching up late data mode.
isLateOnly: Boolean = false, // If true, the pipeline runs in catching up late data only mode.
minRps: Int = 0, // Configured records per second that is considered bad if the actual rps is lower, ignored if 0.
goodRps: Int = 0 // Configured records per second that is considered very good if the actual rps is higher, ignored if 0.
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package za.co.absa.pramen.core.app.config

import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.RunMode
import za.co.absa.pramen.core.app.config.BookkeeperConfig.BOOKKEEPING_ENABLED
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.runner.splitter.RunMode
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.DateUtils.convertStrToDate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,15 @@

package za.co.absa.pramen.core.notify.pipeline

import za.co.absa.pramen.api.PipelineInfo
import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement}
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, TaskResult}

import java.time.Instant

case class PipelineNotification(
exception: Option[Throwable],
warningFlag: Boolean,
pipelineName: String,
environmentName: String,
sparkAppId: Option[String],
started: Instant,
finished: Instant,
pipelineInfo: PipelineInfo,
tasksCompleted: List[TaskResult],
pipelineNotificationFailures: List[PipelineNotificationFailure],
customEntries: List[NotificationEntry],
customSignature: List[TextElement]
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package za.co.absa.pramen.core.notify.pipeline

import za.co.absa.pramen.api.notification.{NotificationEntry, TextElement}
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, TaskResult}
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, RuntimeInfo, TaskResult}
import za.co.absa.pramen.core.app.config.RuntimeConfig

import java.time.Instant

trait PipelineNotificationBuilder {
def addRuntimeConfig(runtimeConfig: RuntimeConfig): Unit
def addRuntimeInfo(runtimeConfig: RuntimeInfo): Unit

def addFailureException(ex: Throwable): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
private val maxExceptionLength = ConfigUtils.getOptionInt(conf, NOTIFICATION_EXCEPTION_MAX_LENGTH_KEY)
private val strictFailures = ConfigUtils.getOptionBoolean(conf, NOTIFICATION_STRICT_FAILURES_KEY).getOrElse(true)

var runtimeConfig: Option[RuntimeConfig] = None
var runtimeInfo: Option[RuntimeInfo] = None
var appException: Option[Throwable] = None
var warningFlag: Boolean = false
var appName: String = "Unspecified Job"
Expand All @@ -79,8 +79,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure]
val customEntries = new ListBuffer[NotificationEntry]

override def addRuntimeConfig(runtimeConfigIn: RuntimeConfig): Unit = {
runtimeConfig = Option(runtimeConfigIn)
override def addRuntimeInfo(runtimeInfoIn: RuntimeInfo): Unit = {
runtimeInfo = Option(runtimeInfoIn)
}

override def addFailureException(ex: Throwable): Unit = {
Expand Down Expand Up @@ -228,8 +228,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

introParagraph.withText(". ")

runtimeConfig.foreach { c =>
val executionInfoParagraph = renderExecutionInfo(c.runDate, c.runDateTo, c.isRerun, c.checkOnlyNewData, c.checkOnlyLateData)
runtimeInfo.foreach { c =>
val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly)
.withText(". ")
.paragraph
introParagraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,32 @@ object PipelineNotificationDirector {
*/
def build(notificationBuilder: PipelineNotificationBuilder,
notification: PipelineNotification,
validatedEmails: ValidatedEmails,
runtimeConfig: Option[RuntimeConfig])
validatedEmails: ValidatedEmails)
(implicit conf: Config): PipelineNotificationBuilder = {
val minRps = conf.getInt(Keys.WARN_THROUGHPUT_RPS)
val goodRps = conf.getInt(Keys.GOOD_THROUGHPUT_RPS)
val dryRun = conf.getBoolean(DRY_RUN)
val undercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false)
val finishedAt = notification.pipelineInfo.finishedAt.getOrElse(notification.pipelineInfo.startedAt)

notificationBuilder.addAppName(notification.pipelineName)
notificationBuilder.addEnvironmentName(notification.environmentName)
runtimeConfig.foreach(c => notificationBuilder.addRuntimeConfig(c))
notification.sparkAppId.foreach(id => notificationBuilder.addSparkAppId(id))
notificationBuilder.addAppDuration(notification.started, notification.finished)
notificationBuilder.addAppName(notification.pipelineInfo.pipelineName)
notificationBuilder.addEnvironmentName(notification.pipelineInfo.environment)
notificationBuilder.addRuntimeInfo(notification.pipelineInfo.runtimeInfo)
notification.pipelineInfo.sparkApplicationId.foreach(id => notificationBuilder.addSparkAppId(id))
notificationBuilder.addAppDuration(notification.pipelineInfo.startedAt, finishedAt)
notificationBuilder.addDryRun(dryRun)
notificationBuilder.addUndercover(undercover)

notification.exception.foreach(notificationBuilder.addFailureException)
notificationBuilder.addWarningFlag(notification.warningFlag)
notification.pipelineInfo.failureException.foreach(notificationBuilder.addFailureException)
notificationBuilder.addWarningFlag(notification.pipelineInfo.warningFlag)

notificationBuilder.addRpsMetrics(minRps, goodRps)

notification
.tasksCompleted
.foreach(notificationBuilder.addCompletedTask)

notification.pipelineNotificationFailures.foreach(pipelineNotificationFailure =>
notification.pipelineInfo.pipelineNotificationFailures.foreach(pipelineNotificationFailure =>
notificationBuilder.addPipelineNotificationFailure(pipelineNotificationFailure)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}

import scala.collection.mutable.ListBuffer

class PipelineNotificationEmail(notification: PipelineNotification,
runtimeConfig: RuntimeConfig)
class PipelineNotificationEmail(notification: PipelineNotification)
(implicit conf: Config) extends Sendable {

import PipelineNotificationEmail._
Expand Down Expand Up @@ -61,7 +60,7 @@ class PipelineNotificationEmail(notification: PipelineNotification,
private lazy val notificationBuilder = {
val builder = new PipelineNotificationBuilderHtml

PipelineNotificationDirector.build(builder, notification, validatedEmails, Option(runtimeConfig))
PipelineNotificationDirector.build(builder, notification, validatedEmails)
builder
}

Expand Down Expand Up @@ -90,7 +89,7 @@ class PipelineNotificationEmail(notification: PipelineNotification,

private[core] def getEmailRecipients: String = {
if (conf.hasPath(Keys.MAIL_FAILURES_TO)) {
if (notification.exception.isDefined || notification.tasksCompleted.exists(t => t.runStatus.isFailure)) {
if (notification.pipelineInfo.failureException.isDefined || notification.tasksCompleted.exists(t => t.runStatus.isFailure)) {
val to = conf.getString(Keys.MAIL_FAILURES_TO)
log.warn(s"Sending failures to the special mail list: $to")
to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.pramen.core.runner.splitter

import za.co.absa.pramen.api.RunMode
import za.co.absa.pramen.core.app.config.RuntimeConfig

import java.time.LocalDate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.pramen.core.runner.splitter

import za.co.absa.pramen.api.RunMode
import za.co.absa.pramen.api.jobdef.Schedule
import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,43 +82,16 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
}

override def getState: PipelineStateSnapshot = synchronized {
val appException = if (!exitedNormally && failureException.isEmpty && signalException.isDefined) {
signalException
} else
failureException

val notificationBuilderImpl = notificationBuilder.asInstanceOf[NotificationBuilderImpl]
val customNotification = CustomNotification (
notificationBuilderImpl.entries,
notificationBuilderImpl.signature
)

val minRps = ConfigUtils.getOptionInt(conf, WARN_THROUGHPUT_RPS).getOrElse(0)
val goodRps = ConfigUtils.getOptionInt(conf, GOOD_THROUGHPUT_RPS).getOrElse(0)
val dryRun = ConfigUtils.getOptionBoolean(conf, DRY_RUN).getOrElse(false)
val undercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false)

PipelineStateSnapshot(
PipelineInfo(
pipelineName,
environmentName,
RuntimeInfo(
dryRun,
undercover,
minRps,
goodRps
),
startedInstant,
finishedInstant,
sparkAppId,
appException,
pipelineNotificationFailures.toSeq,
pipelineId,
tenant
),
getPipelineInfo,
batchId,
isFinished,
warningFlag,
exitedNormally,
exitCode,
customShutdownHookCanRun,
Expand All @@ -128,6 +101,43 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
)
}

def getPipelineInfo: PipelineInfo = synchronized {
val appException = if (!exitedNormally && failureException.isEmpty && signalException.isDefined) {
signalException
} else
failureException

val minRps = ConfigUtils.getOptionInt(conf, WARN_THROUGHPUT_RPS).getOrElse(0)
val goodRps = ConfigUtils.getOptionInt(conf, GOOD_THROUGHPUT_RPS).getOrElse(0)
val dryRun = ConfigUtils.getOptionBoolean(conf, DRY_RUN).getOrElse(false)
val undercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false)

PipelineInfo(
pipelineName,
environmentName,
RuntimeInfo(
runtimeConfig.runDate,
runtimeConfig.runDateTo,
runtimeConfig.runDateTo.map(_ => runtimeConfig.historicalRunMode),
runtimeConfig.isRerun,
dryRun,
undercover,
runtimeConfig.checkOnlyNewData,
runtimeConfig.checkOnlyLateData,
minRps,
goodRps
),
startedInstant,
finishedInstant,
warningFlag,
sparkAppId,
appException,
pipelineNotificationFailures.toSeq,
pipelineId,
tenant
)
}

override def getBatchId: Long = batchId

override def setShutdownHookCanRun(): Unit = synchronized {
Expand Down Expand Up @@ -263,19 +273,17 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
val customEntries = notificationBuilderImpl.entries
val customSignature = notificationBuilderImpl.signature

val notification = PipelineNotification(failureException,
warningFlag,
pipelineName,
environmentName,
sparkAppId,
startedInstant,
finishedInstant.getOrElse(Instant.now()),
val pipelineInfo = getPipelineInfo
val finishedAt = pipelineInfo.finishedAt.getOrElse(Instant.now())
val finishedPipelineInfo = pipelineInfo.copy(finishedAt = Option(finishedAt))

val notification = PipelineNotification(
finishedPipelineInfo,
realTaskResults.toList,
pipelineNotificationFailures.toList,
customEntries.toList,
customSignature.toList)
if (realTaskResults.nonEmpty || sendEmailIfNoNewData || failureException.nonEmpty) {
val email = new PipelineNotificationEmail(notification, runtimeConfig)
val email = new PipelineNotificationEmail(notification)
email.send()
} else {
log.info("No tasks were ran. The empty notification email won't be sent.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
</style>
<body><p>Hi,</p>

<p><b>(DRY RUN) </b>This is a notification from Pramen for <b>MyApp</b> on <b>MyEnv</b>. The job has <span class="tdwarn">succeeded with warnings</span>. Application ID: <b>app_123</b>.</p>
<p><b>(DRY RUN) </b>This is a notification from Pramen for <b>MyApp</b> on <b>MyEnv</b>. The job has <span class="tdwarn">succeeded with warnings</span>. Execution for the run date <b>2022-02-18</b>. Application ID: <b>app_123</b>.</p>

<p>Job started at <b>2022-03-21 16:29 +0200</b>, finished at <b>2022-03-21 19:16 +0200</b>. Elapsed time: <b>2 hours and 46 minutes</b>. The job ran in <i>undercover</i> mode - no updates to bookkeeping tables are saved.</p>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
</style>
<body><p>Hi,</p>

<p>This is a notification from Pramen for <b>MyApp</b> on <b>MyEnv</b>. The job has <span class="tdgreen">succeeded</span>.</p>
<p>This is a notification from Pramen for <b>MyApp</b> on <b>MyEnv</b>. The job has <span class="tdgreen">succeeded</span>. </p>

<p>Job started at <b>2022-03-21 16:29 +0200</b>, finished at <b>2022-03-21 19:16 +0200</b>. Elapsed time: <b>2 hours and 46 minutes</b>.</p>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package za.co.absa.pramen.core

import za.co.absa.pramen.api.RunMode
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.runner.splitter.RunMode

import java.time.LocalDate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.pramen.core.app.config

import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.runner.splitter.RunMode
import za.co.absa.pramen.api.RunMode

class RunModeSuite extends AnyWordSpec {
"RunMode.fromString()" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ package za.co.absa.pramen.core.mocks
import za.co.absa.pramen.api.PipelineInfo
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, RuntimeInfo}

import java.time.Instant
import java.time.{Instant, LocalDate}

object PipelineInfoFactory {
def getDummyPipelineInfo(pipelineName: String = "Dummy Pipeline",
environment: String = "DEV",
runtimeInfo: RuntimeInfo = RuntimeInfo(),
runtimeInfo: RuntimeInfo = RuntimeInfo(LocalDate.parse("2022-02-18")),
startedAt: Instant = Instant.ofEpochSecond(1718609409),
finishedAt: Option[Instant] = None,
warningFlag: Boolean = false,
sparkApplicationId: Option[String] = Some("testid-12345"),
failureException: Option[Throwable] = None,
pipelineNotificationFailures: Seq[PipelineNotificationFailure] = Seq.empty,
pipelineId: String = "dummy_pipeline_id",
tenant: Option[String] = Some("Dummy tenant")): PipelineInfo = {
PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, sparkApplicationId, failureException, pipelineNotificationFailures, java.util.UUID.randomUUID().toString,tenant)
PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, failureException, pipelineNotificationFailures, pipelineId, tenant)
}
}
Loading

0 comments on commit 2f63ac1

Please sign in to comment.