Skip to content

Commit

Permalink
fix: Mark streams using query partition as unsorted (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielptv authored Dec 2, 2024
1 parent 5a82e78 commit 45220d9
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 51 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pipx install git+https://github.com/danielptv/tap-db2.git@main
| encryption | True | None | Encryption settings for the DB2 connection. Disabled if omitted. |
| connection_parameters | False | None | Additional parameters to be appended to the connection string. This is an object containing key-value pairs. |
| sqlalchemy_execution_options | False | None | Additional execution options to be passed to SQLAlchemy. This is an object containing key-value pairs. |
| query_partitioning | False | None | Partition query into smaller subsets. |
| query_partition | False | None | Partition query into smaller subsets. |
| filter | False | None | Apply a custom WHERE condition per stream. Unlike the filter available in stream_maps, this will be evaluated BEFORE extracting the data. |
| ignore_supplied_tables | False | True | Ignore DB2-supplied user tables. For more info check out [Db2-supplied user tables](https://www.ibm.com/docs/en/db2-for-zos/12?topic=db2-supplied-user-tables). |
| ignore_views | False | False | Ignore views. |
Expand Down Expand Up @@ -147,13 +147,13 @@ plugins:
pip_url: tap-ibm-db2
config:
...
query_partitioning:
query_partition:
<stream>:
primary_key: <primary key>
partition_key: <partition_key>
partition_size: 1000
```

Replace `<stream>` with the stream name and `<primary key>` with the stream's primary key. Use `*` to apply a query partitioning setting to all streams not explicitly declared.
Replace `<stream>` with the stream name and `<partition_key>` with the stream's partition key. Use `*` to apply a query partitioning setting to all streams not explicitly declared.

## Usage 👷‍♀️

Expand Down
2 changes: 1 addition & 1 deletion meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ plugins:
kind: object
- name: sqlalchemy_execution_options
kind: object
- name: query_partitioning
- name: query_partition
kind: object
- name: ignore_supplied_tables
kind: boolean
Expand Down
79 changes: 39 additions & 40 deletions tap_db2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import ibm_db_sa # type: ignore
import sqlalchemy as sa
from singer_sdk import SQLStream
from singer_sdk.connectors import SQLConnector
from singer_sdk.helpers._state import STARTING_MARKER
from singer_sdk.helpers.types import Context
from singer_sdk.tap_base import Tap

from tap_db2.connector import DB2Connector

Expand All @@ -21,34 +19,6 @@ class DB2Stream(SQLStream):

connector_class = DB2Connector

def __init__(
self, tap: Tap, catalog_entry: dict, connector: SQLConnector | None = None
) -> None:
"""Initialize the database stream.
If connector is omitted, a new connector will be created.
Args:
tap: The parent tap object.
catalog_entry: Catalog entry dict.
connector: Optional connector to reuse.
"""
super().__init__(tap, catalog_entry, connector)
self.query_partitioning_pk = None
self.query_partitioning_size = None

partitioning_configs = self.config.get("query_partitioning", {})
if self.tap_stream_id in partitioning_configs:
self.query_partitioning_pk = partitioning_configs[self.tap_stream_id][
"primary_key"
]
self.query_partitioning_size = partitioning_configs[self.tap_stream_id][
"partition_size"
]
elif "*" in partitioning_configs:
self.query_partitioning_pk = partitioning_configs["*"]["primary_key"]
self.query_partitioning_size = partitioning_configs["*"]["partition_size"]

def get_starting_replication_key_value(
self,
context: Context | None,
Expand Down Expand Up @@ -92,6 +62,8 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
msg = f"Stream '{self.name}' does not support partitioning."
raise NotImplementedError(msg)

partition_key, partition_size = self._get_partition_config()

selected_column_names = self.get_selected_schema()["properties"].keys()
table = self.connector.get_table(
full_table_name=self.fully_qualified_name,
Expand All @@ -102,8 +74,8 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
replication_key_col = table.columns[self.replication_key]
query = (
query.order_by(replication_key_col)
if self.query_partitioning_pk is None
else query.order_by(table.columns[self.query_partitioning_pk])
if partition_key is None
else query.order_by(table.columns[partition_key])
)
start_val = self.get_starting_replication_key_value(context)
if start_val:
Expand All @@ -119,7 +91,7 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
query = query.where(sa.text(filter_configs["*"]["where"]))

with self.connector._connect() as conn:
if self.query_partitioning_pk is None:
if partition_key is None:
for record in conn.execute(query):
transformed_record = self.post_process(dict(record._mapping))
if transformed_record is None:
Expand All @@ -128,11 +100,11 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
yield transformed_record

else:
limit = self.query_partitioning_size
primary_key = self.query_partitioning_pk
lower_limit = None

termination_query = sa.select(sa.func.count(table.columns[primary_key]))
termination_query = sa.select(
sa.func.count(table.columns[partition_key])
)
if query.whereclause is not None:
termination_query = termination_query.where(query.whereclause)
termination_query_result = conn.execute(termination_query).first()
Expand All @@ -141,21 +113,48 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
fetched_count = 0

while fetched_count < termination_limit:
limited_query = query.limit(limit)
limited_query = query.limit(partition_size)
if lower_limit is not None:
limited_query = limited_query.where(
table.columns[primary_key] > lower_limit
table.columns[partition_key] > lower_limit
)

for record in conn.execute(limited_query):
transformed_record = self.post_process(dict(record._mapping))

if transformed_record is None:
# Record filtered out during post_process()
continue
lower_limit = transformed_record[primary_key]
lower_limit = transformed_record[partition_key]
fetched_count += 1
yield transformed_record

def _get_partition_config(self) -> t.Tuple[t.Optional[str], t.Optional[int]]:
partitioning_configs = self.config.get("query_partition", {})
partition_config = partitioning_configs.get(
self.tap_stream_id
) or partitioning_configs.get("*")

if partition_config:
partition_key = partition_config.get("partition_key")
partition_size = partition_config.get("partition_size")
else:
partition_key, partition_size = None, None

return partition_key, partition_size

@property
def is_sorted(self) -> bool:
    """Whether records are emitted in replication-key order.

    When a query partition is configured, results are ordered by the
    partition key instead of the replication key, so the stream cannot
    be treated as sorted. Sorted streams allow incremental syncs to
    resume after an unexpected interruption.

    Returns:
        `True` if stream is sorted. Defaults to `False`.
    """
    if self.replication_method != "INCREMENTAL":
        return False
    key, _size = self._get_partition_config()
    return key is None


class ROWID(sa.sql.sqltypes.String):
"""Custom SQL type for 'ROWID'."""
Expand Down
4 changes: 2 additions & 2 deletions tap_db2/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ class TapDB2(SQLTap):
description="Additional execution options to be passed to SQLAlchemy. This is an objects containing key-value pairs.", # noqa: E501
),
th.Property(
"query_partitioning",
"query_partition",
th.ObjectType(
additional_properties=th.CustomType(
{
"type": ["object", "null"],
"properties": {
"primary_key": {"type": ["string"]},
"partition_key": {"type": ["string"]},
"partition_size": {"type": ["integer"]},
},
}
Expand Down
8 changes: 4 additions & 4 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
"password": "password",
}

# Config exercising query partitioning for the test table.
# NOTE: the key must be spelled "partition_key" to match the tap's
# query_partition schema — a misspelled key is silently ignored and
# would disable partitioning for the test.
TEST_CONFIG_QUERY_PARTITION = {
    **TEST_CONFIG,
    "query_partition": {
        "db2inst1-test_table": {"partition_key": "id", "partition_size": 5}
    },
}

Expand All @@ -27,6 +27,6 @@
)
TestTapDB2WithQueryPartitioning = get_tap_test_class(
tap_class=TapDB2,
config=TEST_CONFIG_QUERY_PARTITIONING,
config=TEST_CONFIG_QUERY_PARTITION,
catalog="tests/data/catalog.json",
)

0 comments on commit 45220d9

Please sign in to comment.