Skip to content

Commit

Permalink
chore: Test with Python 3.13, drop support for Python 3.8 (#224)
Browse files Browse the repository at this point in the history
* chore: Test with Python 3.13, drop support for Python 3.8

* ci: Use ruff as formatter

* fix: ruff errors

* fix: yield instead of return

* [pre-commit.ci] auto fixes from pre-commit.ci hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: Daniel Purtov <daniel.purtov@capgemini.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 3, 2024
1 parent 45220d9 commit b36cfe5
Show file tree
Hide file tree
Showing 9 changed files with 732 additions and 624 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
5 changes: 2 additions & 3 deletions .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"recommendations": [
"ms-python.python",
"ms-python.pylint",
"ms-python.flake8",
"ms-python.black-formatter"
"charliermarsh.ruff",
"ms-python.mypy-type-checker"
]
}
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{
"editor.defaultFormatter": "ms-python.black-formatter"
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff"
}
}
4 changes: 0 additions & 4 deletions output/.gitignore

This file was deleted.

1,214 changes: 652 additions & 562 deletions poetry.lock

Large diffs are not rendered by default.

30 changes: 19 additions & 11 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "MIT"
readme = "README.md"
homepage = "https://meltano.com"
repository = "https://github.com/danielptv/tap-db2"
documentation = "https://github.com/danielptv/tap-db2/blob/main/README.md"
keywords = [
"DB2",
"IBM DB2",
Expand All @@ -18,19 +19,19 @@ classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: Implementation :: CPython",
]
packages = [
{ include = "tap_db2" },
]

[tool.poetry.dependencies]
python = ">=3.8,<4"
python = ">=3.9"
singer-sdk = "0.42.1"
ibm-db-sa = "0.4.1"
sqlalchemy = "2.0.36"
Expand Down Expand Up @@ -61,18 +62,25 @@ vcs = "git"
style = "semver"

[tool.ruff]
target-version = "py38"
target-version = "py39"

[tool.ruff.lint]
select = [
"F", # Pyflakes
"W", # pycodestyle warnings
"E", # pycodestyle errors
"I", # isort
"N", # pep8-naming
"D", # pydocstyle
"ICN", # flake8-import-conventions
"RUF", # ruff
"F", # Pyflakes
"W", # pycodestyle warnings
"E", # pycodestyle errors
"FA", # flake8-future-annotations
"I", # isort
"N", # pep8-naming
"D", # pydocstyle
"UP", # pyupgrade
"ICN", # flake8-import-conventions
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"PL", # Pylint
"PERF", # Perflint
"RUF", # ruff
]

[tool.ruff.lint.flake8-import-conventions]
Expand Down
10 changes: 5 additions & 5 deletions tap_db2/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import sqlalchemy as sa
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from sqlalchemy.engine import Engine
from sqlalchemy.types import TypeEngine

if t.TYPE_CHECKING:
from sqlalchemy.engine import Engine
from sqlalchemy.types import TypeEngine

SUPPLIED_USER_TABLES_PATTERN = r"^(DSN_|PLAN_TABLE)"

