#374 Incremental Ingestion #487

Merged: 55 commits, merged on Oct 29, 2024

Changes from all commits (55 commits)
f5211b4
#374 Add a table in bookeeping for storing offsets.
yruslan Sep 6, 2024
d6d1a13
#374 Add bookeeping interfaces for offset management.
yruslan Sep 6, 2024
9b09f15
#374 Implement offset management DB operations.
yruslan Sep 9, 2024
5bda4c5
#374 Improve offset management DB operations and add test suites.
yruslan Sep 10, 2024
45fec57
#374 Bump up the minor version number since breaking changes are to b…
yruslan Sep 10, 2024
9d79f40
#374 Add the notion of 'batchId' and 'getCurrentBatch' for the metast…
yruslan Sep 10, 2024
86df4b5
#374 Remove parenthesis of several get methods.
yruslan Sep 11, 2024
71201ac
#374 Add interfaces for sources to fetch data based on offsets.
yruslan Sep 11, 2024
533bd93
#374 Add table reader interfaces for incremental processing.
yruslan Sep 11, 2024
27ab3cd
#374 Add an end to end test for incremental processing.
yruslan Sep 12, 2024
7329801
Fixup
yruslan Sep 12, 2024
ba63e16
#374 Add initial support for incremental transformers.
yruslan Sep 12, 2024
0232e9c
#374 Make normal transformers compatible with incremental ingestion, …
yruslan Sep 12, 2024
239e1f1
#374 Add support for reruns for incremental ingestion with offsets an…
yruslan Sep 13, 2024
307c0f1
#374 Add support for historical runs, and for re-committing uncommitt…
yruslan Sep 16, 2024
a8a77dc
#374 Implement the offset type: 'datetime' for incremental ingestion.
yruslan Sep 17, 2024
d6fa73c
Update Jacoco.
yruslan Sep 18, 2024
fe246e5
#374 Add integrations tests missing from the last fixup.
yruslan Sep 18, 2024
91fb48a
#374 Another fixup from compile warnings.
yruslan Sep 18, 2024
1f1d7a9
#374 Fixed Spark 2.4.8 support in integration tests.
yruslan Sep 18, 2024
edd90b3
Update Jacoco report version
yruslan Sep 19, 2024
78a0675
Make Jacoco take into account integration tests by including them in …
yruslan Sep 19, 2024
567973a
#374 Add offset configuration for JDBC sources.
yruslan Sep 19, 2024
3f59329
Update Scala, Spark versions for sbt builds.
yruslan Sep 19, 2024
537b85d
#374 Fix a corner case of running incremental ingestion out of order …
yruslan Sep 20, 2024
83b3c7a
#374 Simplify the incremental ingestion interface for data sources.
yruslan Sep 20, 2024
8fe53f1
#374 Implement offset queries generation in SQL generators.
yruslan Sep 20, 2024
5b8983c
#374 Add implementation for incremental ingestion from JDBC, and adde…
yruslan Sep 20, 2024
8b4bfb1
#374 Add server timezone to JDBC reader configuration to accommodate …
yruslan Sep 23, 2024
8b3ed71
#374 Fix the logic of incremental ingestion when information date is …
yruslan Sep 23, 2024
858138f
#374 Improve email notifications for incremental operations.
yruslan Sep 27, 2024
7da2627
#374 Calculate throughput based on appended records for incremental j…
yruslan Sep 27, 2024
6f13121
#374 Fix the way retrospective updates are determined.
yruslan Sep 27, 2024
451f556
#374 Add number of appended records to journal.
yruslan Sep 27, 2024
1ec4946
Improve performance of querying timestamp fields for PostgreSQL and M…
yruslan Sep 27, 2024
aeac279
Fix the check for retrospective updates.
yruslan Sep 27, 2024
64e47b1
#421 Allow transient non-cached jobs not return record count.
yruslan Sep 30, 2024
f861bf9
#374 Improve the description of the getCurrentBatch() metastore inter…
yruslan Sep 30, 2024
f3704b8
#374 Update README with the new feature.
yruslan Oct 1, 2024
f8d9d6e
#374 Add more tests for SQL generation related to offsets.
yruslan Oct 1, 2024
7bce016
#374 Implement offset management based on inclusive intervals.
yruslan Oct 4, 2024
99b3689
#374 Fix new incremental ingestion and integration tests to match inc…
yruslan Oct 4, 2024
fdea270
#374 Fix new incremental ingestion and integration tests to match inc…
yruslan Oct 7, 2024
745315b
Fix a timing dependency of a unit test.
yruslan Oct 7, 2024
30ee28b
#374 Remove minimum values for offset types.
yruslan Oct 8, 2024
d44a8db
#374 Fix a scenario when uncommitted offsets are not properly handled.
yruslan Oct 9, 2024
b3f91b1
#374 Refactor validation for the incremental ingestion job.
yruslan Oct 9, 2024
7df79cd
Add unit tests for the incremental scheduling strategy
yruslan Oct 24, 2024
9e86134
Remove the nasty 'var' and possible error related to it.
yruslan Oct 24, 2024
f6676f2
Update pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
yruslan Oct 24, 2024
d5f3267
Update pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/…
yruslan Oct 24, 2024
f3a8b35
Apply suggestions from code review
yruslan Oct 24, 2024
df88ce5
Fix PR suggestions.
yruslan Oct 25, 2024
aecd5e5
Fix imports removed by IDE.
yruslan Oct 25, 2024
ba5e5de
Fix more PR suggestions regarding splitting complex identifiers.
yruslan Oct 25, 2024
3 changes: 2 additions & 1 deletion .github/workflows/jacoco.yml
Expand Up @@ -48,7 +48,7 @@ jobs:
run: sbt -DSPARK_VERSION=${{matrix.spark}} ++${{matrix.scala}} jacoco
- name: Add coverage to PR
id: jacoco
uses: madrapps/jacoco-report@v1.3
uses: madrapps/jacoco-report@v1.7.1
with:
paths: >
${{ github.workspace }}/pramen/core/target/scala-${{ matrix.scala_short }}/jacoco/report/jacoco.xml,
Expand All @@ -58,6 +58,7 @@ jobs:
min-coverage-changed-files: ${{ matrix.changed }}
title: Unit Test Coverage
update-comment: true
#debug-mode: true
- name: Get the Coverage info
run: |
echo "Total coverage ${{ steps.jacoco.outputs.coverage-overall }}"
26 changes: 20 additions & 6 deletions .github/workflows/scala.yml
Expand Up @@ -2,12 +2,16 @@ name: ScalaCI

on:
push:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
pull_request:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
Expand All @@ -18,7 +22,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scala: [2.11.12, 2.12.19, 2.13.13]
scala: [2.11.12, 2.12.20, 2.13.14]
spark: [2.4.8, 3.3.4, 3.4.2, 3.5.1]
exclude:
- scala: 2.11.12
Expand All @@ -27,9 +31,9 @@ jobs:
spark: 3.4.2
- scala: 2.11.12
spark: 3.5.1
- scala: 2.12.19
- scala: 2.12.20
spark: 2.4.8
- scala: 2.13.13
- scala: 2.13.14
spark: 2.4.8
name: Test Spark ${{matrix.spark}} on Scala ${{matrix.scala}}
steps:
Expand All @@ -42,9 +46,19 @@ jobs:
distribution: temurin
java-version: 8
cache: sbt
- name: Install sbt
run: |
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
- name: Build and run unit tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} test -DSPARK_VERSION=${{matrix.spark}}
run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}}
- name: Run integration tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} integration:test -DSPARK_VERSION=${{matrix.spark}}
58 changes: 54 additions & 4 deletions README.md
Expand Up @@ -115,6 +115,24 @@ In addition to basic error notification, typical operational warnings are genera

Pramen is built using SBT.

**Note:** By default, `sbt test` runs both unit tests and integration tests. To run just the unit tests, use the
`sbt t` alias.

- `sbt +t` - runs unit tests only, for all Scala versions
- `sbt test` - runs all tests (unit and integration)
- `sbt unit:test` - runs unit tests only
- `sbt integration:test` - runs integration tests only

Install locally for `sbt` projects:
```
sbt +publishLocal
```

Install locally for `Maven` projects:
```
sbt +publishM2
```

## Project structure
Pramen consists of a few components:
- `pramen-api` - contains traits (interfaces) for defining custom transformations, sources and sinks.
Expand Down Expand Up @@ -188,8 +206,8 @@ dependencies in an uber jar that you can build for your Scala version. You can d
Creating an uber jar for Pramen is very easy. Just clone the repository and run one of the following commands:
```sh
sbt ++2.11.12 assembly
sbt ++2.12.18 assembly
sbt ++2.13.12 assembly
sbt ++2.12.20 assembly
sbt ++2.13.14 assembly
```

You can collect the uber jar of Pramen either at
Expand All @@ -201,8 +219,8 @@ Spark distributions. This makes the runner independent of Spark version. But if
in your bundle, use one of example commands specifying your Spark version:
```sh
sbt -DSPARK_VERSION="2.4.8" -Dassembly.features="includeDelta" ++2.11.12 assembly
sbt -DSPARK_VERSION="3.3.3" -Dassembly.features="includeDelta" ++2.12.18 assembly
sbt -DSPARK_VERSION="3.4.1" -Dassembly.features="includeDelta" ++2.13.12 assembly
sbt -DSPARK_VERSION="3.3.4" -Dassembly.features="includeDelta" ++2.12.20 assembly
sbt -DSPARK_VERSION="3.5.2" -Dassembly.features="includeDelta" ++2.13.14 assembly
```

Then, run `spark-shell` or `spark-submit` adding the fat jar as the option.
Expand Down Expand Up @@ -602,6 +620,10 @@ is determined by the pipeline configuration.
# Specifies the maximum number of records to fetch. Good for testing purposes.
#limit.records = 100

# Specify the timezone of the database server, if it is different from the default timezone.
# It is needed for incremental ingestion based on an offset field that has a timestamp or datetime data type.
#server.timezone = "Africa/Johannesburg"

# Optionally, you can specify a class for a custom SQL generator for your RDBMS engine.
# The class should extend 'za.co.absa.pramen.api.sql.SqlGenerator'
#sql.generator.class = "com.example.MySqlGenerator"
Expand Down Expand Up @@ -786,6 +808,34 @@ pramen.operations = [
]
```

### Incremental Ingestion (experimental)
Pramen version `1.10` introduces the concept of incremental ingestion. It allows running a pipeline multiple times a day
without reprocessing data that has already been processed. To enable it, use the `incremental` schedule when defining your
ingestion operation:
```hocon
schedule = "incremental"
```

For incremental ingestion to work, you need to define a monotonically increasing field, called an offset.
Usually this field is a counter or a record creation timestamp. The offset field is defined in
your source, and the source must support incremental ingestion in order to use this mode.
```hocon
offset.column {
name = "created_at"
type = "datetime"
}
```

Offset types available at the moment:

| Type | Description |
|----------|--------------------------------------------|
| integral | Any integral type (`short`, `int`, `long`) |
| datetime | A `datetime` or `timestamp` field          |
| string   | Only `string` / `varchar(n)` types         |

Only ingestion jobs support the incremental schedule at the moment. Incremental transformations and sinks are planned to be
available soon. A combined configuration sketch is shown below.
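
The sketch below combines the two snippets above into a single configuration. Only `schedule = "incremental"` and the
`offset.column` block are taken from this documentation; the surrounding keys (the source and operation wrappers, names,
and the hints about other settings) are illustrative placeholders and may differ from the actual configuration layout:
```hocon
# Illustrative sketch only: wrapper keys and names below are placeholders.
pramen.sources = [
  {
    name = "my_jdbc_source"          # placeholder source name
    # ... the usual source settings (connection details, etc.) go here ...

    # Documented above: the offset column that drives incremental ingestion
    offset.column {
      name = "created_at"
      type = "datetime"
    }
  }
]

pramen.operations = [
  {
    name = "My incremental sourcing" # placeholder operation name
    type = "ingestion"               # placeholder
    schedule = "incremental"         # documented above
    source = "my_jdbc_source"        # placeholder reference to the source above
    # ... table mappings go here ...
  }
]
```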

### Sinks
Sinks define a way data needs to be sent to a target system. Built-in sinks include:
Expand Down
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableDef.scala
Expand Up @@ -26,6 +26,7 @@ import java.time.LocalDate
* @param format The format of the table.
* @param infoDateColumn The name of the column that contains the information date (partitioned by).
* @param infoDateFormat The format of the information date.
* @param batchIdColumn The name of the column that contains the batch id.
* @param hiveTable The name of the Hive table.
* @param hivePath The path of the Hive table (if it differs from the path in the underlying format).
* @param infoDateStart The start date of the information date.
Expand All @@ -38,6 +39,7 @@ case class MetaTableDef(
format: DataFormat,
infoDateColumn: String,
infoDateFormat: String,
batchIdColumn: String,
hiveTable: Option[String],
hivePath: Option[String],
infoDateStart: LocalDate,
pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala
Expand Up @@ -17,6 +17,8 @@
package za.co.absa.pramen.api

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.status.TaskRunReason

import java.time.LocalDate

Expand All @@ -27,7 +29,7 @@ import java.time.LocalDate
trait MetastoreReader {

/**
* Reads a table given th range of information dates, and returns back the dataframe.
* Reads a table given the range of information dates, and returns back the dataframe.
*
* In order to read a table it is not sufficient the table to be registered in the metastore. It also
* should be defined as input tables of the job. Otherwise, a runtime exception will be thrown.
Expand All @@ -41,6 +43,29 @@ trait MetastoreReader {
infoDateFrom: Option[LocalDate] = None,
infoDateTo: Option[LocalDate] = None): DataFrame

/**
* Reads the 'current batch' of the table to be processed incrementally.
*
* For incremental processing this method returns the current chunk being processed.
* It may include multiple chunks of not-yet-processed data if the transformer has failed previously.
*
* For non-incremental processing the call to this method is equivalent to:
* {{{
* val df = getTable(tableName)
* }}}
*
* which returns all data for the current information date being processed.
*
* This is the method to use for transformers that run on the 'incremental' schedule.
*
* In order to read a table it is not sufficient for the table to be registered in the metastore. It also
* should be defined as an input table of the job. Otherwise, a runtime exception will be thrown.
*
* @param tableName The name of the table to read.
* @return The dataframe containing data from the table.
*/
def getCurrentBatch(tableName: String): DataFrame

/**
* Reads the latest partition of a given table.
*
Expand All @@ -66,7 +91,6 @@ trait MetastoreReader {
*/
def getLatestAvailableDate(tableName: String, until: Option[LocalDate] = None): Option[LocalDate]


/**
* Returns true if data for the specified table is available for the specified range.
*
Expand All @@ -79,6 +103,15 @@ trait MetastoreReader {
*/
def isDataAvailable(tableName: String, from: Option[LocalDate], until: Option[LocalDate]): Boolean

/**
* Returns offsets for an information date (both committed and uncommitted).
*
* This info can be used by transformers and sinks to decide if actions need to be taken depending on the
* current micro batch. For example, adding partitions to Hive needs to happen only once per info date,
* so a sink that does this can check whether micro-batches have already run for the current day.
*/
def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]

/**
* Gets definition of a metastore table. Please, use with caution and do not write to the underlying path
* from transformers.
Expand All @@ -99,6 +132,12 @@ trait MetastoreReader {
*/
def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo]

/**
* Returns the reason for running the task. This helps transformers and sinks determine their logic based on whether
* the run is a normal run or a forced re-run.
*/
def getRunReason: TaskRunReason

/**
* Returns an object that allows accessing metadata of metastore tables.
*/
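
For orientation, here is a brief sketch (not part of this pull request) of how a transformer running on the incremental
schedule could combine the new reader methods above; the table name, the `amount` column, and the helper itself are
illustrative placeholders:
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.pramen.api.MetastoreReader

import java.time.LocalDate

object IncrementalTransformerSketch {
  // Sketch only: 'my_table' and the transformation below are placeholders.
  def transformIncrementally(metastore: MetastoreReader, infoDate: LocalDate): DataFrame = {
    // For incremental processing, only the current micro-batch (plus any previously
    // failed, uncommitted chunks) is returned.
    val batchDf = metastore.getCurrentBatch("my_table")

    // Offsets already registered for this information date (committed and uncommitted).
    // An empty result suggests this is the first micro-batch of the day, so once-per-day
    // actions (e.g. adding Hive partitions) can be triggered here.
    val offsetsSoFar = metastore.getOffsets("my_table", infoDate)
    if (offsetsSoFar.isEmpty) {
      // perform once-per-day setup here
    }

    batchDf.filter(col("amount") > 0) // placeholder transformation
  }
}
```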
54 changes: 54 additions & 0 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
Expand Up @@ -16,6 +16,8 @@

package za.co.absa.pramen.api

import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}

import java.time.LocalDate

/**
Expand All @@ -42,6 +44,13 @@ trait Source extends ExternalChannel {
*/
def hasInfoDateColumn(query: Query): Boolean = true

/**
* If non-empty, the source is configured for incremental ingestion, and the returned value describes the offset column used for it.
*
* If empty, the source can't be used for incremental ingestion.
*/
def getOffsetInfo: Option[OffsetInfo] = None

/**
* Validates if the source is okay and the ingestion can proceed.
*/
Expand All @@ -57,6 +66,51 @@ trait Source extends ExternalChannel {
*/
def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): SourceResult

/**
* Returns the incremental data between specified offsets. The offset intervals could be half open,
* e.g. only offsetFrom or offsetTo is specified.
*
* If an information date is provided and available at the source, the query will be limited to that date.
*
* <ul>
* <li> When both `offsetFrom` and `offsetTo` are passed the source should return offsets using an inclusive interval
* (offsetFrom <= offset <= offsetTo) </li>
* <li> When only `offsetFrom` is present the source should return offsets using an exclusive interval
* (offset > offsetFrom)</li>
[Review comment by a contributor]
Why is offsetFrom an exclusive interval if offsetTo is not given, but inclusive if it is also given?

[Reply by the author]
When we want to query new data, the caller would specify only offsetFrom, and the query is going to look like:

SELECT * FROM table WHERE offset > offsetFrom

(exclusive)

When we want to do a rerun for a day, we get the minimum and maximum offsets for the day and run:

SELECT * FROM table WHERE offset >= offsetFrom AND offset <= offsetTo

(inclusive)

The last case, when only offsetTo is available, might not be used in practice; it was added for completeness. Potentially it can be used to query the old database for all data that was already loaded, for example for reconciliation. In this case only an inclusive interval makes sense for such queries:

SELECT * FROM table WHERE offset <= offsetTo

(inclusive)

I could have added a flag to make the caller choose, or split this method in two just for the top two cases, but then the implementer of a source would have to implement two very similar methods.

This reasoning has been added to the method documentation.

* <li> When only `offsetTo` is present the source should return offsets using an inclusive interval
* (offset <= offsetTo)</li>
*</ul>
*
* The method is used in incremental ingestion as follows. When the framework queries new data, the caller would
* specify only `offsetFrom`, and the query is going to look like:
*
* {{{
* SELECT * FROM table WHERE offset > offsetFrom
* (exclusive)
* }}}
*
* When a rerun is happening for a day, the caller provides both minimum and maximum offsets for that day and runs:
*
* {{{
* SELECT * FROM table WHERE offset >= offsetFrom AND offset <= offsetTo
* (inclusive)
* }}}
*
* The last case, when only `offsetTo` is available, might not be used in practice; it was added for completeness.
* Potentially it can be used to query the old database for all data that was already loaded:
*
* {{{
* SELECT * FROM table WHERE offset <= offsetTo
* (inclusive)
* }}}
*
* @param offsetFromOpt This is an exclusive parameter; the query will be SELECT ... WHERE offset_col > min_offset
* @param offsetToOpt This is an inclusive parameter; the query will be SELECT ... WHERE offset_col <= max_offset
* @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult

/**
* This method is called after the ingestion is finished. You can query the output table for the output information
* date and the data should be there.
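
To make the interval semantics above concrete, here is a small illustrative sketch (not from this pull request) of how a
source implementation could derive its WHERE condition from the two optional offsets; it assumes the offset values have
already been rendered to SQL literals by some helper:
```scala
object OffsetSqlSketch {
  // Builds the WHERE condition following the documented offset interval semantics.
  // The incoming strings are assumed to be already-rendered SQL literals.
  def offsetCondition(offsetColumn: String,
                      offsetFromOpt: Option[String],
                      offsetToOpt: Option[String]): String =
    (offsetFromOpt, offsetToOpt) match {
      case (Some(from), Some(to)) =>
        s"$offsetColumn >= $from AND $offsetColumn <= $to" // rerun: inclusive on both sides
      case (Some(from), None) =>
        s"$offsetColumn > $from"                           // normal incremental run: exclusive lower bound
      case (None, Some(to)) =>
        s"$offsetColumn <= $to"                            // rarely used: inclusive upper bound only
      case (None, None) =>
        "1 = 1"                                            // no offset restriction
    }
}
```
For example, `OffsetSqlSketch.offsetCondition("created_at", Some("'2024-10-01 00:00:00'"), None)` yields
`created_at > '2024-10-01 00:00:00'`, matching the "query new data" case described above.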
pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala
Expand Up @@ -17,11 +17,14 @@
package za.co.absa.pramen.api

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.offset.OffsetValue

import java.time.LocalDate

trait TableReader {
def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long

def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame

def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): DataFrame
}