diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9f6e0daab..faba9d115 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -28,42 +28,42 @@ jobs: strategy: matrix: python-version: [ "3.10" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Test Pramen-Py steps: - uses: actions/checkout@v4 with: ref: ${{ github.event.inputs.branch }} - + - uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - + - name: setup poetry uses: abatilo/actions-poetry@v2.1.6 with: poetry-version: 1.4.2 - + - name: install dependencies working-directory: "./pramen-py" run: make --silent install - + - name: test working-directory: "./pramen-py" env: ENV: ci run: make --silent test - + release-python: needs: [ "test-python" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Release Python artifact steps: - name: Checkout code uses: actions/checkout@v4 with: ref: ${{ github.event.inputs.branch }} - + - name: Prepare the release branch id: release_branch1 working-directory: "./pramen" @@ -74,7 +74,7 @@ jobs: git config --global user.name "CI/CD bot" git checkout -b release/$VERSION git push --set-upstream origin release/$VERSION - + - name: Update version number id: release_branch working-directory: "./pramen-py" @@ -88,35 +88,35 @@ jobs: git commit -m "Update version number to $VERSION" git push fi - + - name: install project dependencies run: | sudo apt install -y --no-install-recommends \ libssl-dev \ make - + - uses: actions/setup-python@v4 with: python-version: "3.10" - + - uses: abatilo/actions-poetry@v2.1.6 with: poetry-version: 1.4.2 - + - name: Install dependencies working-directory: "./pramen-py" run: poetry install --no-interaction --no-root - + - name: build and publish the wheel to jfrog working-directory: "./pramen-py" env: ENV: pypi PRAMENPY_PYPI_TOKEN: ${{ secrets.PRAMENPY_PYPI_TOKEN }} run: make --silent publish - + release-sbt: needs: [ "release-python" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Release Scala artifacts steps: - name: Checkout code @@ -158,7 +158,7 @@ jobs: create-pr: needs: [ "release-sbt" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Create Pull Request steps: @@ -181,6 +181,6 @@ jobs: - name: Create Pull Request run: gh pr create -B "$BASE" -H "release/$VERSION" --title "Release Pramen v$VERSION" --body 'Created by Github action' env: - BASE: ${{ github.event.inputs.branch }} + BASE: ${{ github.head_ref || github.ref_name }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} VERSION: ${{ steps.release_branch3.outputs.VERSION }} diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 6e50ea496..ff6b1e1b8 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -18,7 +18,7 @@ on: jobs: build-sbt: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: @@ -46,16 +46,6 @@ jobs: distribution: temurin java-version: 8 cache: sbt - - name: Install sbt - run: | - sudo apt-get update - sudo apt-get install apt-transport-https curl gnupg -yqq - echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list - echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list - curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import - sudo chmod 644 
/etc/apt/trusted.gpg.d/scalasbt-release.gpg - sudo apt-get update - sudo apt-get install sbt - name: Build and run unit tests working-directory: ./pramen run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}} diff --git a/README.md b/README.md index b573b8713..1e2956de7 100644 --- a/README.md +++ b/README.md @@ -2504,6 +2504,27 @@ You can use any source/sink combination in transfer jobs. We describe here a more complicated use cases. +### Dynamically changing Spark Application description +You can set up a template for the Spark application description, and it will be applied dynamically each time a new job starts executing. + +Example configuration: +```hocon +pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate" +``` + +These variables are available: + +| Variable | Description | +|--------------|-------------------------------------------------------------------------------| +| @pipeline | The name of the pipeline (if defined at `pramen.pipeline.name`). | +| @tenant | The name of the tenant (if defined at `pramen.tenant`). | +| @environment | The environment name (if defined at `pramen.environment.name`). | +| @jobName | The name of the job as defined in the operation definition. | +| @infoDate | The information date the job is running for. | +| @outputTable | The output metastore table of the job (also available as `@metastoreTable` and `@table`). | +| @dryRun | Adds `(DRY RUN)` when running in dry run mode, or an empty string otherwise. | + + ### Startup and shutdown hooks Startup and shutdown hooks allow running custom code before and after the pipeline runs. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index edb4d9ce3..655b8edf1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -42,7 +42,8 @@ case class RuntimeConfig( parallelTasks: Int, stopSparkSession: Boolean, allowEmptyPipeline: Boolean, - historicalRunMode: RunMode + historicalRunMode: RunMode, + sparkAppDescriptionTemplate: Option[String] ) object RuntimeConfig { @@ -66,6 +67,7 @@ object RuntimeConfig { val STOP_SPARK_SESSION = "pramen.stop.spark.session" val VERBOSE = "pramen.verbose" val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline" + val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -128,6 +130,7 @@ } val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false) + val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) RuntimeConfig( isDryRun = isDryRun, @@ -144,7 +147,8 @@ parallelTasks = parallelTasks, stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION), allowEmptyPipeline, - runMode + runMode, + sparkAppDescriptionTemplate ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 3e7f6bef9..ae22f4f06 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -308,8 +308,8
@@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val errorMessage = ex.getMessage val errorMessageTruncated = maxReasonLength match { - case Some(maxLength) if errorMessage.length > maxLength => errorMessage.substring(0, maxLength) + "..." - case _ => errorMessage + case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..." + case _ => StringUtils.escapeHTML(errorMessage) } paragraphBuilder @@ -333,7 +333,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val stderrMsg = if (stderr.isEmpty) "" else s"""Last stderr lines:\n${stderr.mkString("", EOL, EOL)}""" s"$msg\n$stdoutMsg\n$stderrMsg" case ex: Throwable => - renderThrowable(ex, maximumLength = maxExceptionLength) + renderThrowable(ex, maximumLength = maxExceptionLength, escapeHTML = true) } builder.withUnformattedText(text) @@ -648,10 +648,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } } - maxReasonLength match { + StringUtils.escapeHTML(maxReasonLength match { case Some(maxLength) if reason.length > maxLength => reason.substring(0, maxLength) + "..." case _ => reason - } + }) } private[core] def getFinishTime(task: TaskResult): String = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index e3c747c4e..48d8bedf1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.runner.task import com.typesafe.config.Config -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions.lit import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ @@ -31,12 +31,13 @@ import za.co.absa.pramen.core.lock.TokenLockFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.pipeline.JobPreRunStatus._ +import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY} import za.co.absa.pramen.core.pipeline._ import za.co.absa.pramen.core.state.PipelineState import za.co.absa.pramen.core.utils.Emoji._ import za.co.absa.pramen.core.utils.SparkUtils._ import za.co.absa.pramen.core.utils.hive.HiveHelper -import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils} +import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils} import java.sql.Date import java.time.{Instant, LocalDate} @@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config, runtimeConfig: RuntimeConfig, pipelineState: PipelineState, applicationId: String) extends TaskRunner { + import TaskRunnerBase._ + implicit private val ecDefault: ExecutionContext = ExecutionContext.global implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay) @@ -118,6 +121,13 @@ abstract class TaskRunnerBase(conf: Config, /** Runs a task in the single thread. Performs all task logging and notification sending activities. 
*/ protected def runTask(task: Task): RunStatus = { val started = Instant.now() + + runtimeConfig.sparkAppDescriptionTemplate.foreach { template => + val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf) + val spark = SparkSession.builder().getOrCreate() + spark.sparkContext.setJobDescription(description) + } + task.job.operation.killMaxExecutionTimeSeconds match { case Some(timeout) if timeout > 0 => @volatile var runStatus: RunStatus = null @@ -606,3 +616,23 @@ abstract class TaskRunnerBase(conf: Config, } } } + +object TaskRunnerBase { + def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = { + val job = task.job + val pipelineName = conf.getString(PIPELINE_NAME_KEY) + val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN") + val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN") + val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else "" + + template.replaceAll("@jobName", job.name) + .replaceAll("@infoDate", task.infoDate.toString) + .replaceAll("@metastoreTable", job.outputTable.name) + .replaceAll("@outputTable", job.outputTable.name) + .replaceAll("@table", job.outputTable.name) + .replaceAll("@pipeline", pipelineName) + .replaceAll("@tenant", tenant) + .replaceAll("@environment", environmentName) + .replaceAll("@dryRun", dryRun) + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala index 279abf770..8f378f105 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala @@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.FatalErrorWrapper import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.lock.TokenLockFactory -import za.co.absa.pramen.core.pipeline.Task +import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY} +import za.co.absa.pramen.core.pipeline.{Job, Task} import za.co.absa.pramen.core.state.PipelineState -import za.co.absa.pramen.core.utils.Emoji +import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji} import java.util.concurrent.Executors.newFixedThreadPool import java.util.concurrent.{ExecutorService, Semaphore} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala index 56f456f91..09d81f7e9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala @@ -170,9 +170,10 @@ object StringUtils { } /** Renders an exception as a string */ - def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None): String = { + def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None, escapeHTML: Boolean = false): String = { val prefix = " " * (level * 2) - val base = s"""${ex.toString}\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}""" + val errMsg = if (escapeHTML) StringUtils.escapeHTML(ex.toString) else ex.toString + val base = s"""$errMsg\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}""" val cause = 
Option(ex.getCause) match { case Some(c) if level < 6 => s"\n${prefix}Caused by " + renderThrowable(c, level + 1) case _ => "" @@ -346,4 +347,26 @@ object StringUtils { output.toString() } + /** + * Escapes HTML tags and symbols from a string. + * Based on https://stackoverflow.com/a/25228492/1038282 + * + * @param s A string to escape HTML from. + * @return An escaped string. + */ + def escapeHTML(s: String): String = { + val out = new StringBuilder(Math.max(64, s.length)) + var i = 0 + while (i < s.length) { + val c = s.charAt(i) + if (c == '<' || c == '>' || c == '&') { + out.append("&#") + out.append(c.toInt) + out.append(';') + } + else out.append(c) + i += 1 + } + out.toString + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala index baf164588..1121c3ece 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala @@ -112,13 +112,27 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean, incorrect // WARNING. Do not forget that `null` is a valid value returned by RecordSet methods that return a reference type objects. dataType match { - case BIT | BOOLEAN => rs.getBoolean(columnIndex) - case TINYINT => rs.getByte(columnIndex) - case SMALLINT => rs.getShort(columnIndex) - case INTEGER => rs.getInt(columnIndex) - case BIGINT => rs.getLong(columnIndex) - case FLOAT => rs.getFloat(columnIndex) - case DOUBLE => rs.getDouble(columnIndex) + case BIT | BOOLEAN => + val v = rs.getBoolean(columnIndex) + if (rs.wasNull()) null else v + case TINYINT => + val v = rs.getByte(columnIndex) + if (rs.wasNull()) null else v + case SMALLINT => + val v = rs.getShort(columnIndex) + if (rs.wasNull()) null else v + case INTEGER => + val v = rs.getInt(columnIndex) + if (rs.wasNull()) null else v + case BIGINT => + val v = rs.getLong(columnIndex) + if (rs.wasNull()) null else v + case FLOAT => + val v = rs.getFloat(columnIndex) + if (rs.wasNull()) null else v + case DOUBLE => + val v = rs.getDouble(columnIndex) + if (rs.wasNull()) null else v case REAL => rs.getBigDecimal(columnIndex) case NUMERIC => rs.getBigDecimal(columnIndex) case DATE => sanitizeDate(rs.getDate(columnIndex)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index c07b6f540..542321914 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -37,7 +37,8 @@ object RuntimeConfigFactory { parallelTasks: Int = 1, stopSparkSession: Boolean = false, allowEmptyPipeline: Boolean = false, - historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = { + historicalRunMode: RunMode = RunMode.CheckUpdates, + sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -52,7 +53,8 @@ object RuntimeConfigFactory { parallelTasks, stopSparkSession, allowEmptyPipeline, - historicalRunMode) + historicalRunMode, + sparkAppDescriptionTemplate) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala index 49489a7f4..d590e8845 
100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala @@ -42,6 +42,7 @@ class RuntimeConfigSuite extends AnyWordSpec { | load.date.to = 2021-01-10 | parallel.tasks = 4 | stop.spark.session = true + | job.description.template = "Test template" |} |""".stripMargin @@ -63,6 +64,7 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.runDateTo.get.toString == "2021-01-10") assert(runtimeConfig.parallelTasks == 4) assert(runtimeConfig.stopSparkSession) + assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template")) } "have default values" in { @@ -83,6 +85,7 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.runDateTo.isEmpty) assert(runtimeConfig.parallelTasks == 1) assert(!runtimeConfig.stopSparkSession) + assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala index 026ef5a42..c9f1a556b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala @@ -278,7 +278,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis assert(df.schema.fields(2).name == "DESCRIPTION") assert(df.schema.fields(3).name == "EMAIL") assert(df.schema.fields(4).name == "FOUNDED") - assert(df.schema.fields(5).name == "LAST_UPDATED") + assert(df.schema.fields(5).name == "IS_TAX_FREE") } "get the source data frame for source with disabled count query" in { @@ -301,7 +301,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis assert(df.schema.fields(2).name == "DESCRIPTION") assert(df.schema.fields(3).name == "EMAIL") assert(df.schema.fields(4).name == "FOUNDED") - assert(df.schema.fields(5).name == "LAST_UPDATED") + assert(df.schema.fields(5).name == "IS_TAX_FREE") TransientTableManager.reset() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala index 73e94c0be..fd28e87f3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala @@ -64,6 +64,8 @@ object RdbExampleTable { | description VARCHAR NOT NULL, | email VARCHAR(50) NOT NULL, | founded DATE NOT NULL, + | is_tax_free BOOLEAN, + | tax_id BIGINT, | last_updated TIMESTAMP NOT NULL, | info_date VARCHAR(10) NOT NULL, | PRIMARY KEY (id)) @@ -75,10 +77,10 @@ object RdbExampleTable { ) val inserts: Seq[String] = Seq( - s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')" + s"INSERT INTO $tableName 
VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', FALSE, 123, TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')", + s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TRUE, 456, TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')", + s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', FALSE, NULL, TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')", + s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', NULL, NULL, TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')" ) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala index 8cfed0f1b..fd5b0a282 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala @@ -395,7 +395,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark val df = reader.getData(Query.Table("company"), null, null, Seq.empty[String]) assert(df.count() == 4) - assert(df.schema.fields.length == 7) + assert(df.schema.fields.length == 9) } "return selected column for a table snapshot-like query" in { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala index 5325cea7e..e3ad99f03 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala @@ -72,7 +72,4 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture { } } - "Generic SQL generator" should { - } - } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala index 8a3a547a5..d8cca366d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala @@ -68,7 +68,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries without date ranges" in { - assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") + assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") } "generate data queries when list of columns is specified" in { @@ -76,7 +76,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries with limit clause date ranges" in { - assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") + assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', 
DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") } "generate ranged count queries" when { @@ -118,30 +118,30 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { "generate ranged data queries" when { "date is in DATE format" in { assert(gen.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") assert(gen.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") } "date is in STRING format" in { assert(genStr.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") assert(genStr.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") } "date is in NUMBER format" in { assert(genNum.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") assert(genNum.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") } "with limit records" in { assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) == - "SELECT ID 'ID', NAME 
'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala index e85f2890a..35ce42264 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala @@ -138,6 +138,16 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "nullable" : true, | "metadata" : { } | }, { + | "name" : "IS_TAX_FREE", + | "type" : "boolean", + | "nullable" : true, + | "metadata" : { } + | }, { + | "name" : "TAX_ID", + | "type" : "long", + | "nullable" : true, + | "metadata" : { } + | }, { | "name" : "LAST_UPDATED", | "type" : "timestamp", | "nullable" : true, @@ -163,17 +173,22 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "ID" : 1, | "NAME" : "Company1", | "EMAIL" : "company1@example.com", - | "FOUNDED" : "2000-10-11" + | "FOUNDED" : "2000-10-11", + | "IS_TAX_FREE" : false, + | "TAX_ID" : 123 |}, { | "ID" : 2, | "NAME" : "Company2", | "EMAIL" : "company2@example.com", - | "FOUNDED" : "2005-03-29" + | "FOUNDED" : "2005-03-29", + | "IS_TAX_FREE" : true, + | "TAX_ID" : 456 |}, { | "ID" : 3, | "NAME" : "Company3", | "EMAIL" : "company3@example.com", - | "FOUNDED" : "2016-12-30" + | "FOUNDED" : "2016-12-30", + | "IS_TAX_FREE" : false |}, { | "ID" : 4, | "NAME" : "Company4", @@ -181,7 +196,7 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "FOUNDED" : "2016-12-31" |} ]""".stripMargin - val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded FROM $tableName") + val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded, is_tax_free, tax_id FROM $tableName") val actual = SparkUtils.convertDataFrameToPrettyJSON(df) compareText(actual, expected) @@ -236,35 +251,35 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa } "getDecimalDataType" should { - val resultSet = mock(classOf[ResultSet]) - val resultSetMetaData = mock(classOf[ResultSetMetaData]) + val resultSet = mock(classOf[ResultSet]) + val resultSetMetaData = mock(classOf[ResultSetMetaData]) - when(resultSetMetaData.getColumnCount).thenReturn(1) - when(resultSet.getMetaData).thenReturn(resultSetMetaData) + when(resultSetMetaData.getColumnCount).thenReturn(1) + 
when(resultSet.getMetaData).thenReturn(resultSetMetaData) - "return normal decimal for correct precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) - when(resultSetMetaData.getPrecision(0)).thenReturn(10) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return normal decimal for correct precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) + when(resultSetMetaData.getPrecision(0)).thenReturn(10) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == NUMERIC) - } + assert(iterator.getDecimalDataType(0) == NUMERIC) + } - "return fixed decimal for incorrect precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) - when(resultSetMetaData.getPrecision(0)).thenReturn(0) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return fixed decimal for incorrect precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) + when(resultSetMetaData.getPrecision(0)).thenReturn(0) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == NUMERIC) - } + assert(iterator.getDecimalDataType(0) == NUMERIC) + } - "return string type for incorrect precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true) - when(resultSetMetaData.getPrecision(0)).thenReturn(0) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return string type for incorrect precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true) + when(resultSetMetaData.getPrecision(0)).thenReturn(0) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == VARCHAR) - } + assert(iterator.getDecimalDataType(0) == VARCHAR) + } } "sanitizeDateTime" when { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala index 73cddc6b2..757763046 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala @@ -188,6 +188,12 @@ class StringUtilsSuite extends AnyWordSpec { assert(s.contains("java.lang.RuntimeException: test")) } + "render a throwable with an HTML message" in { + val ex = new RuntimeException("test <b>exception</b>") + val s = renderThrowable(ex, escapeHTML = true) + assert(s.contains("java.lang.RuntimeException: test &#60;b&#62;exception&#60;/b&#62;")) + } + "render a throwable with a length limit" in { val ex = new RuntimeException("test") val s = renderThrowable(ex, maximumLength = Some(4)) @@ -384,4 +390,14 @@ class StringUtilsSuite extends AnyWordSpec { } } + "escapeHTML" should { + "escape HTML from a string" in { + val s = """This <b>is</b> an HTML &123""" + + val actual = StringUtils.escapeHTML(s) + + assert(actual == """This &#60;b&#62;is&#60;/b&#62; an HTML &#38;123""") + } + } + }
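For reviewers who want to sanity-check the expected literals in the `StringUtilsSuite` tests above, here is a minimal, self-contained sketch (not part of the patch) of the escaping rule the patch adds to `StringUtils.escapeHTML`: `<`, `>` and `&` become numeric character references, everything else passes through. The object name and `main` entry point are illustrative only.

```scala
// Illustrative-only sketch mirroring the escapeHTML helper added to StringUtils in this patch.
object EscapeHtmlSketch {
  def escapeHTML(s: String): String = {
    val out = new StringBuilder(Math.max(64, s.length))
    s.foreach { c =>
      // '<' -> "&#60;", '>' -> "&#62;", '&' -> "&#38;"; all other characters are copied as-is.
      if (c == '<' || c == '>' || c == '&') out.append("&#").append(c.toInt).append(';')
      else out.append(c)
    }
    out.toString
  }

  def main(args: Array[String]): Unit = {
    // Matches the expected literal used in the escapeHTML test above.
    println(escapeHTML("This <b>is</b> an HTML &123")) // This &#60;b&#62;is&#60;/b&#62; an HTML &#38;123
  }
}
```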