Expand Down Expand Up @@ -105,9 +107,7 @@ def discover_catalog_entries(self) -> list[dict]:
# Filter by schema
# Connection parameter 'CURRENTSCHEMA=mySchema;' doesn't work
# https://www.ibm.com/support/pages/525-error-nullidsysstat-package-when-trying-set-current-schema-against-db2-zos-database
target_schema = (
self.config["schema"] if "schema" in self.config else None
)
target_schema = self.config.get("schema", None)
if (
target_schema is None
or target_schema.strip().lower() == schema_name.strip().lower()
Expand Down
78 changes: 44 additions & 34 deletions tap_db2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import sqlalchemy as sa
from singer_sdk import SQLStream
from singer_sdk.helpers._state import STARTING_MARKER
from singer_sdk.helpers.types import Context

from tap_db2.connector import DB2Connector

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context


class DB2Stream(SQLStream):
"""Stream class for IBM DB2 streams."""
Expand Down Expand Up @@ -84,11 +86,7 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
if self.ABORT_AT_RECORD_COUNT is not None:
query = query.limit(self.ABORT_AT_RECORD_COUNT + 1)

filter_configs = self.config.get("filter", {})
if self.tap_stream_id in filter_configs:
query = query.where(sa.text(filter_configs[self.tap_stream_id]["where"]))
elif "*" in filter_configs:
query = query.where(sa.text(filter_configs["*"]["where"]))
query = self._apply_filter_config(query)

with self.connector._connect() as conn:
if partition_key is None:
Expand All @@ -100,35 +98,9 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
yield transformed_record

else:
lower_limit = None
yield from self._get_partitioned_records(query, table, conn)

termination_query = sa.select(
sa.func.count(table.columns[partition_key])
)
if query.whereclause is not None:
termination_query = termination_query.where(query.whereclause)
termination_query_result = conn.execute(termination_query).first()
assert termination_query_result is not None, "Invalid termination query"
termination_limit = int(str(termination_query_result[0]))
fetched_count = 0

while fetched_count < termination_limit:
limited_query = query.limit(partition_size)
if lower_limit is not None:
limited_query = limited_query.where(
table.columns[partition_key] > lower_limit
)
for record in conn.execute(limited_query):
transformed_record = self.post_process(dict(record._mapping))

if transformed_record is None:
# Record filtered out during post_process()
continue
lower_limit = transformed_record[partition_key]
fetched_count += 1
yield transformed_record

def _get_partition_config(self) -> t.Tuple[t.Optional[str], t.Optional[int]]:
def _get_partition_config(self) -> tuple[str | None, int | None]:
partitioning_configs = self.config.get("query_partition", {})
partition_config = partitioning_configs.get(
self.tap_stream_id
Expand All @@ -142,6 +114,44 @@ def _get_partition_config(self) -> t.Tuple[t.Optional[str], t.Optional[int]]:

return partition_key, partition_size

def _apply_filter_config(self, query: sa.sql.select) -> sa.sql.select:
filter_configs = self.config.get("filter", {})
if self.tap_stream_id in filter_configs:
query = query.where(sa.text(filter_configs[self.tap_stream_id]["where"]))
elif "*" in filter_configs:
query = query.where(sa.text(filter_configs["*"]["where"]))
return query

def _get_partitioned_records(
self, query: sa.sql.select, table: sa.Table, conn: sa.engine.Connection
) -> t.Iterable[dict[str, t.Any]]:
partition_key, partition_size = self._get_partition_config()
lower_limit = None

termination_query = sa.select(sa.func.count(table.columns[partition_key]))
if query.whereclause is not None:
termination_query = termination_query.where(query.whereclause)
termination_query_result = conn.execute(termination_query).first()
assert termination_query_result is not None, "Invalid termination query"
termination_limit = int(str(termination_query_result[0]))
fetched_count = 0

while fetched_count < termination_limit:
limited_query = query.limit(partition_size)
if lower_limit is not None:
limited_query = limited_query.where(
table.columns[partition_key] > lower_limit
)
for record in conn.execute(limited_query):
transformed_record = self.post_process(dict(record._mapping))

if transformed_record is None:
# Record filtered out during post_process()
continue
lower_limit = transformed_record[partition_key]
fetched_count += 1
yield transformed_record

@property
def is_sorted(self) -> bool:
"""Expect stream to be sorted.
Expand Down
9 changes: 6 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
# This file can be used to customize tox tests as well as other test frameworks like flake8 and mypy

[tox]
envlist = py38
; envlist = py38, py39, py310, py311, py312
envlist =
3.{9,10,11,12,13}
format
lint
isolated_build = true
min_version = 4

[testenv]
allowlist_externals = poetry

[testenv:pytest]
# Run the python tests.
# To execute, run `tox -e pytest`
envlist = py38, py39, py310, py311, py312
envlist = 3.{9,10,11,12,13}
commands =
poetry install -v
poetry run pytest
Expand Down

0 comments on commit b36cfe5

Please sign in to comment.