From 5a8cc55b87cade3954e4edf015c4599471e0c2c0 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 14 Oct 2024 09:47:48 +0200 Subject: [PATCH 01/14] #496 Fix handling of retrospectively updated jobs. --- .../absa/pramen/core/pipeline/JobBase.scala | 39 +++++++++++++++---- .../splitter/ScheduleStrategyUtils.scala | 2 +- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index ad14fa81f..14c547a2a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.utils.Emoji._ -import za.co.absa.pramen.core.utils.TimeUtils +import za.co.absa.pramen.core.utils.{Emoji, TimeUtils} import java.time.{Instant, LocalDate} import scala.util.{Failure, Success, Try} @@ -114,15 +114,40 @@ abstract class JobBase(operationDef: OperationDef, } protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = { - if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) { - log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") - Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) - } else { - log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") - None + bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match { + case Some(chunk) => + val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished) + if (outOfDateTables.nonEmpty) { + log.info(s"Job for table ${outputTableDef.name} as already ran for 
$infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}") + val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}" + Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning))) + } else { + log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") + Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) + } + case None => + log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") + None } } + private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = { + operationDef.dependencies + .filter(d => !d.isOptional && !d.isPassive) + .flatMap(_.tables) + .distinct + .filter { table => + bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match { + case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds => + log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " + + s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .") + true + case _ => + false + } + } + } + protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = { val evaluator = new DateExprEvaluator evaluator.setValue("infoDate", infoDate) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index a62c300dd..070c7d7b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -214,7 +214,7 @@ object ScheduleStrategyUtils { dependency.tables.foldLeft(false)((acc, table) => { bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match { case 
Some(dependencyUpdated) => - val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished + val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished if (isUpdatedRetrospectively) { log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " + s"Adding '$outputTable' to rerun for $infoDate.") From 8ea77b6108ed51bd9ebc6980ce618fad48327381 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 15 Oct 2024 09:15:02 +0200 Subject: [PATCH 02/14] Update CI for support branches. --- .github/workflows/scala.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 1cf8cd1c0..0261a4fda 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -2,12 +2,16 @@ name: ScalaCI on: push: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" pull_request: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" From 4a9abe0cafd779ea7b32082957b9c157f58925c4 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 15 Oct 2024 09:32:39 +0200 Subject: [PATCH 03/14] Fix sbt availability in CI. 
--- .github/workflows/scala.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 0261a4fda..c988005b1 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -46,6 +46,16 @@ jobs: distribution: temurin java-version: 8 cache: sbt + - name: Install sbt + run: | + sudo apt-get update + sudo apt-get install apt-transport-https curl gnupg -yqq + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import + sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg + sudo apt-get update + sudo apt-get install sbt - name: Build and run unit tests working-directory: ./pramen run: sbt ++${{matrix.scala}} test -DSPARK_VERSION=${{matrix.spark}} From 4834440bc1af4892455f3814df309e62c35ca537 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 16 Oct 2024 11:47:40 +0200 Subject: [PATCH 04/14] Remove Python support releases. 
--- .github/workflows/release.yml | 101 ++++------------------------------ 1 file changed, 11 insertions(+), 90 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9f6e0daab..05a563e4a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,96 +24,6 @@ defaults: shell: bash jobs: - test-python: - strategy: - matrix: - python-version: [ "3.10" ] - runs-on: ubuntu-latest - name: Test Pramen-Py - steps: - - uses: actions/checkout@v4 - with: - ref: ${{ github.event.inputs.branch }} - - - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - - name: setup poetry - uses: abatilo/actions-poetry@v2.1.6 - with: - poetry-version: 1.4.2 - - - name: install dependencies - working-directory: "./pramen-py" - run: make --silent install - - - name: test - working-directory: "./pramen-py" - env: - ENV: ci - run: make --silent test - - release-python: - needs: [ "test-python" ] - runs-on: ubuntu-latest - name: Release Python artifact - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - ref: ${{ github.event.inputs.branch }} - - - name: Prepare the release branch - id: release_branch1 - working-directory: "./pramen" - run: | - VERSION=$(grep "ThisBuild / version" version.sbt | cut -d\" -f2 | sed 's/-SNAPSHOT//') - echo "VERSION=$VERSION" >> $GITHUB_OUTPUT - git config --global user.email "absa_ci_cd_bot@absa.africa" - git config --global user.name "CI/CD bot" - git checkout -b release/$VERSION - git push --set-upstream origin release/$VERSION - - - name: Update version number - id: release_branch - working-directory: "./pramen-py" - env: - VERSION: ${{ steps.release_branch1.outputs.VERSION }} - run: | - PY_VERSION=$(grep -m 1 ^version pyproject.toml | tr -s ' ' | tr -d '"' | tr -d "'" | cut -d' ' -f3) - if [[ "$PY_VERSION" != "$VERSION" ]]; then - sed -i "s/version = \"$PY_VERSION\"/version = \"$VERSION\"/g" pyproject.toml - git add pyproject.toml - git commit 
-m "Update version number to $VERSION" - git push - fi - - - name: install project dependencies - run: | - sudo apt install -y --no-install-recommends \ - libssl-dev \ - make - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - - - uses: abatilo/actions-poetry@v2.1.6 - with: - poetry-version: 1.4.2 - - - name: Install dependencies - working-directory: "./pramen-py" - run: poetry install --no-interaction --no-root - - - name: build and publish the wheel to jfrog - working-directory: "./pramen-py" - env: - ENV: pypi - PRAMENPY_PYPI_TOKEN: ${{ secrets.PRAMENPY_PYPI_TOKEN }} - run: make --silent publish - release-sbt: needs: [ "release-python" ] runs-on: ubuntu-latest @@ -131,6 +41,17 @@ jobs: java-version: 8 cache: sbt + - name: Install sbt + run: | + sudo apt-get update + sudo apt-get install apt-transport-https curl gnupg -yqq + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import + sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg + sudo apt-get update + sudo apt-get install sbt + - name: Import GPG Key run: | echo "${{ secrets.ABSA_OSS_CI_CD_BOT_GPG_KEY }}" > gpg-secret-key.asc From 3e648cd573527489a0843d4f737d4fe7c7073635 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 18 Oct 2024 12:19:12 +0200 Subject: [PATCH 05/14] #499 Escape HTML in error messages. 
--- .../PipelineNotificationBuilderHtml.scala | 10 ++++---- .../absa/pramen/core/utils/StringUtils.scala | 25 +++++++++++++++++-- .../core/tests/utils/StringUtilsSuite.scala | 16 ++++++++++++ 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index d5bc35ab4..08fed6edc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -300,8 +300,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val errorMessage = ex.getMessage val errorMessageTruncated = maxReasonLength match { - case Some(maxLength) if errorMessage.length > maxLength => errorMessage.substring(0, maxLength) + "..." - case _ => errorMessage + case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..." + case _ => StringUtils.escapeHTML(errorMessage) } paragraphBuilder @@ -325,7 +325,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val stderrMsg = if (stderr.isEmpty) "" else s"""Last stderr lines:\n${stderr.mkString("", EOL, EOL)}""" s"$msg\n$stdoutMsg\n$stderrMsg" case ex: Throwable => - renderThrowable(ex, maximumLength = maxExceptionLength) + renderThrowable(ex, maximumLength = maxExceptionLength, escapeHTML = true) } builder.withUnformattedText(text) @@ -624,10 +624,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } } - maxReasonLength match { + StringUtils.escapeHTML(maxReasonLength match { case Some(maxLength) if reason.length > maxLength => reason.substring(0, maxLength) + "..." 
case _ => reason - } + }) } private[core] def getFinishTime(task: TaskResult): String = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala index 56f456f91..e4163d1c3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala @@ -170,9 +170,10 @@ object StringUtils { } /** Renders an exception as a string */ - def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None): String = { + def renderThrowable(ex: Throwable, level: Int = 1, maximumLength: Option[Int] = None, escapeHTML: Boolean = false): String = { val prefix = " " * (level * 2) - val base = s"""${ex.toString}\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}""" + val errMsg = if (escapeHTML) StringUtils.escapeHTML(ex.toString) else ex.toString + val base = s"""$errMsg\n${ex.getStackTrace.map(s => s"$prefix$s").mkString("", EOL, EOL)}""" val cause = Option(ex.getCause) match { case Some(c) if level < 6 => s"\n${prefix}Caused by " + renderThrowable(c, level + 1) case _ => "" @@ -346,4 +347,24 @@ object StringUtils { output.toString() } + /** + * Escapes HTML tags and symbols from a string. + * Based on https://stackoverflow.com/a/25228492/1038282 + * + * @param s A string to escape HTML from. + * @return An escaped string. 
+ */ + def escapeHTML(s: String): String = { + val out = new StringBuilder(Math.max(16, s.length)) + for (i <- 0 until s.length) { + val c = s.charAt(i) + if (c > 127 || c == '"' || c == '\'' || c == '<' || c == '>' || c == '&') { + out.append("&#") + out.append(c.toInt) + out.append(';') + } + else out.append(c) + } + out.toString + } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala index 73cddc6b2..757763046 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/StringUtilsSuite.scala @@ -188,6 +188,12 @@ class StringUtilsSuite extends AnyWordSpec { assert(s.contains("java.lang.RuntimeException: test")) } + "render a throwable with an HTML message" in { + val ex = new RuntimeException("test exception") + val s = renderThrowable(ex, escapeHTML = true) + assert(s.contains("java.lang.RuntimeException: test <b>exception</b>")) + } + "render a throwable with a length limit" in { val ex = new RuntimeException("test") val s = renderThrowable(ex, maximumLength = Some(4)) @@ -384,4 +390,14 @@ class StringUtilsSuite extends AnyWordSpec { } } + "escapeHTML" should { + "escape HTML from a string" in { + val s = """This is an HTML &123""" + + val actual = StringUtils.escapeHTML(s) + + assert(actual == """This <b>is</b> an HTML &123""") + } + } + } From 0c2e6a5bcdf4712948afd8d00535feacea1705ce Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 18 Oct 2024 13:59:15 +0200 Subject: [PATCH 06/14] #499 Improve performance of HTML escaping by approx. 25%. 
Slow: Average execution time: 0.00126 ms Standard deviation: 0.047 ms Fast: Average execution time: 0.00096 ms Standard deviation: 0.0013 ms --- .../scala/za/co/absa/pramen/core/utils/StringUtils.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala index e4163d1c3..09d81f7e9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/StringUtils.scala @@ -355,15 +355,17 @@ object StringUtils { * @return An escaped string. */ def escapeHTML(s: String): String = { - val out = new StringBuilder(Math.max(16, s.length)) - for (i <- 0 until s.length) { + val out = new StringBuilder(Math.max(64, s.length)) + var i = 0 + while (i < s.length) { val c = s.charAt(i) - if (c > 127 || c == '"' || c == '\'' || c == '<' || c == '>' || c == '&') { + if (c == '<' || c == '>' || c == '&') { out.append("&#") out.append(c.toInt) out.append(';') } else out.append(c) + i += 1 } out.toString } From cb68ec67ed4124a044fffa64efe648151b4a9cae Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 18 Oct 2024 10:31:47 +0200 Subject: [PATCH 07/14] #501 Fix Jdbc Native treating of nullable fields. 
--- .../utils/impl/ResultSetToRowIterator.scala | 28 ++++++-- .../core/pipeline/IngestionJobSuite.scala | 4 +- .../pramen/core/samples/RdbExampleTable.scala | 10 +-- .../tests/reader/TableReaderJdbcSuite.scala | 2 +- .../tests/sql/SqlGeneratorLoaderSuite.scala | 20 +++--- .../tests/utils/JdbcNativeUtilsSuite.scala | 67 ++++++++++++------- .../core/tests/utils/SparkUtilsSuite.scala | 3 - 7 files changed, 81 insertions(+), 53 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala index baf164588..1121c3ece 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala @@ -112,13 +112,27 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean, incorrect // WARNING. Do not forget that `null` is a valid value returned by RecordSet methods that return a reference type objects. 
dataType match { - case BIT | BOOLEAN => rs.getBoolean(columnIndex) - case TINYINT => rs.getByte(columnIndex) - case SMALLINT => rs.getShort(columnIndex) - case INTEGER => rs.getInt(columnIndex) - case BIGINT => rs.getLong(columnIndex) - case FLOAT => rs.getFloat(columnIndex) - case DOUBLE => rs.getDouble(columnIndex) + case BIT | BOOLEAN => + val v = rs.getBoolean(columnIndex) + if (rs.wasNull()) null else v + case TINYINT => + val v = rs.getByte(columnIndex) + if (rs.wasNull()) null else v + case SMALLINT => + val v = rs.getShort(columnIndex) + if (rs.wasNull()) null else v + case INTEGER => + val v = rs.getInt(columnIndex) + if (rs.wasNull()) null else v + case BIGINT => + val v = rs.getLong(columnIndex) + if (rs.wasNull()) null else v + case FLOAT => + val v = rs.getFloat(columnIndex) + if (rs.wasNull()) null else v + case DOUBLE => + val v = rs.getDouble(columnIndex) + if (rs.wasNull()) null else v case REAL => rs.getBigDecimal(columnIndex) case NUMERIC => rs.getBigDecimal(columnIndex) case DATE => sanitizeDate(rs.getDate(columnIndex)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala index 080bd80b3..3bb0d1a84 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala @@ -278,7 +278,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis assert(df.schema.fields(2).name == "DESCRIPTION") assert(df.schema.fields(3).name == "EMAIL") assert(df.schema.fields(4).name == "FOUNDED") - assert(df.schema.fields(5).name == "LAST_UPDATED") + assert(df.schema.fields(5).name == "IS_TAX_FREE") } "get the source data frame for source with disabled count query" in { @@ -301,7 +301,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis 
assert(df.schema.fields(2).name == "DESCRIPTION") assert(df.schema.fields(3).name == "EMAIL") assert(df.schema.fields(4).name == "FOUNDED") - assert(df.schema.fields(5).name == "LAST_UPDATED") + assert(df.schema.fields(5).name == "IS_TAX_FREE") TransientTableManager.reset() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala index 9eebbdb9e..c3571d716 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala @@ -64,6 +64,8 @@ object RdbExampleTable { | description VARCHAR NOT NULL, | email VARCHAR(50) NOT NULL, | founded DATE NOT NULL, + | is_tax_free BOOLEAN, + | tax_id BIGINT, | last_updated TIMESTAMP NOT NULL, | info_date VARCHAR(10) NOT NULL, | PRIMARY KEY (id)) @@ -75,10 +77,10 @@ object RdbExampleTable { ) val inserts: Seq[String] = Seq( - s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')", - s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')" + s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', FALSE, 123, TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')", + s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TRUE, 456, TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')", 
+ s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', FALSE, NULL, TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')", + s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', NULL, NULL, TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')" ) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala index 77aad6f17..86a308de0 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala @@ -386,7 +386,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark val df = reader.getData(Query.Table("company"), null, null, Seq.empty[String]) assert(df.count() == 4) - assert(df.schema.fields.length == 7) + assert(df.schema.fields.length == 9) } "return selected column for a table snapshot-like query" in { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala index c420a1423..6a19e8127 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorLoaderSuite.scala @@ -711,7 +711,7 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries without date ranges" in { - assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") + assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 
'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") } "generate data queries when list of columns is specified" in { @@ -719,7 +719,7 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries with limit clause date ranges" in { - assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") + assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") } "generate ranged count queries" when { @@ -761,30 +761,30 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture { "generate ranged data queries" when { "date is in DATE format" in { assert(gen.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") assert(gen.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 
'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") } "date is in STRING format" in { assert(genStr.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") assert(genStr.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") } "date is in NUMBER format" in { assert(genNum.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") assert(genNum.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', 
IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") } "with limit records" in { assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala index e85f2890a..35ce42264 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala @@ -138,6 +138,16 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "nullable" : true, | "metadata" : { } | }, { + | "name" : "IS_TAX_FREE", + | "type" : "boolean", + | "nullable" : true, + | "metadata" : { } + | }, { + | "name" : "TAX_ID", + | "type" : "long", + | "nullable" : true, + | "metadata" : { 
} + | }, { | "name" : "LAST_UPDATED", | "type" : "timestamp", | "nullable" : true, @@ -163,17 +173,22 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "ID" : 1, | "NAME" : "Company1", | "EMAIL" : "company1@example.com", - | "FOUNDED" : "2000-10-11" + | "FOUNDED" : "2000-10-11", + | "IS_TAX_FREE" : false, + | "TAX_ID" : 123 |}, { | "ID" : 2, | "NAME" : "Company2", | "EMAIL" : "company2@example.com", - | "FOUNDED" : "2005-03-29" + | "FOUNDED" : "2005-03-29", + | "IS_TAX_FREE" : true, + | "TAX_ID" : 456 |}, { | "ID" : 3, | "NAME" : "Company3", | "EMAIL" : "company3@example.com", - | "FOUNDED" : "2016-12-30" + | "FOUNDED" : "2016-12-30", + | "IS_TAX_FREE" : false |}, { | "ID" : 4, | "NAME" : "Company4", @@ -181,7 +196,7 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa | "FOUNDED" : "2016-12-31" |} ]""".stripMargin - val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded FROM $tableName") + val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded, is_tax_free, tax_id FROM $tableName") val actual = SparkUtils.convertDataFrameToPrettyJSON(df) compareText(actual, expected) @@ -236,35 +251,35 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa } "getDecimalDataType" should { - val resultSet = mock(classOf[ResultSet]) - val resultSetMetaData = mock(classOf[ResultSetMetaData]) + val resultSet = mock(classOf[ResultSet]) + val resultSetMetaData = mock(classOf[ResultSetMetaData]) - when(resultSetMetaData.getColumnCount).thenReturn(1) - when(resultSet.getMetaData).thenReturn(resultSetMetaData) + when(resultSetMetaData.getColumnCount).thenReturn(1) + when(resultSet.getMetaData).thenReturn(resultSetMetaData) - "return normal decimal for correct precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, 
incorrectDecimalsAsString = false) - when(resultSetMetaData.getPrecision(0)).thenReturn(10) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return normal decimal for correct precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) + when(resultSetMetaData.getPrecision(0)).thenReturn(10) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == NUMERIC) - } + assert(iterator.getDecimalDataType(0) == NUMERIC) + } - "return fixed decimal for incorrect precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) - when(resultSetMetaData.getPrecision(0)).thenReturn(0) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return fixed decimal for incorrect precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false) + when(resultSetMetaData.getPrecision(0)).thenReturn(0) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == NUMERIC) - } + assert(iterator.getDecimalDataType(0) == NUMERIC) + } - "return string type for incorrect precision and scale" in { - val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true) - when(resultSetMetaData.getPrecision(0)).thenReturn(0) - when(resultSetMetaData.getScale(0)).thenReturn(2) + "return string type for incorrect precision and scale" in { + val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true) + when(resultSetMetaData.getPrecision(0)).thenReturn(0) + when(resultSetMetaData.getScale(0)).thenReturn(2) - assert(iterator.getDecimalDataType(0) == VARCHAR) - } + assert(iterator.getDecimalDataType(0) == VARCHAR) + } } "sanitizeDateTime" when { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index f572dce6d..608c1a941 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -263,9 +263,6 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture val newField2 = schema1Orig.fields.head.copy(metadata = metadata2) val schema2 = schema1Orig.copy(fields = newField2 +: schema1Orig.fields.tail) - println(schema1.prettyJson) - println(schema2.prettyJson) - val diff = compareSchemas(schema1, schema2) assert(diff.length == 1) From d2fd7e0f4982adc904830937183c3e285691a8d4 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 17 Oct 2024 10:33:08 +0200 Subject: [PATCH 08/14] #498 Implement the setting of the Spark Application description. --- README.md | 21 ++++++++++++ .../core/app/config/RuntimeConfig.scala | 8 +++-- .../jobrunner/ConcurrentJobRunnerImpl.scala | 4 +++ .../core/runner/task/TaskRunnerBase.scala | 33 +++++++++++++++++-- .../runner/task/TaskRunnerMultithreaded.scala | 5 +-- .../pramen/core/RuntimeConfigFactory.scala | 6 ++-- .../core/app/config/RuntimeConfigSuite.scala | 3 ++ 7 files changed, 72 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index e4bf267ba..4f5889e83 100644 --- a/README.md +++ b/README.md @@ -2454,6 +2454,27 @@ You can use any source/sink combination in transfer jobs. We describe here a more complicated use cases. +### Dynamically changing Spark Application description +You can set up a template for Spark Application, and it will be set dynamically each time a new job is executing. 
+ +Example configuration: +```hocon +pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate" +``` + +These variables are available: + +| Variable | Description | +|--------------|-------------------------------------------------------------------------------| +| @pipeline | The name of the pipeline (if defined at `pramen.pipeline.name`). | +| @tenant | The name of the tenant (if defined at `pramen.tenant`). | +| @environment | The environment (if defined at `pramen.environment.name`). | +| @jobName | The name of the job as defined in the operation definition. | +| @infoDate | The information date the job is running for. | +| @outputTable | The output metastore table of the job. | +| @dryRun | Adds `(DRY RUN)` when running in the dry run mode, an empty string otherwise. | + + ### Startup and shutdown hooks Startup and shutdown hooks allow running custom code before and after the pipeline runs. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index edb4d9ce3..655b8edf1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -42,7 +42,8 @@ case class RuntimeConfig( parallelTasks: Int, stopSparkSession: Boolean, allowEmptyPipeline: Boolean, - historicalRunMode: RunMode + historicalRunMode: RunMode, + sparkAppDescriptionTemplate: Option[String] ) object RuntimeConfig { @@ -66,6 +67,7 @@ object RuntimeConfig { val STOP_SPARK_SESSION = "pramen.stop.spark.session" val VERBOSE = "pramen.verbose" val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline" + val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -128,6 +130,7 @@ object RuntimeConfig { } val 
allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false) + val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) RuntimeConfig( isDryRun = isDryRun, @@ -144,7 +147,8 @@ object RuntimeConfig { parallelTasks = parallelTasks, stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION), allowEmptyPipeline, - runMode + runMode, + sparkAppDescriptionTemplate ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index 4122bea5c..b29ffc7a3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -99,6 +99,10 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, completedJobsChannel.close() } + private[core] def setSparkAppDescription(): Unit = synchronized { + ??? 
+ } + private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = { log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex) val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index de2683a82..07ce3e557 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.runner.task import com.typesafe.config.Config -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions.lit import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ @@ -31,11 +31,12 @@ import za.co.absa.pramen.core.lock.TokenLockFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.pipeline.JobPreRunStatus._ +import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY} import za.co.absa.pramen.core.pipeline._ import za.co.absa.pramen.core.state.PipelineState import za.co.absa.pramen.core.utils.Emoji._ import za.co.absa.pramen.core.utils.SparkUtils._ -import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils} +import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils} import za.co.absa.pramen.core.utils.hive.HiveHelper import java.sql.Date @@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config, runtimeConfig: RuntimeConfig, pipelineState: PipelineState, applicationId: String) extends TaskRunner { + import TaskRunnerBase._ + implicit private val ecDefault: ExecutionContext = ExecutionContext.global implicit val localDateOrdering: 
Ordering[LocalDate] = Ordering.by(_.toEpochDay) @@ -123,6 +126,12 @@ abstract class TaskRunnerBase(conf: Config, @volatile var runStatus: RunStatus = null try { + runtimeConfig.sparkAppDescriptionTemplate.foreach { template => + val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf) + val spark = SparkSession.builder().getOrCreate() + spark.sparkContext.setJobDescription(description) + } + ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) { log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.") runStatus = doValidateAndRunTask(task) @@ -604,3 +613,23 @@ abstract class TaskRunnerBase(conf: Config, } } } + +object TaskRunnerBase { + def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = { + val job = task.job + val pipelineName = conf.getString(PIPELINE_NAME_KEY) + val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN") + val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN") + val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else "" + + template.replaceAll("@jobName", job.name) + .replaceAll("@infoDate", task.infoDate.toString) + .replaceAll("@metastoreTable", job.outputTable.name) + .replaceAll("@outputTable", job.outputTable.name) + .replaceAll("@table", job.outputTable.name) + .replaceAll("@pipeline", pipelineName) + .replaceAll("@tenant", tenant) + .replaceAll("@environment", environmentName) + .replaceAll("@dryRun", dryRun) + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala index 279abf770..8f378f105 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala @@ -24,9 +24,10 
@@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.FatalErrorWrapper import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.lock.TokenLockFactory -import za.co.absa.pramen.core.pipeline.Task +import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY} +import za.co.absa.pramen.core.pipeline.{Job, Task} import za.co.absa.pramen.core.state.PipelineState -import za.co.absa.pramen.core.utils.Emoji +import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji} import java.util.concurrent.Executors.newFixedThreadPool import java.util.concurrent.{ExecutorService, Semaphore} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index c07b6f540..542321914 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -37,7 +37,8 @@ object RuntimeConfigFactory { parallelTasks: Int = 1, stopSparkSession: Boolean = false, allowEmptyPipeline: Boolean = false, - historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = { + historicalRunMode: RunMode = RunMode.CheckUpdates, + sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -52,7 +53,8 @@ object RuntimeConfigFactory { parallelTasks, stopSparkSession, allowEmptyPipeline, - historicalRunMode) + historicalRunMode, + sparkAppDescriptionTemplate) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala index 7e7cf21f6..22c1535d1 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala +++ 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala @@ -43,6 +43,7 @@ class RuntimeConfigSuite extends AnyWordSpec { | load.date.to = 2021-01-10 | parallel.tasks = 4 | stop.spark.session = true + | job.description.template = "Test template" |} |""".stripMargin @@ -64,6 +65,7 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.runDateTo.get.toString == "2021-01-10") assert(runtimeConfig.parallelTasks == 4) assert(runtimeConfig.stopSparkSession) + assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template")) } "have default values" in { @@ -84,6 +86,7 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.runDateTo.isEmpty) assert(runtimeConfig.parallelTasks == 1) assert(!runtimeConfig.stopSparkSession) + assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty) } } From d287619606c97eec9e604c9fbd0897fdb048bd78 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 21 Oct 2024 10:47:21 +0200 Subject: [PATCH 09/14] Fix Release CI. 
--- .github/workflows/release.yml | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 05a563e4a..771c2f2ec 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,8 +25,7 @@ defaults: jobs: release-sbt: - needs: [ "release-python" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Release Scala artifacts steps: - name: Checkout code @@ -34,6 +33,17 @@ jobs: with: ref: ${{ github.event.inputs.branch }} + - name: Prepare the release branch + id: release_branch1 + working-directory: "./pramen" + run: | + VERSION=$(grep "ThisBuild / version" version.sbt | cut -d\" -f2 | sed 's/-SNAPSHOT//') + echo "VERSION=$VERSION" >> $GITHUB_OUTPUT + git config --global user.email "absa_ci_cd_bot@absa.africa" + git config --global user.name "CI/CD bot" + git checkout -b release/$VERSION + git push --set-upstream origin release/$VERSION + - name: Setup JDK and sbt uses: actions/setup-java@v4.2.1 with: @@ -41,17 +51,6 @@ jobs: java-version: 8 cache: sbt - - name: Install sbt - run: | - sudo apt-get update - sudo apt-get install apt-transport-https curl gnupg -yqq - echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list - echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list - curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import - sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg - sudo apt-get update - sudo apt-get install sbt - - name: Import GPG Key run: | echo "${{ secrets.ABSA_OSS_CI_CD_BOT_GPG_KEY }}" > gpg-secret-key.asc @@ -79,7 +78,7 @@ jobs: create-pr: needs: [ "release-sbt" ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 name: Create Pull Request steps: From 
d3612c9b3b74fc305d6a61c749971252422cb5ed Mon Sep 17 00:00:00 2001 From: CI/CD bot Date: Mon, 21 Oct 2024 09:04:37 +0000 Subject: [PATCH 10/14] Setting version to 1.9.10 --- pramen/version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pramen/version.sbt b/pramen/version.sbt index b5d8d385c..a9a8dbe14 100644 --- a/pramen/version.sbt +++ b/pramen/version.sbt @@ -1 +1 @@ -ThisBuild / version := "1.9.10-SNAPSHOT" +ThisBuild / version := "1.9.10" From aab8fde04e8296a3cb9cb9c6c806fb3bd605adef Mon Sep 17 00:00:00 2001 From: CI/CD bot Date: Mon, 21 Oct 2024 09:10:07 +0000 Subject: [PATCH 11/14] Setting version to 1.9.11-SNAPSHOT --- pramen/version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pramen/version.sbt b/pramen/version.sbt index a9a8dbe14..ace69169f 100644 --- a/pramen/version.sbt +++ b/pramen/version.sbt @@ -1 +1 @@ -ThisBuild / version := "1.9.10" +ThisBuild / version := "1.9.11-SNAPSHOT" From a8c3d75b0d3c1b88f9f2ed45b5ada24ed3b4199f Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 22 Oct 2024 17:35:20 +0200 Subject: [PATCH 12/14] Fix the bug in setting custom Spark Application descriptions dynamically. --- .../pramen/core/runner/task/TaskRunnerBase.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 07ce3e557..831d205a0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -121,17 +121,18 @@ abstract class TaskRunnerBase(conf: Config, /** Runs a task in the single thread. Performs all task logging and notification sending activities. 
*/ protected def runTask(task: Task): RunStatus = { val started = Instant.now() + + runtimeConfig.sparkAppDescriptionTemplate.foreach { template => + val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf) + val spark = SparkSession.builder().getOrCreate() + spark.sparkContext.setJobDescription(description) + } + task.job.operation.killMaxExecutionTimeSeconds match { case Some(timeout) if timeout > 0 => @volatile var runStatus: RunStatus = null try { - runtimeConfig.sparkAppDescriptionTemplate.foreach { template => - val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf) - val spark = SparkSession.builder().getOrCreate() - spark.sparkContext.setJobDescription(description) - } - ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) { log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.") runStatus = doValidateAndRunTask(task) From feb975a5b54ddf933950ed97824424680771899e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 22 Oct 2024 17:35:38 +0200 Subject: [PATCH 13/14] Fix CI - the way the base branch is determined. --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 771c2f2ec..963cfdbe2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -101,6 +101,6 @@ jobs: - name: Create Pull Request run: gh pr create -B "$BASE" -H "release/$VERSION" --title "Release Pramen v$VERSION" --body 'Created by Github action' env: - BASE: ${{ github.event.inputs.branch }} + BASE: ${{ github.head_ref || github.ref_name }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} VERSION: ${{ steps.release_branch3.outputs.VERSION }} From e0da69fb061c3c43a8d956b82e28a703f6f96082 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 29 Oct 2024 13:12:29 +0100 Subject: [PATCH 14/14] Fix the build after merging the support branch for 1.9 into the main branch. 
--- .github/workflows/release.yml | 84 ++++++++++++++++++- .github/workflows/scala.yml | 12 +-- .../core/tests/sql/SqlGeneratorSasSuite.scala | 20 ++--- 3 files changed, 93 insertions(+), 23 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 963cfdbe2..faba9d115 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,9 +24,40 @@ defaults: shell: bash jobs: - release-sbt: + test-python: + strategy: + matrix: + python-version: [ "3.10" ] runs-on: ubuntu-22.04 - name: Release Scala artifacts + name: Test Pramen-Py + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.branch }} + + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: setup poetry + uses: abatilo/actions-poetry@v2.1.6 + with: + poetry-version: 1.4.2 + + - name: install dependencies + working-directory: "./pramen-py" + run: make --silent install + + - name: test + working-directory: "./pramen-py" + env: + ENV: ci + run: make --silent test + + release-python: + needs: [ "test-python" ] + runs-on: ubuntu-22.04 + name: Release Python artifact steps: - name: Checkout code uses: actions/checkout@v4 @@ -44,6 +75,55 @@ jobs: git checkout -b release/$VERSION git push --set-upstream origin release/$VERSION + - name: Update version number + id: release_branch + working-directory: "./pramen-py" + env: + VERSION: ${{ steps.release_branch1.outputs.VERSION }} + run: | + PY_VERSION=$(grep -m 1 ^version pyproject.toml | tr -s ' ' | tr -d '"' | tr -d "'" | cut -d' ' -f3) + if [[ "$PY_VERSION" != "$VERSION" ]]; then + sed -i "s/version = \"$PY_VERSION\"/version = \"$VERSION\"/g" pyproject.toml + git add pyproject.toml + git commit -m "Update version number to $VERSION" + git push + fi + + - name: install project dependencies + run: | + sudo apt install -y --no-install-recommends \ + libssl-dev \ + make + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + + - uses: 
abatilo/actions-poetry@v2.1.6 + with: + poetry-version: 1.4.2 + + - name: Install dependencies + working-directory: "./pramen-py" + run: poetry install --no-interaction --no-root + + - name: build and publish the wheel to jfrog + working-directory: "./pramen-py" + env: + ENV: pypi + PRAMENPY_PYPI_TOKEN: ${{ secrets.PRAMENPY_PYPI_TOKEN }} + run: make --silent publish + + release-sbt: + needs: [ "release-python" ] + runs-on: ubuntu-22.04 + name: Release Scala artifacts + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.branch }} + - name: Setup JDK and sbt uses: actions/setup-java@v4.2.1 with: diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 6e50ea496..ff6b1e1b8 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -18,7 +18,7 @@ on: jobs: build-sbt: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: @@ -46,16 +46,6 @@ jobs: distribution: temurin java-version: 8 cache: sbt - - name: Install sbt - run: | - sudo apt-get update - sudo apt-get install apt-transport-https curl gnupg -yqq - echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list - echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list - curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import - sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg - sudo apt-get update - sudo apt-get install sbt - name: Build and run unit tests working-directory: ./pramen run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala 
index 8a3a547a5..d8cca366d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlGeneratorSasSuite.scala @@ -68,7 +68,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries without date ranges" in { - assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") + assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company") } "generate data queries when list of columns is specified" in { @@ -76,7 +76,7 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { } "generate data queries with limit clause date ranges" in { - assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") + assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100") } "generate ranged count queries" when { @@ -118,30 +118,30 @@ class SqlGeneratorSasSuite extends AnyWordSpec with RelationalDbFixture { "generate ranged data queries" when { "date is in DATE format" in { assert(gen.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") + "SELECT ID 
'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'") assert(gen.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'") } "date is in STRING format" in { assert(genStr.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'") assert(genStr.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'") } "date is in NUMBER format" in { assert(genNum.getDataQuery("company", date1, date1, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', 
INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817") assert(genNum.getDataQuery("company", date1, date2, Nil, None) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830") } "with limit records" in { assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100") assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) == - "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") + "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100") } }