Skip to content

Commit

Permalink
feat: Filter schemas (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianswms authored Aug 29, 2023
1 parent 3ce8679 commit bd7f2df
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
6 changes: 4 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ services:
mysqldb:
image: mysql
restart: always
command: --default-authentication-plugin=mysql_native_password
command: --default-authentication-plugin=mysql_native_password --bind-address=0.0.0.0
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: melty
MYSQL_ROOT_HOST: '%'
ports:
- 3306:3306
mysqldb_ssh:
image: mysql
restart: always
command: --default-authentication-plugin=mysql_native_password
command: --default-authentication-plugin=mysql_native_password --bind-address=0.0.0.0
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: melty
MYSQL_ROOT_HOST: '%'
networks:
inner:
ipv4_address: 10.5.0.5
Expand Down
16 changes: 16 additions & 0 deletions tap_mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

if TYPE_CHECKING:
from sqlalchemy.dialects import mysql
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector

unpatched_conform = (
singer_sdk.helpers._typing._conform_primitive_property # noqa: SLF001
Expand Down Expand Up @@ -156,6 +158,20 @@ def sdk_typing_object(

return sqltype_lookup["string"] # safe failover to str

def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
"""Return a list of schema names in DB, or overrides with user-provided values.
Args:
engine: SQLAlchemy engine
inspected: SQLAlchemy inspector instance for engine
Returns:
List of schema names
"""
if "filter_schemas" in self.config and len(self.config["filter_schemas"]) != 0:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)


class MySQLStream(SQLStream):
"""Stream class for MySQL streams."""
Expand Down
9 changes: 9 additions & 0 deletions tap_mysql/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ def __init__(
"Example mysql://[username]:[password]@localhost:3306/[db_name]"
),
),
th.Property(
"filter_schemas",
th.ArrayType(th.StringType),
description=(
"If an array of schema names is provided, the tap will only process "
"the specified MySQL schemas and ignore others. If left blank, the "
"tap automatically determines ALL available MySQL schemas."
),
),
th.Property(
"ssh_tunnel",
th.ObjectType(
Expand Down
24 changes: 24 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# flake8: noqa

import copy
import datetime
import decimal
import json
Expand Down Expand Up @@ -274,6 +275,29 @@ def test_decimal():
assert "number" in schema_message["schema"]["properties"]["column"]["type"]


def test_filter_schemas():
"""Only return tables from a given schema"""
table_name = "test_filter_schemas"
engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

metadata_obj = MetaData()
table = Table(table_name, metadata_obj, Column("id", Integer), schema="new_schema")

with engine.connect() as conn:
conn.execute("CREATE SCHEMA IF NOT EXISTS new_schema")
if table.exists(conn):
table.drop(conn)
metadata_obj.create_all(conn)
filter_schemas_config = copy.deepcopy(SAMPLE_CONFIG)
filter_schemas_config.update({"filter_schemas": ["new_schema"]})
tap = TapMySQL(config=filter_schemas_config)
tap_catalog = json.loads(tap.catalog_json_text)
altered_table_name = f"new_schema-{table_name}"
# Check that the only stream in the catalog is the one table put into new_schema
assert len(tap_catalog["streams"]) == 1
assert tap_catalog["streams"][0]["stream"] == altered_table_name


class MySQLTestRunner(TapTestRunner):
def run_sync_dry_run(self) -> bool:
"""Dislike this function and how TestRunner does this so just hacking it here.
Expand Down

0 comments on commit bd7f2df

Please sign in to comment.