diff --git a/README.md b/README.md index e892b2c..d77254c 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ pipx install git+https://github.com/danielptv/tap-db2.git@main | encryption | True | None | Encryption settings for the DB2 connection. Disabled if omitted. | | connection_parameters | False | None | Additional parameters to be appended to the connection string. This is an objects containing key-value pairs. | | sqlalchemy_execution_options | False | None | Additional execution options to be passed to SQLAlchemy. This is an objects containing key-value pairs. | -| query_partitioning | False | None | Partition query into smaller subsets. | +| query_partition | False | None | Partition query into smaller subsets. | | filter | False | None | Apply a custom WHERE condition per stream. Unlike the filter available in stream_maps, this will be evaluated BEFORE extracting the data. | | ignore_supplied_tables | False | True | Ignore DB2-supplied user tables. For more info check out [Db2-supplied user tables](https://www.ibm.com/docs/en/db2-for-zos/12?topic=db2-supplied-user-tables). | | ignore_views | False | False | Ignore views. | @@ -147,13 +147,13 @@ plugins: pip_url: tap-ibm-db2 config: ... - query_partitioning: + query_partition: <stream_name>: - primary_key: <primary_key> + partition_key: <partition_key> partition_size: 1000 ``` -Replace `<stream_name>` with the stream name and `<primary_key>` with the stream's primary key. Use `*` to apply a query partitioning setting to all streams not explicitly declared. +Replace `<stream_name>` with the stream name and `<partition_key>` with the stream's partition key. Use `*` to apply a query partitioning setting to all streams not explicitly declared. 
## Usage 👷‍♀️ diff --git a/meltano.yml b/meltano.yml index ee61a8b..6384237 100644 --- a/meltano.yml +++ b/meltano.yml @@ -34,7 +34,7 @@ plugins: kind: object - name: sqlalchemy_execution_options kind: object - - name: query_partitioning + - name: query_partition kind: object - name: ignore_supplied_tables kind: boolean diff --git a/tap_db2/stream.py b/tap_db2/stream.py index f16eade..3fdd91f 100644 --- a/tap_db2/stream.py +++ b/tap_db2/stream.py @@ -8,10 +8,8 @@ import ibm_db_sa # type: ignore import sqlalchemy as sa from singer_sdk import SQLStream -from singer_sdk.connectors import SQLConnector from singer_sdk.helpers._state import STARTING_MARKER from singer_sdk.helpers.types import Context -from singer_sdk.tap_base import Tap from tap_db2.connector import DB2Connector @@ -21,34 +19,6 @@ class DB2Stream(SQLStream): connector_class = DB2Connector - def __init__( - self, tap: Tap, catalog_entry: dict, connector: SQLConnector | None = None - ) -> None: - """Initialize the database stream. - - If connector is omitted, a new connector will be created. - - Args: - tap: The parent tap object. - catalog_entry: Catalog entry dict. - connector: Optional connector to reuse. 
- """ - super().__init__(tap, catalog_entry, connector) - self.query_partitioning_pk = None - self.query_partitioning_size = None - - partitioning_configs = self.config.get("query_partitioning", {}) - if self.tap_stream_id in partitioning_configs: - self.query_partitioning_pk = partitioning_configs[self.tap_stream_id][ - "primary_key" - ] - self.query_partitioning_size = partitioning_configs[self.tap_stream_id][ - "partition_size" - ] - elif "*" in partitioning_configs: - self.query_partitioning_pk = partitioning_configs["*"]["primary_key"] - self.query_partitioning_size = partitioning_configs["*"]["partition_size"] - def get_starting_replication_key_value( self, context: Context | None, @@ -92,6 +62,8 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: msg = f"Stream '{self.name}' does not support partitioning." raise NotImplementedError(msg) + partition_key, partition_size = self._get_partition_config() + selected_column_names = self.get_selected_schema()["properties"].keys() table = self.connector.get_table( full_table_name=self.fully_qualified_name, @@ -102,8 +74,8 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: replication_key_col = table.columns[self.replication_key] query = ( query.order_by(replication_key_col) - if self.query_partitioning_pk is None - else query.order_by(table.columns[self.query_partitioning_pk]) + if partition_key is None + else query.order_by(table.columns[partition_key]) ) start_val = self.get_starting_replication_key_value(context) if start_val: @@ -119,7 +91,7 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: query = query.where(sa.text(filter_configs["*"]["where"])) with self.connector._connect() as conn: - if self.query_partitioning_pk is None: + if partition_key is None: for record in conn.execute(query): transformed_record = self.post_process(dict(record._mapping)) if transformed_record is None: @@ -128,11 +100,11 @@ def 
get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: yield transformed_record else: - limit = self.query_partitioning_size - primary_key = self.query_partitioning_pk lower_limit = None - termination_query = sa.select(sa.func.count(table.columns[primary_key])) + termination_query = sa.select( + sa.func.count(table.columns[partition_key]) + ) if query.whereclause is not None: termination_query = termination_query.where(query.whereclause) termination_query_result = conn.execute(termination_query).first() @@ -141,21 +113,48 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: fetched_count = 0 while fetched_count < termination_limit: - limited_query = query.limit(limit) + limited_query = query.limit(partition_size) if lower_limit is not None: limited_query = limited_query.where( - table.columns[primary_key] > lower_limit + table.columns[partition_key] > lower_limit ) - for record in conn.execute(limited_query): transformed_record = self.post_process(dict(record._mapping)) + if transformed_record is None: # Record filtered out during post_process() continue - lower_limit = transformed_record[primary_key] + lower_limit = transformed_record[partition_key] fetched_count += 1 yield transformed_record + def _get_partition_config(self) -> t.Tuple[t.Optional[str], t.Optional[int]]: + partitioning_configs = self.config.get("query_partition", {}) + partition_config = partitioning_configs.get( + self.tap_stream_id + ) or partitioning_configs.get("*") + + if partition_config: + partition_key = partition_config.get("partition_key") + partition_size = partition_config.get("partition_size") + else: + partition_key, partition_size = None, None + + return partition_key, partition_size + + @property + def is_sorted(self) -> bool: + """Expect stream to be sorted. + + When `True`, incremental streams will attempt to resume if unexpectedly + interrupted. + + Returns: + `True` if stream is sorted. Defaults to `False`. 
+ """ + partition_key, _ = self._get_partition_config() + return self.replication_method == "INCREMENTAL" and partition_key is None + class ROWID(sa.sql.sqltypes.String): """Custom SQL type for 'ROWID'.""" diff --git a/tap_db2/tap.py b/tap_db2/tap.py index e4ea68e..5e5d827 100644 --- a/tap_db2/tap.py +++ b/tap_db2/tap.py @@ -100,13 +100,13 @@ class TapDB2(SQLTap): description="Additional execution options to be passed to SQLAlchemy. This is an objects containing key-value pairs.", # noqa: E501 ), th.Property( - "query_partitioning", + "query_partition", th.ObjectType( additional_properties=th.CustomType( { "type": ["object", "null"], "properties": { - "primary_key": {"type": ["string"]}, + "partition_key": {"type": ["string"]}, "partition_size": {"type": ["integer"]}, }, } diff --git a/tests/test_core.py b/tests/test_core.py index 62b3c3b..6cc4666 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -15,10 +15,10 @@ "password": "password", } -TEST_CONFIG_QUERY_PARTITIONING = { +TEST_CONFIG_QUERY_PARTITION = { **TEST_CONFIG, - "query_partitioning": { - "db2inst1-test_table": {"primary_key": "id", "partition_size": 5} + "query_partition": { + "db2inst1-test_table": {"partition_key": "id", "partition_size": 5} }, } @@ -27,6 +27,6 @@ ) TestTapDB2WithQueryPartitioning = get_tap_test_class( tap_class=TapDB2, - config=TEST_CONFIG_QUERY_PARTITIONING, + config=TEST_CONFIG_QUERY_PARTITION, catalog="tests/data/catalog.json", )