diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
index b52742732..19d67403f 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
@@ -72,8 +72,17 @@ trait Source extends ExternalChannel {
*
* If an information date is provided and available at the source, the query will be limited to that date.
*
+ *
+ * - When both `offsetFrom` from and `offsetTo` are passed the source should return offsets using an inclusive interval
+ * (offsetFrom <= offset <= offsetTo)
+ * - When only `offsetFrom` is present the source should return offsets using an exclusive interval interval
+ * (offset > offsetFrom)
+ * - When only `offsetTo` is present the source should return offsets using an inclusive interval
+ * (offset <= offsetTo)
+ *
+ *
* @param offsetFromOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
- * @param offsetToOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
+ * @param offsetToOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col <= min_offset
* @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala
index 2ff693262..e38009542 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGenerator.scala
@@ -55,6 +55,15 @@ trait SqlGenerator {
/**
* Generates a query for incremental ingestion, the result can be restricted by an information column, if present, but also by offset range.
+ *
+ *
+ * - When both `offsetFrom` from and `offsetTo` are passed the generator should return offsets using an inclusive interval
+ * (offsetFrom <= offset <= offsetTo)
+ * - When only `offsetFrom` is present the generator should return offsets using an exclusive interval interval
+ * (offset > offsetFrom)
+ * - When only `offsetTo` is present the generator should return offsets using an inclusive interval
+ * (offset <= offsetTo)
+ *
*/
def getDataQueryIncremental(tableName: String,
onlyForInfoDate: Option[LocalDate],
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala
index ade09afda..f79ee5cbb 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala
@@ -120,7 +120,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)} AND ${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}"
case (Some(offsetFrom), None) =>
validateOffsetValue(offsetFrom)
- s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)}"
+ s"${getOffsetWhereCondition(offsetColumn, ">", offsetFrom)}"
case (None, Some(offsetTo)) =>
validateOffsetValue(offsetTo)
s"${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}"
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
index feacaf413..49562a27a 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
@@ -82,9 +82,9 @@ class TableReaderSpark(formatOpt: Option[String],
getData(query, infoDate, infoDate, columns)
.filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
- log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}")
+ log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getData(query, infoDate, infoDate, columns)
- .filter(offsetCol >= offsetFrom.getSparkLit)
+ .filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getData(query, infoDate, infoDate, columns)
@@ -100,9 +100,9 @@ class TableReaderSpark(formatOpt: Option[String],
getBaseDataFrame(query)
.filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
- log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}")
+ log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getBaseDataFrame(query)
- .filter(offsetCol >= offsetFrom.getSparkLit)
+ .filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getBaseDataFrame(query)
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
index 8e2d7677c..b1ea0b125 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
@@ -621,7 +621,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(offsets.length == 1)
- assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
+ assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
}
@@ -674,7 +674,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(offsets.length == 1)
- assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
+ assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
}
@@ -736,10 +736,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(offsets.length == 2)
- assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
+ assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
- assert(offsets(1).minOffset.get.valueString.toLong == 3)
+ assert(offsets(1).minOffset.get.valueString.toLong == 4)
assert(offsets(1).maxOffset.get.valueString.toLong == 6)
assert(offsets(1).committedAt.nonEmpty)
}
@@ -1043,20 +1043,19 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val om = new OffsetManagerJdbc(pramenDb.db, 123L)
val offsets1 = om.getOffsets("table1", infoDate.minusDays(1))
- assert(offsets1.head.minOffset.get.valueString.toLong == -62135596800000L)
+ assert(offsets1.head.minOffset.get.valueString.toLong == 1613563930000L)
assert(offsets1.head.maxOffset.get.valueString.toLong == 1613563930000L)
assert(offsets1.head.committedAt.nonEmpty)
-
val offsets2 = om.getOffsets("table1", infoDate)
assert(offsets2.length == 1)
- assert(offsets2.head.minOffset.get.valueString.toLong == 1613563930000L)
+ assert(offsets2.head.minOffset.get.valueString.toLong == 1613639398123L)
assert(offsets2.head.maxOffset.get.valueString.toLong == 1613639399123L)
assert(offsets2.head.committedAt.nonEmpty)
val offsets3 = om.getOffsets("table1", infoDate.plusDays(1))
assert(offsets3.length == 1)
- assert(offsets3.head.minOffset.get.valueString.toLong == 1613639399123L)
+ assert(offsets3.head.minOffset.get.valueString.toLong == 1613740330000L)
assert(offsets3.head.maxOffset.get.valueString.toLong == 1613740330000L)
assert(offsets3.head.committedAt.nonEmpty)
}
@@ -1109,13 +1108,13 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val offsets1 = om.getOffsets("table1", infoDate.minusDays(1))
assert(offsets1.length == 1)
- assert(offsets1.head.minOffset.get.valueString.toLong == Long.MinValue)
+ assert(offsets1.head.minOffset.get.valueString.toLong == 1)
assert(offsets1.head.maxOffset.get.valueString.toLong == 2)
assert(offsets1.head.committedAt.nonEmpty)
val offsets2 = om.getOffsets("table1", infoDate)
assert(offsets2.length == 1)
- assert(offsets2.head.minOffset.get.valueString.toLong == 2)
+ assert(offsets2.head.minOffset.get.valueString.toLong == 3)
assert(offsets2.head.maxOffset.get.valueString.toLong == 4)
assert(offsets2.head.committedAt.nonEmpty)
}