diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 9f6e0daab..faba9d115 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -28,42 +28,42 @@ jobs:
strategy:
matrix:
python-version: [ "3.10" ]
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
name: Test Pramen-Py
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.branch }}
-
+
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
-
+
- name: setup poetry
uses: abatilo/actions-poetry@v2.1.6
with:
poetry-version: 1.4.2
-
+
- name: install dependencies
working-directory: "./pramen-py"
run: make --silent install
-
+
- name: test
working-directory: "./pramen-py"
env:
ENV: ci
run: make --silent test
-
+
release-python:
needs: [ "test-python" ]
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
name: Release Python artifact
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.branch }}
-
+
- name: Prepare the release branch
id: release_branch1
working-directory: "./pramen"
@@ -74,7 +74,7 @@ jobs:
git config --global user.name "CI/CD bot"
git checkout -b release/$VERSION
git push --set-upstream origin release/$VERSION
-
+
- name: Update version number
id: release_branch
working-directory: "./pramen-py"
@@ -88,35 +88,35 @@ jobs:
git commit -m "Update version number to $VERSION"
git push
fi
-
+
- name: install project dependencies
run: |
sudo apt install -y --no-install-recommends \
libssl-dev \
make
-
+
- uses: actions/setup-python@v4
with:
python-version: "3.10"
-
+
- uses: abatilo/actions-poetry@v2.1.6
with:
poetry-version: 1.4.2
-
+
- name: Install dependencies
working-directory: "./pramen-py"
run: poetry install --no-interaction --no-root
-
+
- name: build and publish the wheel to jfrog
working-directory: "./pramen-py"
env:
ENV: pypi
PRAMENPY_PYPI_TOKEN: ${{ secrets.PRAMENPY_PYPI_TOKEN }}
run: make --silent publish
-
+
release-sbt:
needs: [ "release-python" ]
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
name: Release Scala artifacts
steps:
- name: Checkout code
@@ -158,7 +158,7 @@ jobs:
create-pr:
needs: [ "release-sbt" ]
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
name: Create Pull Request
steps:
@@ -181,6 +181,6 @@ jobs:
- name: Create Pull Request
run: gh pr create -B "$BASE" -H "release/$VERSION" --title "Release Pramen v$VERSION" --body 'Created by Github action'
env:
- BASE: ${{ github.event.inputs.branch }}
+ BASE: ${{ github.head_ref || github.ref_name }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
VERSION: ${{ steps.release_branch3.outputs.VERSION }}
diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml
index 6e50ea496..ff6b1e1b8 100644
--- a/.github/workflows/scala.yml
+++ b/.github/workflows/scala.yml
@@ -18,7 +18,7 @@ on:
jobs:
build-sbt:
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
@@ -46,16 +46,6 @@ jobs:
distribution: temurin
java-version: 8
cache: sbt
- - name: Install sbt
- run: |
- sudo apt-get update
- sudo apt-get install apt-transport-https curl gnupg -yqq
- echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
- echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
- curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
- sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
- sudo apt-get update
- sudo apt-get install sbt
- name: Build and run unit tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}}
diff --git a/README.md b/README.md
index b573b8713..1e2956de7 100644
--- a/README.md
+++ b/README.md
@@ -2504,6 +2504,27 @@ You can use any source/sink combination in transfer jobs.
We describe here a more complicated use cases.
+### Dynamically changing Spark Application description
+You can set up a template for the Spark application description, and it will be applied dynamically each time a new job starts executing.
+
+Example configuration:
+```hocon
+pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate"
+```
+
+These variables are available:
+
+| Variable     | Description                                                                    |
+|--------------|--------------------------------------------------------------------------------|
+| @pipeline    | The name of the pipeline (if defined at `pramen.pipeline.name`).               |
+| @tenant      | The name of the tenant (if defined at `pramen.tenant`).                        |
+| @environment | The environment (if defined at `pramen.environment.name`).                     |
+| @jobName     | The name of the job as defined in the operation definition.                    |
+| @infoDate    | The information date the job is running for.                                   |
+| @outputTable | The output metastore table of the job.                                         |
+| @dryRun      | Adds `(DRY RUN)` when running in the dry run mode, an empty string otherwise.  |
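+
+For example, with the template above, a pipeline named `MyPipeline` running the job `My ingestion job` for `2022-02-18` would get a Spark application description like this (the pipeline and job names are just illustrative):
+```
+Pramen - running MyPipeline, job My ingestion job for 2022-02-18
+```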
+
+
### Startup and shutdown hooks
Startup and shutdown hooks allow running custom code before and after the pipeline runs.
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
index edb4d9ce3..655b8edf1 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
@@ -42,7 +42,8 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
- historicalRunMode: RunMode
+ historicalRunMode: RunMode,
+ sparkAppDescriptionTemplate: Option[String]
)
object RuntimeConfig {
@@ -66,6 +67,7 @@ object RuntimeConfig {
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
+ val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"
def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -128,6 +130,7 @@ object RuntimeConfig {
}
val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
+ val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)
RuntimeConfig(
isDryRun = isDryRun,
@@ -144,7 +147,8 @@ object RuntimeConfig {
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
- runMode
+ runMode,
+ sparkAppDescriptionTemplate
)
}
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
index 3e7f6bef9..ae22f4f06 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
@@ -308,8 +308,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val errorMessage = ex.getMessage
val errorMessageTruncated = maxReasonLength match {
- case Some(maxLength) if errorMessage.length > maxLength => errorMessage.substring(0, maxLength) + "..."
- case _ => errorMessage
+ case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
+ case _ => StringUtils.escapeHTML(errorMessage)
}
paragraphBuilder
@@ -333,7 +333,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val stderrMsg = if (stderr.isEmpty) "" else s"""Last stderr lines:\n${stderr.mkString("", EOL, EOL)}"""
s"$msg\n$stdoutMsg\n$stderrMsg"
case ex: Throwable =>
- renderThrowable(ex, maximumLength = maxExceptionLength)
+ renderThrowable(ex, maximumLength = maxExceptionLength, escapeHTML = true)
}
builder.withUnformattedText(text)
@@ -648,10 +648,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}
- maxReasonLength match {
+ StringUtils.escapeHTML(maxReasonLength match {
case Some(maxLength) if reason.length > maxLength => reason.substring(0, maxLength) + "..."
case _ => reason
- }
+ })
}
private[core] def getFinishTime(task: TaskResult): String = {
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
index e3c747c4e..48d8bedf1 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
@@ -17,7 +17,7 @@
package za.co.absa.pramen.core.runner.task
import com.typesafe.config.Config
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
@@ -31,12 +31,13 @@ import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
+import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.hive.HiveHelper
-import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
+import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils}
import java.sql.Date
import java.time.{Instant, LocalDate}
@@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config,
runtimeConfig: RuntimeConfig,
pipelineState: PipelineState,
applicationId: String) extends TaskRunner {
+ import TaskRunnerBase._
+
implicit private val ecDefault: ExecutionContext = ExecutionContext.global
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)
@@ -118,6 +121,13 @@ abstract class TaskRunnerBase(conf: Config,
/** Runs a task in the single thread. Performs all task logging and notification sending activities. */
protected def runTask(task: Task): RunStatus = {
val started = Instant.now()
+
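+    // Set the Spark job description from the configured template so the Spark UI shows which Pramen job is currently running.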
+ runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
+ val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf)
+ val spark = SparkSession.builder().getOrCreate()
+ spark.sparkContext.setJobDescription(description)
+ }
+
task.job.operation.killMaxExecutionTimeSeconds match {
case Some(timeout) if timeout > 0 =>
@volatile var runStatus: RunStatus = null
@@ -606,3 +616,23 @@ abstract class TaskRunnerBase(conf: Config,
}
}
}
+
+object TaskRunnerBase {
+ def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = {
+ val job = task.job
+ val pipelineName = conf.getString(PIPELINE_NAME_KEY)
+ val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN")
+ val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN")
+ val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else ""
+
+ template.replaceAll("@jobName", job.name)
+ .replaceAll("@infoDate", task.infoDate.toString)
+ .replaceAll("@metastoreTable", job.outputTable.name)
+ .replaceAll("@outputTable", job.outputTable.name)
+ .replaceAll("@table", job.outputTable.name)
+ .replaceAll("@pipeline", pipelineName)
+ .replaceAll("@tenant", tenant)
+ .replaceAll("@environment", environmentName)
+ .replaceAll("@dryRun", dryRun)
+ }
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
index 279abf770..8f378f105 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
@@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.TokenLockFactory
-import za.co.absa.pramen.core.pipeline.Task
+import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
+import za.co.absa.pramen.core.pipeline.{Job, Task}
import za.co.absa.pramen.core.state.PipelineState
-import za.co.absa.pramen.core.utils.Emoji
+import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}
import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.{ExecutorService, Semaphore}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala
index 56f456f91..09d81f7e9 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala
@@ -170,9 +170,10 @@ object StringUtils {
}
/** Renders an exception as a string */
- def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None): String = {
+ def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None, escapeHTML: Boolean = false): String = {
val prefix = " " * (level * 2)
- val base = s"""${ex.toString}\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}"""
+ val errMsg = if (escapeHTML) StringUtils.escapeHTML(ex.toString) else ex.toString
+ val base = s"""$errMsg\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}"""
val cause = Option(ex.getCause) match {
case Some(c) if level < 6 => s"\n${prefix}Caused by " + renderThrowable(c, level + 1)
case _ => ""
@@ -346,4 +347,26 @@ object StringUtils {
output.toString()
}
+ /**
+   * Escapes the HTML special characters ('<', '>' and '&') in a string.
+ * Based on https://stackoverflow.com/a/25228492/1038282
+ *
+ * @param s A string to escape HTML from.
+ * @return An escaped string.
+ */
+ def escapeHTML(s: String): String = {
+ val out = new StringBuilder(Math.max(64, s.length))
+ var i = 0
+ while (i < s.length) {
+ val c = s.charAt(i)
+ if (c == '<' || c == '>' || c == '&') {
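+        // Emit the character as a numeric character reference, e.g. '<' becomes "&#60;"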
+        out.append("&#")
+ out.append(c.toInt)
+ out.append(';')
+ }
+ else out.append(c)
+ i += 1
+ }
+ out.toString
+ }
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
index baf164588..1121c3ece 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
@@ -112,13 +112,27 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean, incorrect
// WARNING. Do not forget that `null` is a valid value returned by RecordSet methods that return a reference type objects.
dataType match {
- case BIT | BOOLEAN => rs.getBoolean(columnIndex)
- case TINYINT => rs.getByte(columnIndex)
- case SMALLINT => rs.getShort(columnIndex)
- case INTEGER => rs.getInt(columnIndex)
- case BIGINT => rs.getLong(columnIndex)
- case FLOAT => rs.getFloat(columnIndex)
- case DOUBLE => rs.getDouble(columnIndex)
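+      // Primitive JDBC getters return 0 or false for SQL NULL values, so wasNull() is checked in order to return proper nulls.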
+ case BIT | BOOLEAN =>
+ val v = rs.getBoolean(columnIndex)
+ if (rs.wasNull()) null else v
+ case TINYINT =>
+ val v = rs.getByte(columnIndex)
+ if (rs.wasNull()) null else v
+ case SMALLINT =>
+ val v = rs.getShort(columnIndex)
+ if (rs.wasNull()) null else v
+ case INTEGER =>
+ val v = rs.getInt(columnIndex)
+ if (rs.wasNull()) null else v
+ case BIGINT =>
+ val v = rs.getLong(columnIndex)
+ if (rs.wasNull()) null else v
+ case FLOAT =>
+ val v = rs.getFloat(columnIndex)
+ if (rs.wasNull()) null else v
+ case DOUBLE =>
+ val v = rs.getDouble(columnIndex)
+ if (rs.wasNull()) null else v
case REAL => rs.getBigDecimal(columnIndex)
case NUMERIC => rs.getBigDecimal(columnIndex)
case DATE => sanitizeDate(rs.getDate(columnIndex))
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
index c07b6f540..542321914 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
@@ -37,7 +37,8 @@ object RuntimeConfigFactory {
parallelTasks: Int = 1,
stopSparkSession: Boolean = false,
allowEmptyPipeline: Boolean = false,
- historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = {
+ historicalRunMode: RunMode = RunMode.CheckUpdates,
+ sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
@@ -52,7 +53,8 @@ object RuntimeConfigFactory {
parallelTasks,
stopSparkSession,
allowEmptyPipeline,
- historicalRunMode)
+ historicalRunMode,
+ sparkAppDescriptionTemplate)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
index 49489a7f4..d590e8845 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
@@ -42,6 +42,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
| load.date.to = 2021-01-10
| parallel.tasks = 4
| stop.spark.session = true
+ | job.description.template = "Test template"
|}
|""".stripMargin
@@ -63,6 +64,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.runDateTo.get.toString == "2021-01-10")
assert(runtimeConfig.parallelTasks == 4)
assert(runtimeConfig.stopSparkSession)
+ assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
}
"have default values" in {
@@ -83,6 +85,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.runDateTo.isEmpty)
assert(runtimeConfig.parallelTasks == 1)
assert(!runtimeConfig.stopSparkSession)
+ assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala
index 026ef5a42..c9f1a556b 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala
@@ -278,7 +278,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
- assert(df.schema.fields(5).name == "LAST_UPDATED")
+ assert(df.schema.fields(5).name == "IS_TAX_FREE")
}
"get the source data frame for source with disabled count query" in {
@@ -301,7 +301,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
- assert(df.schema.fields(5).name == "LAST_UPDATED")
+ assert(df.schema.fields(5).name == "IS_TAX_FREE")
TransientTableManager.reset()
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala
index 73e94c0be..fd28e87f3 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala
@@ -64,6 +64,8 @@ object RdbExampleTable {
| description VARCHAR NOT NULL,
| email VARCHAR(50) NOT NULL,
| founded DATE NOT NULL,
+ | is_tax_free BOOLEAN,
+ | tax_id BIGINT,
| last_updated TIMESTAMP NOT NULL,
| info_date VARCHAR(10) NOT NULL,
| PRIMARY KEY (id))
@@ -75,10 +77,10 @@ object RdbExampleTable {
)
val inserts: Seq[String] = Seq(
- s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
- s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
- s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
- s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
+ s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', FALSE, 123, TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
+ s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TRUE, 456, TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
+ s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', FALSE, NULL, TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
+ s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', NULL, NULL, TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
)
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
index 8cfed0f1b..fd5b0a282 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
@@ -395,7 +395,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
val df = reader.getData(Query.Table("company"), null, null, Seq.empty[String])
assert(df.count() == 4)
- assert(df.schema.fields.length == 7)
+ assert(df.schema.fields.length == 9)
}
"return selected column for a table snapshot-like query" in {
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala
index 5325cea7e..e3ad99f03 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala
@@ -72,7 +72,4 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
}
}
- "Generic SQL generator" should {
- }
-
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala
index 8a3a547a5..d8cca366d 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala
@@ -68,7 +68,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture {
}
"generate data queries without date ranges" in {
- assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
+ assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
}
"generate data queries when list of columns is specified" in {
@@ -76,7 +76,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture {
}
"generate data queries with limit clause date ranges" in {
- assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
+ assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
}
"generate ranged count queries" when {
@@ -118,30 +118,30 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture {
"generate ranged data queries" when {
"date is in DATE format" in {
assert(gen.getDataQuery("company", date1, date1, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
assert(gen.getDataQuery("company", date1, date2, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
}
"date is in STRING format" in {
assert(genStr.getDataQuery("company", date1, date1, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
assert(genStr.getDataQuery("company", date1, date2, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}
"date is in NUMBER format" in {
assert(genNum.getDataQuery("company", date1, date1, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
assert(genNum.getDataQuery("company", date1, date2, Nil, None) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
}
"with limit records" in {
assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) ==
- "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
+ "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
index e85f2890a..35ce42264 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
@@ -138,6 +138,16 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "nullable" : true,
| "metadata" : { }
| }, {
+ | "name" : "IS_TAX_FREE",
+ | "type" : "boolean",
+ | "nullable" : true,
+ | "metadata" : { }
+ | }, {
+ | "name" : "TAX_ID",
+ | "type" : "long",
+ | "nullable" : true,
+ | "metadata" : { }
+ | }, {
| "name" : "LAST_UPDATED",
| "type" : "timestamp",
| "nullable" : true,
@@ -163,17 +173,22 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "ID" : 1,
| "NAME" : "Company1",
| "EMAIL" : "company1@example.com",
- | "FOUNDED" : "2000-10-11"
+ | "FOUNDED" : "2000-10-11",
+ | "IS_TAX_FREE" : false,
+ | "TAX_ID" : 123
|}, {
| "ID" : 2,
| "NAME" : "Company2",
| "EMAIL" : "company2@example.com",
- | "FOUNDED" : "2005-03-29"
+ | "FOUNDED" : "2005-03-29",
+ | "IS_TAX_FREE" : true,
+ | "TAX_ID" : 456
|}, {
| "ID" : 3,
| "NAME" : "Company3",
| "EMAIL" : "company3@example.com",
- | "FOUNDED" : "2016-12-30"
+ | "FOUNDED" : "2016-12-30",
+ | "IS_TAX_FREE" : false
|}, {
| "ID" : 4,
| "NAME" : "Company4",
@@ -181,7 +196,7 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "FOUNDED" : "2016-12-31"
|} ]""".stripMargin
- val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded FROM $tableName")
+ val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded, is_tax_free, tax_id FROM $tableName")
val actual = SparkUtils.convertDataFrameToPrettyJSON(df)
compareText(actual, expected)
@@ -236,35 +251,35 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
}
"getDecimalDataType" should {
- val resultSet = mock(classOf[ResultSet])
- val resultSetMetaData = mock(classOf[ResultSetMetaData])
+ val resultSet = mock(classOf[ResultSet])
+ val resultSetMetaData = mock(classOf[ResultSetMetaData])
- when(resultSetMetaData.getColumnCount).thenReturn(1)
- when(resultSet.getMetaData).thenReturn(resultSetMetaData)
+ when(resultSetMetaData.getColumnCount).thenReturn(1)
+ when(resultSet.getMetaData).thenReturn(resultSetMetaData)
- "return normal decimal for correct precision and scale" in {
- val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
- when(resultSetMetaData.getPrecision(0)).thenReturn(10)
- when(resultSetMetaData.getScale(0)).thenReturn(2)
+ "return normal decimal for correct precision and scale" in {
+ val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
+ when(resultSetMetaData.getPrecision(0)).thenReturn(10)
+ when(resultSetMetaData.getScale(0)).thenReturn(2)
- assert(iterator.getDecimalDataType(0) == NUMERIC)
- }
+ assert(iterator.getDecimalDataType(0) == NUMERIC)
+ }
- "return fixed decimal for incorrect precision and scale" in {
- val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
- when(resultSetMetaData.getPrecision(0)).thenReturn(0)
- when(resultSetMetaData.getScale(0)).thenReturn(2)
+ "return fixed decimal for incorrect precision and scale" in {
+ val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
+ when(resultSetMetaData.getPrecision(0)).thenReturn(0)
+ when(resultSetMetaData.getScale(0)).thenReturn(2)
- assert(iterator.getDecimalDataType(0) == NUMERIC)
- }
+ assert(iterator.getDecimalDataType(0) == NUMERIC)
+ }
- "return string type for incorrect precision and scale" in {
- val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
- when(resultSetMetaData.getPrecision(0)).thenReturn(0)
- when(resultSetMetaData.getScale(0)).thenReturn(2)
+ "return string type for incorrect precision and scale" in {
+ val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
+ when(resultSetMetaData.getPrecision(0)).thenReturn(0)
+ when(resultSetMetaData.getScale(0)).thenReturn(2)
- assert(iterator.getDecimalDataType(0) == VARCHAR)
- }
+ assert(iterator.getDecimalDataType(0) == VARCHAR)
+ }
}
"sanitizeDateTime" when {
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala
index 73cddc6b2..757763046 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala
@@ -188,6 +188,12 @@ class StringUtilsSuite extends AnyWordSpec {
assert(s.contains("java.lang.RuntimeException: test"))
}
+    "render a throwable with an HTML message" in {
+      val ex = new RuntimeException("test <b>exception</b>")
+      val s = renderThrowable(ex, escapeHTML = true)
+      assert(s.contains("java.lang.RuntimeException: test &#60;b&#62;exception&#60;/b&#62;"))
+    }
+
"render a throwable with a length limit" in {
val ex = new RuntimeException("test")
val s = renderThrowable(ex, maximumLength = Some(4))
@@ -384,4 +390,14 @@ class StringUtilsSuite extends AnyWordSpec {
}
}
+  "escapeHTML" should {
+    "escape HTML from a string" in {
+      val s = """This <b>is</b> an HTML &123"""
+
+      val actual = StringUtils.escapeHTML(s)
+
+      assert(actual == """This &#60;b&#62;is&#60;/b&#62; an HTML &#38;123""")
+    }
+  }
+
}