From 95a766f885dafbfd68c728a868570e4003421a7c Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 12:01:44 +0100 Subject: [PATCH 01/16] format docs --- .gitignore | 5 ++++- mkdocs.yml | 5 ++++- src/msfabricutils/core/generic.py | 4 ++-- src/msfabricutils/core/lakehouse.py | 2 +- src/msfabricutils/core/workspace.py | 10 +++++----- src/msfabricutils/helpers/separator_indices.py | 4 ++-- 6 files changed, 18 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index f927fc0..2a97176 100644 --- a/.gitignore +++ b/.gitignore @@ -132,4 +132,7 @@ dmypy.json cython_debug/ # Ruff -.ruff_cache \ No newline at end of file +.ruff_cache + +# MacOS +**/.DS_Store diff --git a/mkdocs.yml b/mkdocs.yml index 555bed1..98d0f44 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -36,14 +36,17 @@ plugins: python: paths: [src] options: + show_root_toc_entry: false docstring_style: google - show_source: true + show_source: false show_root_heading: true + filters: ["!^_", "^__init__$"] show_object_full_path: false heading_level: 1 members_order: source separate_signature: true show_signature_annotations: true + docstring_section_style: table nav: - Home: index.md diff --git a/src/msfabricutils/core/generic.py b/src/msfabricutils/core/generic.py index af9fe5f..2d4b9e1 100644 --- a/src/msfabricutils/core/generic.py +++ b/src/msfabricutils/core/generic.py @@ -16,7 +16,7 @@ def get_paginated(endpoint: str, data_key: str) -> list[dict]: data_key (str): The key in the response JSON that contains the list of data to be returned. Returns: - list[dict]: A list of dictionaries containing the data from all pages. + A list of dictionaries containing the data from all pages. Raises: requests.exceptions.RequestException: If the HTTP request fails or returns an error. @@ -55,7 +55,7 @@ def get_page(endpoint: str) -> list[dict]: endpoint (str): The API endpoint to send the GET request to. Returns: - list[dict]: A list of dictionaries containing the data returned from the API. + A list of dictionaries containing the data returned from the API. Raises: requests.exceptions.RequestException: If the HTTP request fails or returns an error. diff --git a/src/msfabricutils/core/lakehouse.py b/src/msfabricutils/core/lakehouse.py index ff0304f..cb77a5d 100644 --- a/src/msfabricutils/core/lakehouse.py +++ b/src/msfabricutils/core/lakehouse.py @@ -13,7 +13,7 @@ def get_workspace_lakehouses(workspace_id: str) -> list[dict]: workspace_id (str): The ID of the workspace to retrieve lakehouses from. Returns: - list[dict]: A list of dictionaries containing lakehouse data for the specified workspace. + A list of dictionaries containing lakehouse data for the specified workspace. See Also: `get_paginated`: A helper function that handles paginated API requests. diff --git a/src/msfabricutils/core/workspace.py b/src/msfabricutils/core/workspace.py index f2d4119..0c9f914 100644 --- a/src/msfabricutils/core/workspace.py +++ b/src/msfabricutils/core/workspace.py @@ -1,7 +1,7 @@ from msfabricutils.core.generic import get_paginated, get_page +from typing import Any - -def get_workspaces() -> list[dict]: +def get_workspaces() -> list[dict[str, Any]]: """ Retrieves a list of workspaces. @@ -10,7 +10,7 @@ def get_workspaces() -> list[dict]: with workspaces. Returns: - list[dict]: A list of dictionaries containing data for the available workspaces. + A list of dictionaries containing data for the available workspaces. See Also: `get_paginated`: A helper function that handles paginated API requests. 
@@ -21,7 +21,7 @@ def get_workspaces() -> list[dict]: return get_paginated(endpoint, data_key) -def get_workspace(workspace_id: str) -> dict: +def get_workspace(workspace_id: str) -> dict[str, Any]: """ Retrieves details of a specified workspace. @@ -32,7 +32,7 @@ def get_workspace(workspace_id: str) -> dict: workspace_id (str): The ID of the workspace to retrieve details for. Returns: - dict: A dictionary containing the details of the specified workspace. + A dictionary containing the details of the specified workspace. See Also: `get_page`: A helper function that retrieves a single page of data from the API. diff --git a/src/msfabricutils/helpers/separator_indices.py b/src/msfabricutils/helpers/separator_indices.py index 3fc53cd..ce9fb44 100644 --- a/src/msfabricutils/helpers/separator_indices.py +++ b/src/msfabricutils/helpers/separator_indices.py @@ -1,4 +1,4 @@ -def _separator_indices(string: str, separator: str): +def _separator_indices(string: str, separator: str) -> list[int]: """Find indices of a separator character in a string, ignoring separators inside quotes. Args: @@ -6,7 +6,7 @@ def _separator_indices(string: str, separator: str): separator (str): The separator character to find Returns: - list[int]: List of indices where the separator character appears outside of quotes + A list of indices where the separator character appears outside of quotes Example: >>> separator_indices('a,b,"c,d",e', ',') From 05b313be04fc75d2a4d3fca03126bb02357f0c13 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 12:02:10 +0100 Subject: [PATCH 02/16] refactor token func --- src/msfabricutils/core/auth.py | 81 +++++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/src/msfabricutils/core/auth.py b/src/msfabricutils/core/auth.py index 96a8cac..78f3636 100644 --- a/src/msfabricutils/core/auth.py +++ b/src/msfabricutils/core/auth.py @@ -1,29 +1,41 @@ from azure.identity import DefaultAzureCredential -def get_onelake_access_token() -> str: +def get_access_token(audience: str) -> str: """ - Retrieves an access token for Azure OneLake storage. - - This function attempts to obtain an access token for accessing Azure storage. + Retrieves an access token for a given audience. + + This function attempts to obtain an access token for a given audience. It first checks if the code is running in a Microsoft Fabric notebook environment and attempts to use the `notebookutils` library to get the token. If the library is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. + """ - Returns: - str: The access token used for authenticating requests to Azure OneLake storage. - """ - audience = "https://storage.azure.com" try: import notebookutils # type: ignore - token = notebookutils.credentials.getToken(audience) except ModuleNotFoundError: token = DefaultAzureCredential().get_token(f"{audience}/.default").token return token +def get_onelake_access_token() -> str: + """ + Retrieves an access token for Azure OneLake storage. + + This function attempts to obtain an access token for accessing Azure storage. + It first checks if the code is running in a Microsoft Fabric notebook environment + and attempts to use the `notebookutils` library to get the token. If the library + is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK + to fetch the token. + + Returns: + The access token used for authenticating requests to Azure OneLake storage. 
+    """
+    audience = "https://storage.azure.com"
+    return get_access_token(audience)
 
 
 def get_fabric_bearer_token() -> str:
     """
@@ -36,14 +48,51 @@ def get_fabric_bearer_token() -> str:
     from the Azure SDK to fetch the token.
 
     Returns:
-        str: The bearer token used for authenticating requests to the Azure Fabric (Power BI) API.
+        The bearer token used for authenticating requests to the Azure Fabric (Power BI) API.
     """
     audience = "https://analysis.windows.net/powerbi/api"
-    try:
-        import notebookutils  # type: ignore
+    return get_access_token(audience)
 
-        token = notebookutils.credentials.getToken(audience)
-    except ModuleNotFoundError:
-        token = DefaultAzureCredential().get_token(f"{audience}/.default").token
 
-    return token
+def get_azure_devops_access_token() -> str:
+    """
+    Retrieves a bearer token for Azure Fabric (Power BI) API.
+
+    This function attempts to obtain a bearer token for authenticating requests to the
+    Azure Power BI API. It first checks if the code is running in a Microsoft Fabric
+    notebook environment and tries to use the `notebookutils` library to get the token.
+    If the library is not available, it falls back to using the `DefaultAzureCredential`
+    from the Azure SDK to fetch the token.
+
+    Returns:
+        The bearer token used for authenticating requests to the Azure Fabric (Power BI) API.
+    """
+    audience = "499b84ac-1321-427f-aa17-267ca6975798"
+    return get_access_token(audience)
+
+
+def get_storage_options() -> dict[str, str]:
+    """
+    Retrieves storage options including a bearer token for Azure OneLake storage.
+
+    This function calls `get_onelake_access_token` to obtain a bearer token
+    and returns a dictionary containing the token and a flag indicating
+    whether to use the Fabric endpoint.
+
+    Returns:
+        A dictionary containing the storage options for OneLake.
+ + Example: + **Retrieve storage options** + ```python + from msfabricutils import get_storage_options + + options = get_storage_options() + options + {'bearer_token': 'your_token_here', 'use_fabric_endpoint': 'true'} + ``` + """ + return { + "bearer_token": get_onelake_access_token(), + "use_fabric_endpoint": "true" + } \ No newline at end of file From 109cf3eac3a6271b832ad460cc9b416b8b887951 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 12:33:38 +0100 Subject: [PATCH 03/16] add common helpers for etl --- pyproject.toml | 5 +- src/msfabricutils/helpers/__init__.py | 8 +- src/msfabricutils/helpers/merge_helpers.py | 100 ++++++++++++++++++ src/msfabricutils/helpers/quote_identifier.py | 20 ++++ .../helpers/string_normalization.py | 40 +++++++ tests/etl/test_transforms.py | 38 +++++++ tests/helpers/test_escape_object_name.py | 9 ++ tests/helpers/test_merge_helpers.py | 51 +++++++++ tests/helpers/test_separator_indices.py | 5 + tests/helpers/test_string_normalization.py | 42 ++++++++ 10 files changed, 316 insertions(+), 2 deletions(-) create mode 100644 src/msfabricutils/helpers/merge_helpers.py create mode 100644 src/msfabricutils/helpers/quote_identifier.py create mode 100644 src/msfabricutils/helpers/string_normalization.py create mode 100644 tests/etl/test_transforms.py create mode 100644 tests/helpers/test_escape_object_name.py create mode 100644 tests/helpers/test_merge_helpers.py create mode 100644 tests/helpers/test_string_normalization.py diff --git a/pyproject.toml b/pyproject.toml index 934e8a8..7ee26ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,13 +41,16 @@ docs = [ dev = [ "pytest>=6.2.5", "ruff>=0.8.1", + "freezegun>=1.5.1", ] [tool.ruff] line-length = 100 +[tool.ruff.lint] +extend-select = ["Q001"] + [tool.pytest.ini_options] pythonpath = [ "src", ] - diff --git a/src/msfabricutils/helpers/__init__.py b/src/msfabricutils/helpers/__init__.py index 03a2bcb..48c6e47 100644 --- a/src/msfabricutils/helpers/__init__.py +++ b/src/msfabricutils/helpers/__init__.py @@ -1,5 +1,11 @@ from .separator_indices import _separator_indices +from .quote_identifier import quote_identifier +from .merge_helpers import build_merge_predicate, build_when_matched_update_columns, build_when_matched_update_predicate __all__ = ( "_separator_indices", -) \ No newline at end of file + "quote_identifier", + "build_merge_predicate", + "build_when_matched_update_columns", + "build_when_matched_update_predicate", +) diff --git a/src/msfabricutils/helpers/merge_helpers.py b/src/msfabricutils/helpers/merge_helpers.py new file mode 100644 index 0000000..115ba0c --- /dev/null +++ b/src/msfabricutils/helpers/merge_helpers.py @@ -0,0 +1,100 @@ +from msfabricutils.helpers.quote_identifier import quote_identifier + + +def build_merge_predicate(columns: list[str]) -> str: + """ + Constructs a SQL merge predicate based on the provided column names. + + This function generates a string that represents the condition for merging + records based on equality of the specified columns. + + Args: + columns (list[str]): A list of column names to be used in the merge predicate. + + Returns: + A SQL string representing the merge predicate. 
+ + Example: + ```python + predicate = build_merge_predicate(['id', 'name']) + print(predicate) + \"\"\" + (target."id" = source."id") AND (target."name" = source."name") + \"\"\" + ``` + """ + merge_predicate = [ + f""" + (target.{quote_identifier(column)} = source.{quote_identifier(column)}) + """ + for column in columns + ] + return " AND ".join(merge_predicate) + + +def build_when_matched_update_predicate(columns: list[str]) -> str: + """ + Constructs a SQL predicate for when matched update conditions. + + This function generates a string that represents the conditions for updating + records when a match is found based on the specified columns. + + Args: + columns (list[str]): A list of column names to be used in the update predicate. + + Returns: + A SQL string representing the when matched update predicate. + + Example: + ```python + update_predicate = build_when_matched_update_predicate(['id', 'status']) + print(update_predicate) + \"\"\" + ( + (target."id" != source."id") + OR (target."id" IS NULL AND source."id" IS NOT NULL) + OR (target."id" IS NOT NULL AND source."id" IS NULL) + ) OR ... + \"\"\" + ``` + """ + when_matched_update_predicates = [ + f""" + ( + (target.{quote_identifier(column)} != source.{quote_identifier(column)}) + OR (target.{quote_identifier(column)} IS NULL AND source.{quote_identifier(column)} IS NOT NULL) + OR (target.{quote_identifier(column)} IS NOT NULL AND source.{quote_identifier(column)} IS NULL) + ) + """ + for column in columns + ] + return " OR ".join(when_matched_update_predicates) + + +def build_when_matched_update_columns(columns: list[str]) -> dict[str, str]: + """ + Constructs a mapping of columns to be updated when a match is found. + + This function generates a dictionary where the keys are the target column + names and the values are the corresponding source column names. + + Args: + columns (list[str]): A list of column names to be used in the update mapping. + + Returns: + A dictionary mapping target columns to source columns. + + Example: + ```python + update_columns = build_when_matched_update_columns(['id', 'name']) + print(update_columns) + { + 'target."id"': 'source."id"', + 'target."name"': 'source."name"' + } + ``` + """ + return { + f"target.{quote_identifier(column)}": f"source.{quote_identifier(column)}" + for column in columns + } diff --git a/src/msfabricutils/helpers/quote_identifier.py b/src/msfabricutils/helpers/quote_identifier.py new file mode 100644 index 0000000..d5a11a7 --- /dev/null +++ b/src/msfabricutils/helpers/quote_identifier.py @@ -0,0 +1,20 @@ +def quote_identifier(identifier: str, quote_character: str = '"') -> str: + """ + Quotes the given identifier by surrounding it with the specified quote character. + + Args: + identifier (str): The identifier to be quoted. + quote_character (str, optional): The character to use for quoting. Defaults to '"'. + + Returns: + The quoted identifier. + + Example: + ```python + quote_identifier("my_object") + '"my_object"' + quote_identifier("my_object", "'") + "'my_object'" + ``` + """ + return f"{quote_character}{identifier.strip(quote_character)}{quote_character}" diff --git a/src/msfabricutils/helpers/string_normalization.py b/src/msfabricutils/helpers/string_normalization.py new file mode 100644 index 0000000..ff0fb6c --- /dev/null +++ b/src/msfabricutils/helpers/string_normalization.py @@ -0,0 +1,40 @@ +import re + +def to_snake_case(text: str) -> str: + """Convert a string to snake case. + + Args: + text (str): The string to convert to snake case. 
Can be converted from PascalCase, camelCase, kebab-case, or mixed case. Non-alphanumeric characters are converted to underscores. + + Returns: + The string in snake case. + + Examples: + >>> to_snake_case("CustomerID") + "customer_id" + """ + text = text.replace(" ", "_") + text = text.replace("-", "_") + text = re.sub(r"([a-z])([A-Z0-9])", r"\1_\2", text) + text = re.sub(r"([A-Z0-9])([A-Z0-9][a-z])", r"\1_\2", text) + text = re.sub(r"(? str: + """Translate characters in a string using a translation map. + + Args: + text (str): The string to translate. + translation_map (dict[str, str]): A dictionary mapping characters to their replacements. + + Returns: + The translated string. + + Examples: + >>> character_translation("Profit&Loss", {"&": "_and"}) + "Profit_and_Loss" + """ + for character, replacement in translation_map.items(): + text = text.replace(character, replacement) + return text diff --git a/tests/etl/test_transforms.py b/tests/etl/test_transforms.py new file mode 100644 index 0000000..a1e8ae9 --- /dev/null +++ b/tests/etl/test_transforms.py @@ -0,0 +1,38 @@ + +import polars as pl +from msfabricutils.etl import get_default_config +from msfabricutils.etl.transforms import add_audit_columns_transform +from polars.testing import assert_frame_equal +from freezegun import freeze_time +from datetime import datetime, timezone + + +@freeze_time("2024-12-06 12:00:00") +def test_add_audit_columns(): + config = get_default_config() + + df = pl.DataFrame( + { + "data": [1, 2, 3] + } + ) + + actual_df = add_audit_columns_transform(df, config) + + datetime_now = datetime(2024, 12, 6, 12, 0, tzinfo=timezone.utc) + + expected_df = pl.DataFrame( + { + "data": [1, 2, 3], + "__created_at": [datetime_now, datetime_now, datetime_now], + "__modified_at": [datetime_now, datetime_now, datetime_now], + "__deleted_at": [None, None, None], + "__valid_from": [datetime_now, datetime_now, datetime_now], + "__valid_to": [datetime_now, datetime_now, datetime_now], + }, + schema_overrides={ + "__deleted_at": pl.Datetime(time_zone="UTC") + } + ) + assert actual_df.columns == expected_df.columns + assert_frame_equal(actual_df, expected_df) diff --git a/tests/helpers/test_escape_object_name.py b/tests/helpers/test_escape_object_name.py new file mode 100644 index 0000000..3b8bc9c --- /dev/null +++ b/tests/helpers/test_escape_object_name.py @@ -0,0 +1,9 @@ +from msfabricutils.helpers.quote_identifier import quote_identifier + +def test_quote_character(): + assert quote_identifier("my_object") == '"my_object"' + assert quote_identifier("my_object", "'") == "'my_object'" + assert quote_identifier('"my_object"') == '"my_object"' + assert quote_identifier("'''my_object'''", "'") == "'my_object'" + assert quote_identifier("") == '""' + diff --git a/tests/helpers/test_merge_helpers.py b/tests/helpers/test_merge_helpers.py new file mode 100644 index 0000000..d1bc2dd --- /dev/null +++ b/tests/helpers/test_merge_helpers.py @@ -0,0 +1,51 @@ +from msfabricutils.helpers.merge_helpers import build_merge_predicate, build_when_matched_update_predicate, build_when_matched_update_columns +import re + +def test_build_merge_predicate(): + columns = ["column1", "column2"] + expected_output = """ + (target."column1" = source."column1") + AND + (target."column2" = source."column2") + """ + + actual_output = build_merge_predicate(columns) + assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub(r"\s+", " ", expected_output.strip()) + +def test_build_when_matched_update_predicate(): + + column_names = ["column1", "column2", "column3"] 
+ expected_output = """ + ( + (target."column1" != source."column1") + OR (target."column1" IS NULL AND source."column1" IS NOT NULL) + OR (target."column1" IS NOT NULL AND source."column1" IS NULL) + ) + OR + ( + (target."column2" != source."column2") + OR (target."column2" IS NULL AND source."column2" IS NOT NULL) + OR (target."column2" IS NOT NULL AND source."column2" IS NULL) + ) + OR + ( + (target."column3" != source."column3") + OR (target."column3" IS NULL AND source."column3" IS NOT NULL) + OR (target."column3" IS NOT NULL AND source."column3" IS NULL) + ) + """ + + actual_output = build_when_matched_update_predicate(column_names) + assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub(r"\s+", " ", expected_output.strip()) + + +def test_build_when_matched_update_columns(): + column_names = ["column1", "column2", "column3"] + expected_output = { + 'target."column1"': 'source."column1"', + 'target."column2"': 'source."column2"', + 'target."column3"': 'source."column3"' + } + + actual_output = build_when_matched_update_columns(column_names) + assert actual_output == expected_output \ No newline at end of file diff --git a/tests/helpers/test_separator_indices.py b/tests/helpers/test_separator_indices.py index dd38f0e..c3c59a0 100644 --- a/tests/helpers/test_separator_indices.py +++ b/tests/helpers/test_separator_indices.py @@ -19,3 +19,8 @@ def test_seperator_indices_string_with_both_quotes(): def test_seperator_indices_only_dots(): chars = "..." assert _separator_indices(chars, ".") == [0, 1, 2] + + +def test_seperator_indices_start_and_end_with_dots(): + chars = ".my-text." + assert _separator_indices(chars, ".") == [0, 8] diff --git a/tests/helpers/test_string_normalization.py b/tests/helpers/test_string_normalization.py new file mode 100644 index 0000000..676a555 --- /dev/null +++ b/tests/helpers/test_string_normalization.py @@ -0,0 +1,42 @@ +from msfabricutils.helpers.string_normalization import to_snake_case, character_translation +from msfabricutils.etl import get_default_config + +def test_to_snake_case(): + assert to_snake_case("CustomerID") == "customer_id" + assert to_snake_case("IDNumber") == "id_number" + assert to_snake_case("DebtorIDNumber") == "debtor_id_number" + assert to_snake_case("HTMLLink") == "html_link" + assert to_snake_case("ThisIsATest") == "this_is_a_test" + assert to_snake_case("__DebtorID") == "__debtor_id" + assert to_snake_case("This-Is-A-Test") == "this_is_a_test" + assert to_snake_case("I'm a teapot") == "i'm_a_teapot" + assert to_snake_case("__batch_id") == "__batch_id" + assert to_snake_case("__created_at") == "__created_at" + assert to_snake_case("__modified_at") == "__modified_at" + assert to_snake_case("__valid_from") == "__valid_from" + assert to_snake_case("__valid_to") == "__valid_to" + +def test_special_character_translation(): + assert character_translation("Profit&Loss", {"&": "_and_"}) == "Profit_and_Loss" + assert character_translation("Profit/Loss", {"/": "_or_"}) == "Profit_or_Loss" + assert character_translation("Profit*Loss", {"*": "_times_"}) == "Profit_times_Loss" + assert character_translation("Profit\\Loss", {"\\": "_or_"}) == "Profit_or_Loss" + assert character_translation("Profit(Loss", {"(": "_"}) == "Profit_Loss" + assert character_translation("Profit)Loss", {")": "_"}) == "Profit_Loss" + + +def test_default_normalization_strategy(): + + config = get_default_config() + + def combined_normalization(text: str) -> str: + translated = character_translation(text, config.character_translation_map) + return 
to_snake_case(translated) + + assert combined_normalization("Profit&Loss") == "profit_and_loss" + assert combined_normalization("Profit/Loss") == "profit_or_loss" + assert combined_normalization("Profit*Loss") == "profit_times_loss" + assert combined_normalization("Profit\\Loss") == "profit_or_loss" + assert combined_normalization("Profit(Loss") == "profit_loss" + assert combined_normalization("Profit)Loss") == "profit_loss" + assert combined_normalization("Growth% and Loss + EBIDTA") == "growth_percent_and_loss_plus_ebidta" \ No newline at end of file From 4afcf703698fef4dad953531e257a96f30b98fcc Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:12:42 +0100 Subject: [PATCH 04/16] add docs --- docs/common/fabric-duckdb-connection.md | 3 + docs/common/utilities.md | 5 ++ docs/core/authentication.md | 3 + docs/core/fabric-api.md | 4 + docs/etl/index.md | 3 + docs/etl/sinks.md | 3 + docs/etl/sources.md | 4 + docs/etl/transforms.md | 3 + docs/index.md | 62 +++++----------- docs/usage/etl.md | 99 +++++++++++++++++++++++++ docs/usage/fabric-api.md | 11 +++ docs/usage/installation.md | 7 ++ mkdocs.yml | 42 ++++++++--- pyproject.toml | 2 +- 14 files changed, 196 insertions(+), 55 deletions(-) create mode 100644 docs/common/fabric-duckdb-connection.md create mode 100644 docs/common/utilities.md create mode 100644 docs/core/authentication.md create mode 100644 docs/core/fabric-api.md create mode 100644 docs/etl/index.md create mode 100644 docs/etl/sinks.md create mode 100644 docs/etl/sources.md create mode 100644 docs/etl/transforms.md create mode 100644 docs/usage/etl.md create mode 100644 docs/usage/fabric-api.md create mode 100644 docs/usage/installation.md diff --git a/docs/common/fabric-duckdb-connection.md b/docs/common/fabric-duckdb-connection.md new file mode 100644 index 0000000..382cdda --- /dev/null +++ b/docs/common/fabric-duckdb-connection.md @@ -0,0 +1,3 @@ +# Fabric DuckDB Connection + +::: msfabricutils.common.fabric_duckdb_connection \ No newline at end of file diff --git a/docs/common/utilities.md b/docs/common/utilities.md new file mode 100644 index 0000000..776adc5 --- /dev/null +++ b/docs/common/utilities.md @@ -0,0 +1,5 @@ +# Utilities + +::: msfabricutils.common.quote_identifier +::: msfabricutils.common.separator_indices +::: msfabricutils.common.string_normalization \ No newline at end of file diff --git a/docs/core/authentication.md b/docs/core/authentication.md new file mode 100644 index 0000000..75b427b --- /dev/null +++ b/docs/core/authentication.md @@ -0,0 +1,3 @@ +# Authentication + +::: msfabricutils.core.auth \ No newline at end of file diff --git a/docs/core/fabric-api.md b/docs/core/fabric-api.md new file mode 100644 index 0000000..258a69b --- /dev/null +++ b/docs/core/fabric-api.md @@ -0,0 +1,4 @@ +# Fabric API + +::: msfabricutils.core.lakehouse +::: msfabricutils.core.workspace diff --git a/docs/etl/index.md b/docs/etl/index.md new file mode 100644 index 0000000..923f424 --- /dev/null +++ b/docs/etl/index.md @@ -0,0 +1,3 @@ +# ETL + +The `msfabricutils.etl` module provides a set of functions and classes to facilitate ETL (Extract, Transform, Load) operations in the Microsoft Fabric environment. 
diff --git a/docs/etl/sinks.md b/docs/etl/sinks.md new file mode 100644 index 0000000..7e9a13c --- /dev/null +++ b/docs/etl/sinks.md @@ -0,0 +1,3 @@ +# Sinks + +::: msfabricutils.etl.sinks diff --git a/docs/etl/sources.md b/docs/etl/sources.md new file mode 100644 index 0000000..81047c9 --- /dev/null +++ b/docs/etl/sources.md @@ -0,0 +1,4 @@ +# Sources + +::: msfabricutils.etl.sources + diff --git a/docs/etl/transforms.md b/docs/etl/transforms.md new file mode 100644 index 0000000..ddf8f66 --- /dev/null +++ b/docs/etl/transforms.md @@ -0,0 +1,3 @@ +# Transforms + +::: msfabricutils.etl.transforms diff --git a/docs/index.md b/docs/index.md index 883daff..e3c48bd 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,55 +1,33 @@ # MSFabricUtils -A collection of **Spark-free** Python utilities for working with Microsoft Fabric in the Python Notebook experience. +A collection of **Spark-free** and **local-first** Python utilities for working with Microsoft Fabric Lakehouses locally or in the Fabric Python Notebook experience. ## Features -### Local development first -- Aim to provide a local development support "within" Fabric +- **ETL Utilities** - Extract, Transform, Load data from and to Microsoft Fabric Lakehouses. While the utilities can be configured to fit different needs, its defaults are highly opinionated for what we believe are sensible defaults for many use cases +- **Fabric API** - Access Fabric APIs from Python, such as workspaces and lakehouses +- **Local development first** - Aim to provide a local development for Microsoft Fabric solutions +- **DuckDB Connection** + - Seamless integration between DuckDB and Microsoft Fabric Lakehouses + - Cross-lakehouse and cross-workspace querying + - Delta Lake writing features -### DuckDB Connection -- Seamless integration between DuckDB and Microsoft Fabric Lakehouses -- Cross-lakehouse and cross-workspace querying -- Delta Lake writing features +## Core dependencies -### Ideas for improvements -- ETL/ELT Helpers -- Lakehouse Management Tools -- Got an idea? 
Add an issue on [github](https://www.github.com/mrjsj/msfabricutils/issues) +MSFabricUtils is built on top of modern, high-performance Python libraries: -## Installation +- **[delta-rs](https://delta-io.github.io/delta-rs)** - A native Rust implementation of Delta Lake, providing fast and reliable Delta Lake operations without the need for a Spark cluster +- **[Polars](https://pola.rs)** - A lightning-fast DataFrame library written in Rust, offering superior performance for data manipulation tasks +- **[DuckDB](https://duckdb.org)** - An embedded analytical database engine, enabling SQL queries with at blazing speed -```bash -pip install msfabricutils -``` +These dependencies were chosen specifically to: -## Quick Start +- Provide Spark-like functionality without the overhead of a Spark cluster +- Enable high-performance data processing on a single machine +- Support both local development and cloud deployment scenarios +- Maintain compatibility with Delta Lake format used in Microsoft Fabric -Ensure you are working in a Python Notebook: - -![Select Python Notebook](images/select-python-notebooks.png) - -```python -from msfabricutils import FabricDuckDBConnection, get_onelake_access_token - -#Initialize connection -access_token = get_onelake_access_token() - -conn = FabricDuckDBConnection(access_token=access_token) - -#Register lakehouses -conn.register_workspace_lakehouses( - workspace_id = "your-workspace-id", - lakehouses = ["sales", "marketing"] -) - -# Query across lakehouses -df = conn.sql(""" - SELECT - * - FROM sales.customers - JOIN marketing.segments USING (customer_id) -""").df() -``` +## Ideas for improvements +Got an idea? Add an issue on [github](https://www.github.com/mrjsj/msfabricutils/issues)! diff --git a/docs/usage/etl.md b/docs/usage/etl.md new file mode 100644 index 0000000..32485ec --- /dev/null +++ b/docs/usage/etl.md @@ -0,0 +1,99 @@ +# ETL + +The ETL module provides a set of helper functions for extracting, transforming, and loading data in a Fabric Lakehouse. + + + +## Basic extract, transform, load + +```python +from msfabricutils.etl import ( + source_parquet, + upsert_scd_type_1 +) +import polars as pl + +# Create a source parquet file +df = pl.DataFrame({ + "id": [1, 2, 3], + "name": ["Alice", "Bob", "Charlie"] +}) +df.write_parquet("source.parquet") + +# Read the source parquet file +source_df = source_parquet("source.parquet") + +# Upsert to a target table +upsert_scd_type_1(dedup_df, "target_table") +``` + + + +## Advanced example + +Consider an example, where + +- The source needs to be incrementially loaded +- The source has duplicate rows +- The target table need audit columns, such as created_at, updated_at etc. +- The target table should have normalized column names + +The following code will read the delta table, deduplicate the rows, normalize the column names, add audit columns and upsert to the target table. +The result will be a new row for AliceS and BobB in the target table. 
+ +```python +from msfabricutils.etl import ( + get_default_config, + source_delta, + get_incremental_column_value, + deduplicate_transform, + normalize_column_names_transform, + add_audit_columns_transform, + upsert_scd_type_1, +) + +source_table_path = "source_table" +target_table_path = "target_table" + +# Create a source table +source_df = pl.DataFrame({ + "ID": [1, 2, 3, 1, 2], + "FirstName": ["Alice", "Bob", "Charlie", "AliceS", "BobB"], + "batch_id": [1, 1, 1, 2, 2] +}) +source_df.write_delta(source_table_path) + +# Get the default config +config = get_default_config() + +# Read the source delta table +source_df = source_delta(source_table_path) + +# Get the incremental column value +incremental_column_value = get_incremental_column_value(target_table_path, "batch_id") + +# Filter the source dataframe to only get the rows with a modified_at greater than the incremental column value +filtered_df = source_df.filter(pl.col("batch_id") > incremental_column_value) + +# Deduplicate the source dataframe +deduped_df = deduplicate_transform(filtered_df, primary_key_columns="ID", deduplication_order_columns="batch_id") + +# Normalize the column names +normalized_df = normalize_column_names_transform(deduped_df, config) + +# Add audit columns +audit_df = add_audit_columns_transform(normalized_df, config) + +# Upsert to a target table +upsert_scd_type_1(target_table_path, audit_df, primary_key_columns="ID", config=config) + + +``` + +| id | first_name | batch_id | __created_at | __modified_at | __deleted_at | __valid_from | __valid_to | +| --- | ---------- | -------- | ----------------- | ----------------- | ----------------- | ----------------------- | ----------------- | +| i64 | str | i64 | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] | +| 3 | Charlie | 1 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null | +| 2 | BobB | 2 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null | +| 1 | AliceS | 2 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null | + diff --git a/docs/usage/fabric-api.md b/docs/usage/fabric-api.md new file mode 100644 index 0000000..35746b5 --- /dev/null +++ b/docs/usage/fabric-api.md @@ -0,0 +1,11 @@ +# Fabric API + +A collection of helper functions for working with the Fabric API. 
+ +## List workspaces + +```python +from msfabricutils.fabric import get_workspaces + +get_workspaces() +``` diff --git a/docs/usage/installation.md b/docs/usage/installation.md new file mode 100644 index 0000000..30babc3 --- /dev/null +++ b/docs/usage/installation.md @@ -0,0 +1,7 @@ +# Installation + +The `msfabricutils` package can be installed using pip: + +```bash +pip install msfabricutils +``` diff --git a/mkdocs.yml b/mkdocs.yml index 98d0f44..d233443 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -18,15 +18,22 @@ theme: toggle: icon: material/brightness-4 name: Switch to light mode + navigation_depth: 3 features: - - navigation.instant - navigation.tracking - - navigation.sections + - navigation.instant - navigation.expand - - navigation.top + - navigation.tabs + - navigation.indexes + - navigation.tabs.sticky + - navigation.footer + - content.tabs.link + - content.code.annotation + - content.code.copy plugins: - - search + - search: + lang: en - gen-files: scripts: - docs/gen_ref_pages.py @@ -36,10 +43,10 @@ plugins: python: paths: [src] options: - show_root_toc_entry: false + show_root_toc_entry: true docstring_style: google show_source: false - show_root_heading: true + show_root_heading: false filters: ["!^_", "^__init__$"] show_object_full_path: false heading_level: 1 @@ -49,14 +56,24 @@ plugins: docstring_section_style: table nav: - - Home: index.md + - Home: + - index.md + - Usage: + - usage/installation.md + - usage/etl.md + - usage/fabric-api.md - API Reference: - - FabricDuckDBConnection: reference/msfabricutils/fabric_duckdb_connection.md + - Common: + - common/fabric-duckdb-connection.md + - common/utilities.md - Core: - - Authentication: reference/msfabricutils/core/auth.md - - Utilities: - - Lakehouse: reference/msfabricutils/core/lakehouse.md - - Workspace: reference/msfabricutils/core/workspace.md + - core/authentication.md + - core/fabric-api.md + - ETL: + - etl/index.md + - etl/sources.md + - etl/transforms.md + - etl/sinks.md # Formatting options markdown_extensions: @@ -64,6 +81,7 @@ markdown_extensions: - pymdownx.details - attr_list - pymdownx.superfences + - tables - pymdownx.tabbed: alternate_style: true - pymdownx.snippets: diff --git a/pyproject.toml b/pyproject.toml index 7ee26ae..cd8f6f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dev = [ line-length = 100 [tool.ruff.lint] -extend-select = ["Q001"] +extend-select = ["Q001", "I001"] [tool.pytest.ini_options] pythonpath = [ From f15a643f8d550410275c61d4897dbc40086c3d7e Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:13:25 +0100 Subject: [PATCH 05/16] relocate duckdb conn --- src/msfabricutils/__init__.py | 2 +- .../{ => common}/fabric_duckdb_connection.py | 114 +++++++++--------- 2 files changed, 57 insertions(+), 59 deletions(-) rename src/msfabricutils/{ => common}/fabric_duckdb_connection.py (90%) diff --git a/src/msfabricutils/__init__.py b/src/msfabricutils/__init__.py index 41aa9b0..49abffb 100644 --- a/src/msfabricutils/__init__.py +++ b/src/msfabricutils/__init__.py @@ -1,4 +1,4 @@ -from msfabricutils.fabric_duckdb_connection import FabricDuckDBConnection +from msfabricutils.common.fabric_duckdb_connection import FabricDuckDBConnection from msfabricutils.core import ( get_fabric_bearer_token, get_onelake_access_token, diff --git a/src/msfabricutils/fabric_duckdb_connection.py b/src/msfabricutils/common/fabric_duckdb_connection.py similarity index 90% rename from src/msfabricutils/fabric_duckdb_connection.py rename to 
src/msfabricutils/common/fabric_duckdb_connection.py index 673a8a2..93cd1b8 100644 --- a/src/msfabricutils/fabric_duckdb_connection.py +++ b/src/msfabricutils/common/fabric_duckdb_connection.py @@ -1,20 +1,18 @@ from typing import Any -from msfabricutils.helpers import _separator_indices +import duckdb +import sqlglot +from deltalake import write_deltalake +from sqlglot import exp + +from msfabricutils.common import _separator_indices from msfabricutils.core import ( get_workspace, - get_workspace_lakehouses, get_workspace_lakehouse_tables, + get_workspace_lakehouses, ) -import duckdb -from deltalake import write_deltalake - -import sqlglot -from sqlglot import exp - - class FabricDuckDBConnection: """A DuckDB connection wrapper for Microsoft Fabric Lakehouses. @@ -34,33 +32,33 @@ class FabricDuckDBConnection: Example: ```python - >>> # Initialize connection - >>> access_token = notebookutils.credentials.getToken('storage') - >>> conn = FabricDuckDBConnection(access_token=access_token) - >>> - >>> # Register lakehouses from different workspaces - >>> conn.register_workspace_lakehouses( - ... workspace_id='12345678-1234-5678-1234-567812345678', - ... lakehouses=['sales', 'marketing'] - ... ) - >>> conn.register_workspace_lakehouses( - ... workspace_id='87654321-8765-4321-8765-432187654321', - ... lakehouses=['marketing'] - ... ) - >>> - >>> # Query across workspaces using fully qualified names - >>> df = conn.sql(''' - ... SELECT - ... c.customer_id, - ... c.name, - ... c.region, - ... s.segment, - ... s.lifetime_value - ... FROM sales_workspace.sales.main.customers c - ... JOIN marketing_workspace.marketing.main.customer_segments s - ... ON c.customer_id = s.customer_id - ... WHERE c.region = 'EMEA' - ... ''').df() + # Initialize connection + access_token = notebookutils.credentials.getToken('storage') + conn = FabricDuckDBConnection(access_token=access_token) + + # Register lakehouses from different workspaces + conn.register_workspace_lakehouses( + workspace_id='12345678-1234-5678-1234-567812345678', + lakehouses=['sales', 'marketing'] + ) + conn.register_workspace_lakehouses( + workspace_id='87654321-8765-4321-8765-432187654321', + lakehouses=['marketing'] + ) + + # Query across workspaces using fully qualified names + df = conn.sql(\"\"\" + SELECT + c.customer_id, + c.name, + c.region, + s.segment, + s.lifetime_value + FROM sales_workspace.sales.main.customers c + JOIN marketing_workspace.marketing.main.customer_segments s + ON c.customer_id = s.customer_id + WHERE c.region = 'EMEA' + \"\"\").df() ``` """ @@ -95,12 +93,12 @@ def refresh_access_token(self, access_token: str): Example: ```python - >>> # Initialize connection - >>> conn = FabricDuckDBConnection(access_token='old_token') - >>> - >>> # When token expires, refresh it - >>> new_token = notebookutils.credentials.getToken('storage') - >>> conn.refresh_access_token(new_token) + # Initialize connection + conn = FabricDuckDBConnection(access_token='old_token') + + # When token expires, refresh it + new_token = notebookutils.credentials.getToken('storage') + conn.refresh_access_token(new_token) ``` """ self._access_token = access_token @@ -387,21 +385,21 @@ def register_workspace_lakehouses(self, workspace_id: str, lakehouses: str | lis Example: ```python - >>> # Initialize connection with access token - >>> access_token = notebookutils.credentials.getToken('storage') - >>> conn = FabricDuckDBConnection(access_token=access_token) - >>> - >>> # Register a single lakehouse - >>> conn.register_workspace_lakehouses( - ... 
workspace_id='12345678-1234-5678-1234-567812345678', - ... lakehouses='sales_lakehouse' - ... ) - >>> - >>> # Register multiple lakehouses - >>> conn.register_workspace_lakehouses( - ... workspace_id='12345678-1234-5678-1234-567812345678', - ... lakehouses=['sales_lakehouse', 'marketing_lakehouse'] - ... ) + # Initialize connection with access token + access_token = notebookutils.credentials.getToken('storage') + conn = FabricDuckDBConnection(access_token=access_token) + + # Register a single lakehouse + conn.register_workspace_lakehouses( + workspace_id='12345678-1234-5678-1234-567812345678', + lakehouses='sales_lakehouse' + ) + + # Register multiple lakehouses + conn.register_workspace_lakehouses( + workspace_id='12345678-1234-5678-1234-567812345678', + lakehouses=['sales_lakehouse', 'marketing_lakehouse'] + ) ``` """ @@ -443,7 +441,7 @@ def print_lakehouse_catalog(self): Example: ```python - >>> conn.print_lakehouse_catalog() + conn.print_lakehouse_catalog() 📁 Database: workspace1.sales_lakehouse └─📂 Schema: main ├─📄 customers From 0a0b8e7b87018ae63a183fe35cd3a0f27427ee9a Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:13:57 +0100 Subject: [PATCH 06/16] add common utils --- src/msfabricutils/common/__init__.py | 10 +++++ src/msfabricutils/common/quote_identifier.py | 20 +++++++++ src/msfabricutils/common/separator_indices.py | 31 +++++++++++++ .../common/string_normalization.py | 45 +++++++++++++++++++ 4 files changed, 106 insertions(+) create mode 100644 src/msfabricutils/common/__init__.py create mode 100644 src/msfabricutils/common/quote_identifier.py create mode 100644 src/msfabricutils/common/separator_indices.py create mode 100644 src/msfabricutils/common/string_normalization.py diff --git a/src/msfabricutils/common/__init__.py b/src/msfabricutils/common/__init__.py new file mode 100644 index 0000000..277fb41 --- /dev/null +++ b/src/msfabricutils/common/__init__.py @@ -0,0 +1,10 @@ +from .quote_identifier import quote_identifier +from .separator_indices import _separator_indices +from .string_normalization import character_translation, to_snake_case + +__all__ = ( + "_separator_indices", + "quote_identifier", + "to_snake_case", + "character_translation", +) diff --git a/src/msfabricutils/common/quote_identifier.py b/src/msfabricutils/common/quote_identifier.py new file mode 100644 index 0000000..d5a11a7 --- /dev/null +++ b/src/msfabricutils/common/quote_identifier.py @@ -0,0 +1,20 @@ +def quote_identifier(identifier: str, quote_character: str = '"') -> str: + """ + Quotes the given identifier by surrounding it with the specified quote character. + + Args: + identifier (str): The identifier to be quoted. + quote_character (str, optional): The character to use for quoting. Defaults to '"'. + + Returns: + The quoted identifier. + + Example: + ```python + quote_identifier("my_object") + '"my_object"' + quote_identifier("my_object", "'") + "'my_object'" + ``` + """ + return f"{quote_character}{identifier.strip(quote_character)}{quote_character}" diff --git a/src/msfabricutils/common/separator_indices.py b/src/msfabricutils/common/separator_indices.py new file mode 100644 index 0000000..c216c30 --- /dev/null +++ b/src/msfabricutils/common/separator_indices.py @@ -0,0 +1,31 @@ +def _separator_indices(string: str, separator: str) -> list[int]: + """Find indices of a separator character in a string, ignoring separators inside quotes. 
+ + Args: + string (str): The input string to search through + separator (str): The separator character to find + + Returns: + A list of indices where the separator character appears outside of quotes + + Example: + ```python + separator_indices('a,b,"c,d",e', ',') + [1, 8] + ``` + """ + inside_double_quotes = False + inside_single_quotes = False + indices = [] + + for idx, char in enumerate(string): + if char == '"' and not inside_single_quotes: + inside_double_quotes = not inside_double_quotes + elif char == "'" and not inside_double_quotes: + inside_single_quotes = not inside_single_quotes + elif inside_double_quotes or inside_single_quotes: + continue + elif char == separator: + indices.append(idx) + + return indices diff --git a/src/msfabricutils/common/string_normalization.py b/src/msfabricutils/common/string_normalization.py new file mode 100644 index 0000000..1f8ed52 --- /dev/null +++ b/src/msfabricutils/common/string_normalization.py @@ -0,0 +1,45 @@ +import re + + +def to_snake_case(text: str) -> str: + """Convert a string to snake case. + + Args: + text (str): The string to convert to snake case. Can be converted from PascalCase, camelCase, kebab-case, or mixed case. Non-alphanumeric characters are converted to underscores. + + Returns: + The string in snake case. + + Example: + ```python + to_snake_case("CustomerID") + "customer_id" + ``` + """ + text = text.replace(" ", "_") + text = text.replace("-", "_") + text = re.sub(r"([a-z])([A-Z0-9])", r"\1_\2", text) + text = re.sub(r"([A-Z0-9])([A-Z0-9][a-z])", r"\1_\2", text) + text = re.sub(r"(? str: + """Translate characters in a string using a translation map. + + Args: + text (str): The string to translate. + translation_map (dict[str, str]): A dictionary mapping characters to their replacements. + + Returns: + The translated string. 
+ + Example: + ```python + character_translation("Profit&Loss", {"&": "_and"}) + "Profit_and_Loss" + ``` + """ + for character, replacement in translation_map.items(): + text = text.replace(character, replacement) + return text From a3fffad6515dbf93271c9247bc58933991a00fcf Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:14:47 +0100 Subject: [PATCH 07/16] better core docs, ruff lint --- src/msfabricutils/core/__init__.py | 4 ++-- src/msfabricutils/core/auth.py | 16 ++++++++-------- src/msfabricutils/core/generic.py | 1 + src/msfabricutils/core/lakehouse.py | 21 ++++++++++++++++----- src/msfabricutils/core/workspace.py | 20 +++++++++++++++----- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/msfabricutils/core/__init__.py b/src/msfabricutils/core/__init__.py index 7ac75f2..5a17bc9 100644 --- a/src/msfabricutils/core/__init__.py +++ b/src/msfabricutils/core/__init__.py @@ -1,6 +1,6 @@ -from .workspace import get_workspaces, get_workspace -from .lakehouse import get_workspace_lakehouse_tables, get_workspace_lakehouses from .auth import get_fabric_bearer_token, get_onelake_access_token +from .lakehouse import get_workspace_lakehouse_tables, get_workspace_lakehouses +from .workspace import get_workspace, get_workspaces __all__ = ( "get_workspace", diff --git a/src/msfabricutils/core/auth.py b/src/msfabricutils/core/auth.py index 78f3636..14586bb 100644 --- a/src/msfabricutils/core/auth.py +++ b/src/msfabricutils/core/auth.py @@ -13,7 +13,7 @@ def get_access_token(audience: str) -> str: """ try: - import notebookutils # type: ignore + import notebookutils # type: ignore token = notebookutils.credentials.getToken(audience) except ModuleNotFoundError: token = DefaultAzureCredential().get_token(f"{audience}/.default").token @@ -22,7 +22,7 @@ def get_access_token(audience: str) -> str: def get_onelake_access_token() -> str: """ - Retrieves an access token for Azure OneLake storage. + Retrieves an access token for OneLake storage. This function attempts to obtain an access token for accessing Azure storage. It first checks if the code is running in a Microsoft Fabric notebook environment @@ -39,16 +39,16 @@ def get_onelake_access_token() -> str: def get_fabric_bearer_token() -> str: """ - Retrieves a bearer token for Azure Fabric (Power BI) API. + Retrieves a bearer token for Fabric (Power BI) API. This function attempts to obtain a bearer token for authenticating requests to the - Azure Power BI API. It first checks if the code is running in a Microsoft Fabric + Power BI API. It first checks if the code is running in a Microsoft Fabric notebook environment and tries to use the `notebookutils` library to get the token. If the library is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. Returns: - The bearer token used for authenticating requests to the Azure Fabric (Power BI) API. + The bearer token used for authenticating requests to the Fabric (Power BI) API. """ audience = "https://analysis.windows.net/powerbi/api" return get_access_token(audience) @@ -56,10 +56,10 @@ def get_fabric_bearer_token() -> str: def get_azure_devops_access_token() -> str: """ - Retrieves a bearer token for Azure Fabric (Power BI) API. + Retrieves a bearer token for Fabric (Power BI) API. This function attempts to obtain a bearer token for authenticating requests to the - Azure Power BI API. It first checks if the code is running in a Microsoft Fabric + Power BI API. 
It first checks if the code is running in a Microsoft Fabric notebook environment and tries to use the `notebookutils` library to get the token. If the library is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. @@ -73,7 +73,7 @@ def get_azure_devops_access_token() -> str: def get_storage_options() -> dict[str, str]: """ - Retrieves storage options including a bearer token for Azure OneLake storage. + Retrieves storage options including a bearer token for OneLake storage. This function calls `get_onelake_access_token` to obtain a bearer token and returns a dictionary containing the token and a flag indicating diff --git a/src/msfabricutils/core/generic.py b/src/msfabricutils/core/generic.py index 2d4b9e1..7651118 100644 --- a/src/msfabricutils/core/generic.py +++ b/src/msfabricutils/core/generic.py @@ -1,4 +1,5 @@ import requests + from msfabricutils.core.auth import get_fabric_bearer_token diff --git a/src/msfabricutils/core/lakehouse.py b/src/msfabricutils/core/lakehouse.py index cb77a5d..d0a6704 100644 --- a/src/msfabricutils/core/lakehouse.py +++ b/src/msfabricutils/core/lakehouse.py @@ -15,8 +15,12 @@ def get_workspace_lakehouses(workspace_id: str) -> list[dict]: Returns: A list of dictionaries containing lakehouse data for the specified workspace. - See Also: - `get_paginated`: A helper function that handles paginated API requests. + Example: + ```python + from msfabricutils.core import get_workspace_lakehouses + + lakehouses = get_workspace_lakehouses("12345678-1234-1234-1234-123456789012") + ``` """ endpoint = f"workspaces/{workspace_id}/lakehouses" data_key = "value" @@ -37,10 +41,17 @@ def get_workspace_lakehouse_tables(workspace_id: str, lakehouse_id: str) -> list lakehouse_id (str): The ID of the lakehouse to retrieve tables from. Returns: - list[dict]: A list of dictionaries containing table data for the specified lakehouse. + A list of dictionaries containing table data for the specified lakehouse. + + Example: + ```python + from msfabricutils.core import get_workspace_lakehouse_tables - See Also: - `get_paginated`: A helper function that handles paginated API requests. + tables = get_workspace_lakehouse_tables( + "12345678-1234-1234-1234-123456789012", + "beefbeef-beef-beef-beef-beefbeefbeef" + ) + ``` """ endpoint = f"workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables" data_key = "data" diff --git a/src/msfabricutils/core/workspace.py b/src/msfabricutils/core/workspace.py index 0c9f914..39c0ca7 100644 --- a/src/msfabricutils/core/workspace.py +++ b/src/msfabricutils/core/workspace.py @@ -1,6 +1,8 @@ -from msfabricutils.core.generic import get_paginated, get_page from typing import Any +from msfabricutils.core.generic import get_page, get_paginated + + def get_workspaces() -> list[dict[str, Any]]: """ Retrieves a list of workspaces. @@ -12,8 +14,12 @@ def get_workspaces() -> list[dict[str, Any]]: Returns: A list of dictionaries containing data for the available workspaces. - See Also: - `get_paginated`: A helper function that handles paginated API requests. + Example: + ```python + from msfabricutils.core import get_workspaces + + workspaces = get_workspaces() + ``` """ endpoint = "workspaces" data_key = "value" @@ -34,8 +40,12 @@ def get_workspace(workspace_id: str) -> dict[str, Any]: Returns: A dictionary containing the details of the specified workspace. - See Also: - `get_page`: A helper function that retrieves a single page of data from the API. 
+ Example: + ```python + from msfabricutils.core import get_workspace + + workspace = get_workspace("12345678-1234-1234-1234-123456789012") + ``` """ endpoint = f"workspaces/{workspace_id}" From fd20f35f1f4fe6fc0180d00a75efffc5136bdb35 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:18:51 +0100 Subject: [PATCH 08/16] add etl helpers --- src/msfabricutils/etl/__init__.py | 24 ++ src/msfabricutils/etl/config.py | 219 ++++++++++++++++++ src/msfabricutils/etl/helpers/__init__.py | 13 ++ .../helpers/get_incremental_column_value.py | 32 +++ .../etl/helpers/merge_helpers.py | 100 ++++++++ src/msfabricutils/etl/types.py | 3 + 6 files changed, 391 insertions(+) create mode 100644 src/msfabricutils/etl/__init__.py create mode 100644 src/msfabricutils/etl/config.py create mode 100644 src/msfabricutils/etl/helpers/__init__.py create mode 100644 src/msfabricutils/etl/helpers/get_incremental_column_value.py create mode 100644 src/msfabricutils/etl/helpers/merge_helpers.py create mode 100644 src/msfabricutils/etl/types.py diff --git a/src/msfabricutils/etl/__init__.py b/src/msfabricutils/etl/__init__.py new file mode 100644 index 0000000..34257d4 --- /dev/null +++ b/src/msfabricutils/etl/__init__.py @@ -0,0 +1,24 @@ +from .config import AuditColumn, Config, IncrementalColumn, create_config, get_default_config +from .helpers import get_incremental_column_value +from .sinks import upsert_scd_type_1 +from .sources import source_delta, source_parquet +from .transforms import ( + add_audit_columns_transform, + deduplicate_transform, + normalize_column_names_transform, +) + +__all__ = ( + "get_default_config", + "create_config", + "Config", + "IncrementalColumn", + "AuditColumn", + "upsert_scd_type_1", + "source_parquet", + "source_delta", + "deduplicate_transform", + "normalize_column_names_transform", + "add_audit_columns_transform", + "get_incremental_column_value", +) diff --git a/src/msfabricutils/etl/config.py b/src/msfabricutils/etl/config.py new file mode 100644 index 0000000..a300619 --- /dev/null +++ b/src/msfabricutils/etl/config.py @@ -0,0 +1,219 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Callable + +import polars as pl + +from msfabricutils.common import character_translation, to_snake_case + + +@dataclass +class IncrementalColumn: + """ + Represents an incremental column in the configuration. + + Attributes: + name (str): The name of the incremental column. + data_type (pl.DataType): The data type of the incremental column. + + Example: + ```python + incremental_column = IncrementalColumn("batch_id", pl.Int64) + ``` + """ + name: str + data_type: pl.DataType + +@dataclass +class AuditColumn: + """ + Represents an audit column in the configuration. + + Attributes: + name (str): The name of the audit column. + default_value (pl.Expr): The default value expression for the audit column. + + Example: + ```python + audit_column = AuditColumn( + "__created_at", + pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")) + ) + ``` + """ + name: str + default_value: pl.Expr + +@dataclass() +class Config: + """ + Configuration class that holds various columns and their properties. + + Attributes: + incremental_column (IncrementalColumn): The incremental column configuration. + column_created_at (AuditColumn): The created at audit column configuration. + column_modified_at (AuditColumn): The modified at audit column configuration. + column_deleted_at (AuditColumns): The deleted at audit column configuration. 
+        column_valid_from (AuditColumn): The valid from audit column configuration.
+        column_valid_to (AuditColumn): The valid to audit column configuration.
+        character_translation_map (dict[str, str]): A mapping of special characters to their translations.
+        normalization_strategy (Callable[[str], str]): A function that takes a column name and returns the normalized name.
+    """
+    incremental_column: IncrementalColumn
+    column_created_at: AuditColumn
+    column_modified_at: AuditColumn
+    column_deleted_at: AuditColumn
+    column_valid_from: AuditColumn
+    column_valid_to: AuditColumn
+    character_translation_map: dict[str, str]
+    normalization_strategy: Callable[[str], str]
+
+    def __init__(self):
+        # TODO: Change to `__run_id`
+        self.incremental_column = IncrementalColumn("batch_id", pl.Int64)
+        self.column_created_at = AuditColumn("__created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")))
+        self.column_modified_at = AuditColumn("__modified_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")))
+        self.column_deleted_at = AuditColumn("__deleted_at", pl.lit(None).cast(pl.Datetime("us", "UTC")))
+        self.column_valid_from = AuditColumn("__valid_from", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")))
+        self.column_valid_to = AuditColumn("__valid_to", pl.lit(None).cast(pl.Datetime("us", "UTC")))
+        self.character_translation_map = {
+            " ": "_",
+            "-": "_",
+            "'": "_",
+            '"': "_",
+            "(": "_",
+            ")": "_",
+            ",": "_",
+            ".": "_",
+            ":": "_",
+            ";": "_",
+            "!": "_",
+            "?": "_",
+            "|": "_or",
+            "[": "_",
+            "]": "_",
+            "{": "_",
+            "}": "_",
+            "&": "_and",
+            "/": "_or",
+            "\\": "_or",
+            "%": "_percent",
+            "+": "_plus",
+            "*": "_times",
+            "=": "_equals",
+            "<": "_lt",
+            ">": "_gt",
+            "@": "_at",
+            "$": "_dollar",
+            "~": "_approximate",
+        }
+        self.normalization_strategy = lambda name: to_snake_case(character_translation(name, self.character_translation_map))
+
+    def get_static_audit_columns(self) -> list[AuditColumn]:
+        """
+        Returns a list of static audit columns, namely the `created_at` and `valid_from` columns.
+
+        Returns:
+            A list containing the static audit columns.
+
+        Example:
+            ```python
+            static_columns = config.get_static_audit_columns()
+            ```
+        """
+        return [
+            self.column_created_at,
+            self.column_valid_from,
+        ]
+
+    def get_dynamic_audit_columns(self) -> list[AuditColumn]:
+        """
+        Returns a list of dynamic audit columns, namely the `modified_at`, `valid_to`, and `deleted_at` columns.
+
+        Returns:
+            A list containing the dynamic audit columns.
+
+        Example:
+            ```python
+            dynamic_columns = config.get_dynamic_audit_columns()
+            ```
+        """
+        return [
+            self.column_modified_at,
+            self.column_valid_to,
+            self.column_deleted_at,
+        ]
+
+    def get_audit_columns(self) -> list[AuditColumn]:
+        """
+        Returns a list of all audit columns, namely the `created_at`, `modified_at`, `deleted_at`, `valid_from`, and `valid_to` columns.
+
+        Returns:
+            A list containing all audit columns.
+
+        Example:
+            ```python
+            all_columns = config.get_audit_columns()
+            ```
+        """
+        return [
+            self.column_created_at,
+            self.column_modified_at,
+            self.column_deleted_at,
+            self.column_valid_from,
+            self.column_valid_to,
+        ]
+
+def create_config(incremental_column: IncrementalColumn, created_at: AuditColumn, modified_at: AuditColumn, deleted_at: AuditColumn, valid_from: AuditColumn, valid_to: AuditColumn) -> Config:
+    """
+    Creates a new Config instance with the provided audit and incremental columns.
+
+    Args:
+        incremental_column (IncrementalColumn): The incremental column.
+ created_at (AuditColumn): The created at audit column. + modified_at (AuditColumn): The modified at audit column. + deleted_at (AuditColumn): The deleted at audit column. + valid_from (AuditColumn): The valid from audit column. + valid_to (AuditColumn): The valid to audit column. + + Returns: + A new instance of the Config class. + + Example: + ```python + incremental_column = IncrementalColumn("batch_id", pl.Int64) + ... + valid_to = AuditColumn("__valid_to", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))) + config = create_config( + incremental_column, + created_at, + modified_at, + deleted_at, + valid_from, + valid_to, + ) + ``` + """ + return Config( + incremental_column, + created_at, + modified_at, + deleted_at, + valid_from, + valid_to, + ) + + +def get_default_config() -> Config: + """ + Returns a default Config instance with preset values. + + Returns: + A default instance of the Config class. + + Example: + ```python + default_config = get_default_config() + ``` + """ + return Config() \ No newline at end of file diff --git a/src/msfabricutils/etl/helpers/__init__.py b/src/msfabricutils/etl/helpers/__init__.py new file mode 100644 index 0000000..6b9bc33 --- /dev/null +++ b/src/msfabricutils/etl/helpers/__init__.py @@ -0,0 +1,13 @@ +from .get_incremental_column_value import get_incremental_column_value +from .merge_helpers import ( + build_merge_predicate, + build_when_matched_update_columns, + build_when_matched_update_predicate, +) + +__all__ = ( + "build_merge_predicate", + "build_when_matched_update_predicate", + "build_when_matched_update_columns", + "get_incremental_column_value", +) diff --git a/src/msfabricutils/etl/helpers/get_incremental_column_value.py b/src/msfabricutils/etl/helpers/get_incremental_column_value.py new file mode 100644 index 0000000..0f5bc90 --- /dev/null +++ b/src/msfabricutils/etl/helpers/get_incremental_column_value.py @@ -0,0 +1,32 @@ +import polars as pl +from deltalake import DeltaTable + +from msfabricutils.core.auth import get_storage_options + + +def get_incremental_column_value(table_uri: str, incremental_column: str) -> int: + """ + Retrieves the maximum value of the specified incremental column from a Delta table. + + Args: + table_uri (str): The URI of the Delta table. + incremental_column (str): The name of the incremental column. + + Returns: + The maximum value of the incremental column, or 0 if the table does not exist. + + Example: + ```python + from msfabricutils.etl import get_incremental_column_value + + max_value = get_incremental_column_value("path/to/delta_table", "incremental_id") + ``` + """ + + storage_options = get_storage_options() if table_uri.startswith("abfss://") else None + + if not DeltaTable.is_deltatable(table_uri, storage_options=storage_options): + return 0 + + return pl.scan_delta(table_uri, storage_options=storage_options).select(pl.col(incremental_column)).max().collect().item() + diff --git a/src/msfabricutils/etl/helpers/merge_helpers.py b/src/msfabricutils/etl/helpers/merge_helpers.py new file mode 100644 index 0000000..a09f769 --- /dev/null +++ b/src/msfabricutils/etl/helpers/merge_helpers.py @@ -0,0 +1,100 @@ +from msfabricutils.common.quote_identifier import quote_identifier + + +def build_merge_predicate(columns: list[str]) -> str: + """ + Constructs a SQL merge predicate based on the provided column names. + + This function generates a string that represents the condition for merging + records based on equality of the specified columns. 
+ + Args: + columns (list[str]): A list of column names to be used in the merge predicate. + + Returns: + A SQL string representing the merge predicate. + + Example: + ```python + predicate = build_merge_predicate(['id', 'name']) + print(predicate) + \"\"\" + (target."id" = source."id") AND (target."name" = source."name") + \"\"\" + ``` + """ + merge_predicate = [ + f""" + (target.{quote_identifier(column)} = source.{quote_identifier(column)}) + """ + for column in columns + ] + return " AND ".join(merge_predicate) + + +def build_when_matched_update_predicate(columns: list[str]) -> str: + """ + Constructs a SQL predicate for when matched update conditions. + + This function generates a string that represents the conditions for updating + records when a match is found based on the specified columns. + + Args: + columns (list[str]): A list of column names to be used in the update predicate. + + Returns: + A SQL string representing the when matched update predicate. + + Example: + ```python + update_predicate = build_when_matched_update_predicate(['id', 'status']) + print(update_predicate) + \"\"\" + ( + (target."id" != source."id") + OR (target."id" IS NULL AND source."id" IS NOT NULL) + OR (target."id" IS NOT NULL AND source."id" IS NULL) + ) OR ... + \"\"\" + ``` + """ + when_matched_update_predicates = [ + f""" + ( + (target.{quote_identifier(column)} != source.{quote_identifier(column)}) + OR (target.{quote_identifier(column)} IS NULL AND source.{quote_identifier(column)} IS NOT NULL) + OR (target.{quote_identifier(column)} IS NOT NULL AND source.{quote_identifier(column)} IS NULL) + ) + """ + for column in columns + ] + return " OR ".join(when_matched_update_predicates) + + +def build_when_matched_update_columns(columns: list[str]) -> dict[str, str]: + """ + Constructs a mapping of columns to be updated when a match is found. + + This function generates a dictionary where the keys are the target column + names and the values are the corresponding source column names. + + Args: + columns (list[str]): A list of column names to be used in the update mapping. + + Returns: + A dictionary mapping target columns to source columns. 
+ + Example: + ```python + update_columns = build_when_matched_update_columns(['id', 'name']) + print(update_columns) + { + 'target."id"': 'source."id"', + 'target."name"': 'source."name"' + } + ``` + """ + return { + f"target.{quote_identifier(column)}": f"source.{quote_identifier(column)}" + for column in columns + } diff --git a/src/msfabricutils/etl/types.py b/src/msfabricutils/etl/types.py new file mode 100644 index 0000000..6e90fcb --- /dev/null +++ b/src/msfabricutils/etl/types.py @@ -0,0 +1,3 @@ +import polars as pl + +PolarsFrame = pl.DataFrame | pl.LazyFrame From 25cc07ebb937e0f1e553f331c10474e16b157385 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:19:13 +0100 Subject: [PATCH 09/16] rm old helpers --- src/msfabricutils/helpers/__init__.py | 11 -- src/msfabricutils/helpers/merge_helpers.py | 100 ------------------ src/msfabricutils/helpers/quote_identifier.py | 20 ---- .../helpers/separator_indices.py | 29 ----- .../helpers/string_normalization.py | 40 ------- 5 files changed, 200 deletions(-) delete mode 100644 src/msfabricutils/helpers/__init__.py delete mode 100644 src/msfabricutils/helpers/merge_helpers.py delete mode 100644 src/msfabricutils/helpers/quote_identifier.py delete mode 100644 src/msfabricutils/helpers/separator_indices.py delete mode 100644 src/msfabricutils/helpers/string_normalization.py diff --git a/src/msfabricutils/helpers/__init__.py b/src/msfabricutils/helpers/__init__.py deleted file mode 100644 index 48c6e47..0000000 --- a/src/msfabricutils/helpers/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from .separator_indices import _separator_indices -from .quote_identifier import quote_identifier -from .merge_helpers import build_merge_predicate, build_when_matched_update_columns, build_when_matched_update_predicate - -__all__ = ( - "_separator_indices", - "quote_identifier", - "build_merge_predicate", - "build_when_matched_update_columns", - "build_when_matched_update_predicate", -) diff --git a/src/msfabricutils/helpers/merge_helpers.py b/src/msfabricutils/helpers/merge_helpers.py deleted file mode 100644 index 115ba0c..0000000 --- a/src/msfabricutils/helpers/merge_helpers.py +++ /dev/null @@ -1,100 +0,0 @@ -from msfabricutils.helpers.quote_identifier import quote_identifier - - -def build_merge_predicate(columns: list[str]) -> str: - """ - Constructs a SQL merge predicate based on the provided column names. - - This function generates a string that represents the condition for merging - records based on equality of the specified columns. - - Args: - columns (list[str]): A list of column names to be used in the merge predicate. - - Returns: - A SQL string representing the merge predicate. - - Example: - ```python - predicate = build_merge_predicate(['id', 'name']) - print(predicate) - \"\"\" - (target."id" = source."id") AND (target."name" = source."name") - \"\"\" - ``` - """ - merge_predicate = [ - f""" - (target.{quote_identifier(column)} = source.{quote_identifier(column)}) - """ - for column in columns - ] - return " AND ".join(merge_predicate) - - -def build_when_matched_update_predicate(columns: list[str]) -> str: - """ - Constructs a SQL predicate for when matched update conditions. - - This function generates a string that represents the conditions for updating - records when a match is found based on the specified columns. - - Args: - columns (list[str]): A list of column names to be used in the update predicate. - - Returns: - A SQL string representing the when matched update predicate. 
- - Example: - ```python - update_predicate = build_when_matched_update_predicate(['id', 'status']) - print(update_predicate) - \"\"\" - ( - (target."id" != source."id") - OR (target."id" IS NULL AND source."id" IS NOT NULL) - OR (target."id" IS NOT NULL AND source."id" IS NULL) - ) OR ... - \"\"\" - ``` - """ - when_matched_update_predicates = [ - f""" - ( - (target.{quote_identifier(column)} != source.{quote_identifier(column)}) - OR (target.{quote_identifier(column)} IS NULL AND source.{quote_identifier(column)} IS NOT NULL) - OR (target.{quote_identifier(column)} IS NOT NULL AND source.{quote_identifier(column)} IS NULL) - ) - """ - for column in columns - ] - return " OR ".join(when_matched_update_predicates) - - -def build_when_matched_update_columns(columns: list[str]) -> dict[str, str]: - """ - Constructs a mapping of columns to be updated when a match is found. - - This function generates a dictionary where the keys are the target column - names and the values are the corresponding source column names. - - Args: - columns (list[str]): A list of column names to be used in the update mapping. - - Returns: - A dictionary mapping target columns to source columns. - - Example: - ```python - update_columns = build_when_matched_update_columns(['id', 'name']) - print(update_columns) - { - 'target."id"': 'source."id"', - 'target."name"': 'source."name"' - } - ``` - """ - return { - f"target.{quote_identifier(column)}": f"source.{quote_identifier(column)}" - for column in columns - } diff --git a/src/msfabricutils/helpers/quote_identifier.py b/src/msfabricutils/helpers/quote_identifier.py deleted file mode 100644 index d5a11a7..0000000 --- a/src/msfabricutils/helpers/quote_identifier.py +++ /dev/null @@ -1,20 +0,0 @@ -def quote_identifier(identifier: str, quote_character: str = '"') -> str: - """ - Quotes the given identifier by surrounding it with the specified quote character. - - Args: - identifier (str): The identifier to be quoted. - quote_character (str, optional): The character to use for quoting. Defaults to '"'. - - Returns: - The quoted identifier. - - Example: - ```python - quote_identifier("my_object") - '"my_object"' - quote_identifier("my_object", "'") - "'my_object'" - ``` - """ - return f"{quote_character}{identifier.strip(quote_character)}{quote_character}" diff --git a/src/msfabricutils/helpers/separator_indices.py b/src/msfabricutils/helpers/separator_indices.py deleted file mode 100644 index ce9fb44..0000000 --- a/src/msfabricutils/helpers/separator_indices.py +++ /dev/null @@ -1,29 +0,0 @@ -def _separator_indices(string: str, separator: str) -> list[int]: - """Find indices of a separator character in a string, ignoring separators inside quotes. 
- - Args: - string (str): The input string to search through - separator (str): The separator character to find - - Returns: - A list of indices where the separator character appears outside of quotes - - Example: - >>> separator_indices('a,b,"c,d",e', ',') - [1, 8] - """ - inside_double_quotes = False - inside_single_quotes = False - indices = [] - - for idx, char in enumerate(string): - if char == '"' and not inside_single_quotes: - inside_double_quotes = not inside_double_quotes - elif char == "'" and not inside_double_quotes: - inside_single_quotes = not inside_single_quotes - elif inside_double_quotes or inside_single_quotes: - continue - elif char == separator: - indices.append(idx) - - return indices diff --git a/src/msfabricutils/helpers/string_normalization.py b/src/msfabricutils/helpers/string_normalization.py deleted file mode 100644 index ff0fb6c..0000000 --- a/src/msfabricutils/helpers/string_normalization.py +++ /dev/null @@ -1,40 +0,0 @@ -import re - -def to_snake_case(text: str) -> str: - """Convert a string to snake case. - - Args: - text (str): The string to convert to snake case. Can be converted from PascalCase, camelCase, kebab-case, or mixed case. Non-alphanumeric characters are converted to underscores. - - Returns: - The string in snake case. - - Examples: - >>> to_snake_case("CustomerID") - "customer_id" - """ - text = text.replace(" ", "_") - text = text.replace("-", "_") - text = re.sub(r"([a-z])([A-Z0-9])", r"\1_\2", text) - text = re.sub(r"([A-Z0-9])([A-Z0-9][a-z])", r"\1_\2", text) - text = re.sub(r"(? str: - """Translate characters in a string using a translation map. - - Args: - text (str): The string to translate. - translation_map (dict[str, str]): A dictionary mapping characters to their replacements. - - Returns: - The translated string. - - Examples: - >>> character_translation("Profit&Loss", {"&": "_and"}) - "Profit_and_Loss" - """ - for character, replacement in translation_map.items(): - text = text.replace(character, replacement) - return text From 272b82e84a0e370ee484946586c9644e934156c7 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:19:51 +0100 Subject: [PATCH 10/16] add delta and parquet sources --- src/msfabricutils/etl/sources/__init__.py | 7 +++ src/msfabricutils/etl/sources/delta_table.py | 44 +++++++++++++++ src/msfabricutils/etl/sources/parquet.py | 58 ++++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 src/msfabricutils/etl/sources/__init__.py create mode 100644 src/msfabricutils/etl/sources/delta_table.py create mode 100644 src/msfabricutils/etl/sources/parquet.py diff --git a/src/msfabricutils/etl/sources/__init__.py b/src/msfabricutils/etl/sources/__init__.py new file mode 100644 index 0000000..8041589 --- /dev/null +++ b/src/msfabricutils/etl/sources/__init__.py @@ -0,0 +1,7 @@ +from .delta_table import source_delta +from .parquet import source_parquet + +__all__ = ( + "source_delta", + "source_parquet", +) diff --git a/src/msfabricutils/etl/sources/delta_table.py b/src/msfabricutils/etl/sources/delta_table.py new file mode 100644 index 0000000..dc1a991 --- /dev/null +++ b/src/msfabricutils/etl/sources/delta_table.py @@ -0,0 +1,44 @@ +import polars as pl + +from msfabricutils.core.auth import get_storage_options +from msfabricutils.etl.types import PolarsFrame + + +def source_delta(table_uri: str, eager: bool = False) -> PolarsFrame: + """ + Reads a Delta table from the specified abfss URI. Automatically handles the authentication with OneLake. 
+ + Args: + table_uri (str): The abfss URI of the Delta table to read. + eager (bool, optional): If True, reads the table eagerly; otherwise, returns a lazy frame. Defaults to False. + + Returns: + PolarsFrame: The data from the Delta table. + + Example: + ```python + from msfabricutils.etl import source_delta + + workspace_id = "12345678-1234-1234-1234-123456789012" + lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef" + table_name = "my-delta-table" + table_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}" + + df = source_delta(table_uri, eager=True) + lazy_df = source_delta(table_uri, eager=False) + ``` + """ + + storage_options = get_storage_options() if table_uri.startswith("abfss://") else None + + if eager: + return pl.read_delta( + source=table_uri, + storage_options=storage_options + ) + + return pl.scan_delta( + source=table_uri, + storage_options=storage_options + ) + diff --git a/src/msfabricutils/etl/sources/parquet.py b/src/msfabricutils/etl/sources/parquet.py new file mode 100644 index 0000000..d327d08 --- /dev/null +++ b/src/msfabricutils/etl/sources/parquet.py @@ -0,0 +1,58 @@ +import polars as pl + +from msfabricutils.core.auth import get_storage_options +from msfabricutils.etl.types import PolarsFrame + + +def source_parquet(table_uri: str, eager: bool = False) -> PolarsFrame: + """ + Reads a Parquet file from the specified abfss URI. Automatically handles the authentication with OneLake. + + Args: + table_uri (str): The abfss URI of the Parquet file to read. Supports globbing. + eager (bool, optional): If True, reads the file eagerly; otherwise, returns a lazy frame. Defaults to False. + + Returns: + PolarsFrame: The data from the Parquet file. + + Example: + Reading a single file + ```python + from msfabricutils.etl import source_parquet + + workspace_id = "12345678-1234-1234-1234-123456789012" + lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef" + + file_path = "my-parquet-file.parquet" + folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/" + + df = source_parquet(folder_uri + file_path, eager=True) + ``` + + Reading all Parquet files in a folder + ```python + from msfabricutils.etl import source_parquet + + workspace_id = "12345678-1234-1234-1234-123456789012" + lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef" + + folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/" + glob_df = source_parquet(folder_uri + "**/*.parquet", eager=True) + ``` + """ + + storage_options = get_storage_options() if table_uri.startswith("abfss://") else None + + if eager: + return pl.read_parquet( + source=table_uri, + hive_partitioning=True, + storage_options=storage_options + ) + + return pl.scan_parquet( + source=table_uri, + hive_partitioning=True, + storage_options=storage_options + ) + From 60ce0c25a4f252da10f7d2baa1cb8f189cbf7753 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:20:27 +0100 Subject: [PATCH 11/16] add delta table scd type 1 sink --- src/msfabricutils/etl/sinks/__init__.py | 5 ++ src/msfabricutils/etl/sinks/delta_table.py | 98 ++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 src/msfabricutils/etl/sinks/__init__.py create mode 100644 src/msfabricutils/etl/sinks/delta_table.py diff --git a/src/msfabricutils/etl/sinks/__init__.py b/src/msfabricutils/etl/sinks/__init__.py new file mode 100644 index 0000000..0b9a247 --- /dev/null +++ b/src/msfabricutils/etl/sinks/__init__.py @@ -0,0 +1,5 @@ +from 
.delta_table import upsert_scd_type_1 + +__all__ = ( + "upsert_scd_type_1", +) diff --git a/src/msfabricutils/etl/sinks/delta_table.py b/src/msfabricutils/etl/sinks/delta_table.py new file mode 100644 index 0000000..546f2be --- /dev/null +++ b/src/msfabricutils/etl/sinks/delta_table.py @@ -0,0 +1,98 @@ +import polars as pl +from deltalake import DeltaTable + +from msfabricutils.etl import Config +from msfabricutils.etl.helpers.merge_helpers import ( + build_merge_predicate, + build_when_matched_update_columns, + build_when_matched_update_predicate, +) +from msfabricutils.etl.types import PolarsFrame + + +def upsert_scd_type_1(table_uri: str, df: PolarsFrame, primary_key_columns: str | list[str], config: Config | None = None, exclude_columns: str | list[str] | None = None) -> dict[str: str]: + """ + Upserts dataframe into a Delta table using Slowly Changing Dimension (SCD) Type 1. + + Args: + table_uri (str): The URI of the target Delta table. + df (PolarsFrame): The dataframe to upsert. + config (Config | None): Configuration object containing audit column information. + primary_key_columns (str | list[str]): Primary key column(s) for the upsert. + exclude_columns (str | list[str] | None): Columns to exclude from the upsert. + + Returns: + Result of the merge operation. + + Example: + ```python + from msfabricutils.etl import Config, upsert_scd_type_1 + import polars as pl + + + config = get_default_config() + data = pl.DataFrame({...}) + + upsert_scd_type_1( + "path/to/delta_table", + data, + config, + primary_key_columns=["id"] + ) + ``` + """ + + dynamic_audit_columns = config.get_dynamic_audit_columns() if config else [] + static_audit_columns = config.get_static_audit_columns() if config else [] + + if exclude_columns is None: + exclude_columns = [] + + if isinstance(primary_key_columns, str): + primary_key_columns = [primary_key_columns] + primary_key_columns = [config.normalization_strategy(column) for column in primary_key_columns] + + + if isinstance(exclude_columns, str): + exclude_columns = [exclude_columns] + + if isinstance(df, pl.LazyFrame): + df = df.collect() + + df = df.to_arrow() + + if DeltaTable.is_deltatable(table_uri): + dt = DeltaTable(table_uri) + else: + dt = DeltaTable.create(table_uri, df.schema) + + merge_predicate = build_merge_predicate(primary_key_columns) + + predicate_update_columns = [ + column for column in df.column_names + if column not in primary_key_columns + exclude_columns + static_audit_columns + dynamic_audit_columns + ] + + when_matched_update_predicates = build_when_matched_update_predicate(predicate_update_columns) + update_columns = [ + column for column in df.column_names + if column not in primary_key_columns + exclude_columns + static_audit_columns + ] + + when_matched_update_columns = build_when_matched_update_columns(update_columns) + table_merger = ( + dt.merge( + df, + source_alias = "source", + target_alias = "target", + predicate = merge_predicate, + ) + .when_matched_update( + predicate = when_matched_update_predicates, + updates = when_matched_update_columns + ) + .when_not_matched_insert_all() + ) + + return table_merger.execute() + From 60fe518a97d17e603b313abe86d08299bd41106e Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:21:10 +0100 Subject: [PATCH 12/16] add default transforms --- src/msfabricutils/etl/transforms/__init__.py | 11 ++ .../etl/transforms/transforms.py | 135 ++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 src/msfabricutils/etl/transforms/__init__.py create mode 100644 
src/msfabricutils/etl/transforms/transforms.py diff --git a/src/msfabricutils/etl/transforms/__init__.py b/src/msfabricutils/etl/transforms/__init__.py new file mode 100644 index 0000000..3b1a521 --- /dev/null +++ b/src/msfabricutils/etl/transforms/__init__.py @@ -0,0 +1,11 @@ +from .transforms import ( + add_audit_columns_transform, + deduplicate_transform, + normalize_column_names_transform, +) + +__all__ = ( + "add_audit_columns_transform", + "deduplicate_transform", + "normalize_column_names_transform", +) diff --git a/src/msfabricutils/etl/transforms/transforms.py b/src/msfabricutils/etl/transforms/transforms.py new file mode 100644 index 0000000..2179dd0 --- /dev/null +++ b/src/msfabricutils/etl/transforms/transforms.py @@ -0,0 +1,135 @@ + +import polars as pl +from polars.exceptions import ColumnNotFoundError + +from msfabricutils.etl import Config +from msfabricutils.etl.types import PolarsFrame + + +def add_audit_columns_transform(df: PolarsFrame, config: Config) -> PolarsFrame: + """ + Adds audit columns to the given DataFrame or LazyFrame based on the configuration. + + Args: + df (PolarsFrame): The DataFrame or LazyFrame to which audit columns will be added. + config (Config): The configuration object that provides the audit column definitions. + + Returns: + The DataFrame or LazyFrame with the added audit columns. + + Example: + ```python + from msfabricutils.etl import get_default_config, add_audit_columns_transform + import polars as pl + + + config = get_default_config() + df = pl.DataFrame({"data": [1, 2, 3]}) + updated_df = add_audit_columns_transform(df, config) + + ``` + """ + + audit_columns = config.get_audit_columns() + + df = df.with_columns( + [ + audit_column.default_value.alias(audit_column.name) + for audit_column in audit_columns + ] + ) + return df + + +def deduplicate_transform(df: PolarsFrame, primary_key_columns: str | list[str] | None = None, deduplication_order_columns: str | list[str] | None = None, deduplication_order_descending: bool | list[bool] = True) -> PolarsFrame: + """ + Removes duplicate rows from the DataFrame based on primary key columns. + + Args: + df (PolarsFrame): The DataFrame or LazyFrame from which duplicates will be removed. + primary_key_columns (list[str] | None): The columns to use as primary keys for deduplication. + deduplication_order_columns (list[str] | None): The columns to determine the order of rows for deduplication. + deduplication_order_descending (bool | list[bool]): Whether to sort the deduplication order in descending order. + + Returns: + PolarsFrame: The DataFrame or LazyFrame with duplicates removed. + + Example: + ```python + import polars as pl + + df = pl.DataFrame({ + "id": [1, 2, 2, 3], + "value": ["a", "b", "b", "c"] + }) + deduped_df = deduplicate_transform(df, primary_key_columns=["id"]) + ``` + """ + + if isinstance(primary_key_columns, str): + primary_key_columns = [primary_key_columns] + + # Temporary fix start + # See GitHub issue: https://github.com/pola-rs/polars/issues/20209 + # TODO: Remove this once the issue is fixed. + # .unique() does not check if subset columns exist in the dataframe if it is empty, so it's silently ignores. + + if isinstance(df, pl.LazyFrame): + columns = df.collect_schema().names() + else: + columns = df.schema.names() + + if primary_key_columns: + for column in primary_key_columns: + if column not in columns: + raise ColumnNotFoundError(f"unable to find column `{column}`. 
Valid columns: {columns}") + + # Temporary fix end + + if deduplication_order_columns: + df = df.sort(deduplication_order_columns, descending=deduplication_order_descending, nulls_last=True) + + df = df.unique(subset=primary_key_columns, keep="first") + + return df + + +def normalize_column_names_transform(df: PolarsFrame, config: Config) -> PolarsFrame: + """ + Normalizes the column names of the DataFrame using a provided normalization strategy. + + Args: + df (PolarsFrame): The DataFrame or LazyFrame whose column names will be normalized. + config (Config): The configuration object that provides the normalization strategy. + + Returns: + PolarsFrame: The DataFrame or LazyFrame with normalized column names. + + Example: + ```python + import polars as pl + from msfabricutils.etl import get_default_config + + config = get_default_config() + + df = pl.DataFrame({"First Name": [1, 2], "Last Name": [3, 4]}) + normalized_df = normalize_column_names_transform(df, config) + ``` + """ + + if isinstance(df, pl.LazyFrame): + columns = df.collect_schema().names() + else: + columns = df.schema.names() + + column_mapping = { + old_name: config.normalization_strategy(old_name) + for old_name in columns + } + + df = df.rename(column_mapping) + + return df + +# def filter_source(df: PolarsFrame, filter: Callable[[PolarsFrame], PolarsFrame]) -> PolarsFrame: +# return filter(df) \ No newline at end of file From 11d20b15da21d595dbc05586a98386322c4073be Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:21:56 +0100 Subject: [PATCH 13/16] add tests --- tests/etl/test_end_to_end.py | 84 +++++++++++++++++++ tests/etl/{ => transforms}/test_transforms.py | 17 ++-- tests/helpers/test_escape_object_name.py | 3 +- tests/helpers/test_merge_helpers.py | 8 +- tests/helpers/test_separator_indices.py | 2 +- tests/helpers/test_string_normalization.py | 4 +- 6 files changed, 107 insertions(+), 11 deletions(-) create mode 100644 tests/etl/test_end_to_end.py rename tests/etl/{ => transforms}/test_transforms.py (79%) diff --git a/tests/etl/test_end_to_end.py b/tests/etl/test_end_to_end.py new file mode 100644 index 0000000..6fd5fcf --- /dev/null +++ b/tests/etl/test_end_to_end.py @@ -0,0 +1,84 @@ +from datetime import datetime, timezone + +import polars as pl +from freezegun import freeze_time +from polars.testing import assert_frame_equal + +from msfabricutils.etl import ( + add_audit_columns_transform, + deduplicate_transform, + get_default_config, + get_incremental_column_value, + normalize_column_names_transform, + source_delta, + upsert_scd_type_1, +) + + +@freeze_time("2024-01-01") +def test_end_to_end(tmp_path): + + source_table_path = str(tmp_path / "source_table") + target_table_path = str(tmp_path / "target_table") + + # Create a source table + source_df = pl.DataFrame({ + "ID": [1, 2, 3, 1, 2], + "FirstName": ["Alice", "Bob", "Charlie", "AliceS", "BobB"], + "batch_id": [1, 1, 1, 2, 2] + }) + source_df.write_delta(source_table_path) + + # Get the default config + config = get_default_config() + + # Read the source delta table + source_df = source_delta(source_table_path) + + # Get the incremental column value + incremental_column_value = get_incremental_column_value(target_table_path, "batch_id") + + # Filter the source dataframe to only get the rows with a modified_at greater than the incremental column value + filtered_df = source_df.filter(pl.col("batch_id") > incremental_column_value) + + # Deduplicate the source dataframe + deduped_df = deduplicate_transform(filtered_df, primary_key_columns="ID", 
deduplication_order_columns="batch_id") + + # Normalize the column names + normalized_df = normalize_column_names_transform(deduped_df, config) + + # Add audit columns + audit_df = add_audit_columns_transform(normalized_df, config) + + # Upsert to a target table + upsert_scd_type_1(target_table_path, audit_df, primary_key_columns="ID", config=config) + + # Read the target table + target_df = pl.read_delta(target_table_path) + + # Test that the target table is as expected + datetime_now = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc) + + expected_df = pl.DataFrame( + { + "id": [3, 1, 2], + "first_name": ["Charlie", "AliceS", "BobB"], + "batch_id": [1, 2, 2], + "__created_at": [datetime_now, datetime_now, datetime_now], + "__modified_at": [datetime_now, datetime_now, datetime_now], + "__deleted_at": [None, None, None], + "__valid_from": [datetime_now, datetime_now, datetime_now], + "__valid_to": [None, None, None], + }, + schema_overrides={ + "__created_at": pl.Datetime(time_zone="UTC"), + "__modified_at": pl.Datetime(time_zone="UTC"), + "__deleted_at": pl.Datetime(time_zone="UTC"), + "__valid_to": pl.Datetime(time_zone="UTC"), + "__valid_from": pl.Datetime(time_zone="UTC"), + } + ) + + print(target_df) + + assert_frame_equal(target_df, expected_df, check_row_order=False) diff --git a/tests/etl/test_transforms.py b/tests/etl/transforms/test_transforms.py similarity index 79% rename from tests/etl/test_transforms.py rename to tests/etl/transforms/test_transforms.py index a1e8ae9..d38303a 100644 --- a/tests/etl/test_transforms.py +++ b/tests/etl/transforms/test_transforms.py @@ -1,10 +1,12 @@ +from datetime import datetime, timezone + import polars as pl -from msfabricutils.etl import get_default_config -from msfabricutils.etl.transforms import add_audit_columns_transform -from polars.testing import assert_frame_equal from freezegun import freeze_time -from datetime import datetime, timezone +from polars.testing import assert_frame_equal + +from msfabricutils.etl import get_default_config +from msfabricutils.etl.transforms.transforms import add_audit_columns_transform @freeze_time("2024-12-06 12:00:00") @@ -28,11 +30,12 @@ def test_add_audit_columns(): "__modified_at": [datetime_now, datetime_now, datetime_now], "__deleted_at": [None, None, None], "__valid_from": [datetime_now, datetime_now, datetime_now], - "__valid_to": [datetime_now, datetime_now, datetime_now], + "__valid_to": [None, None, None], }, schema_overrides={ - "__deleted_at": pl.Datetime(time_zone="UTC") + "__deleted_at": pl.Datetime(time_zone="UTC"), + "__valid_to": pl.Datetime(time_zone="UTC") } ) - assert actual_df.columns == expected_df.columns + assert_frame_equal(actual_df, expected_df) diff --git a/tests/helpers/test_escape_object_name.py b/tests/helpers/test_escape_object_name.py index 3b8bc9c..cca79f8 100644 --- a/tests/helpers/test_escape_object_name.py +++ b/tests/helpers/test_escape_object_name.py @@ -1,4 +1,5 @@ -from msfabricutils.helpers.quote_identifier import quote_identifier +from msfabricutils.common.quote_identifier import quote_identifier + def test_quote_character(): assert quote_identifier("my_object") == '"my_object"' diff --git a/tests/helpers/test_merge_helpers.py b/tests/helpers/test_merge_helpers.py index d1bc2dd..e243197 100644 --- a/tests/helpers/test_merge_helpers.py +++ b/tests/helpers/test_merge_helpers.py @@ -1,6 +1,12 @@ -from msfabricutils.helpers.merge_helpers import build_merge_predicate, build_when_matched_update_predicate, build_when_matched_update_columns import re +from 
msfabricutils.etl.helpers.merge_helpers import ( + build_merge_predicate, + build_when_matched_update_columns, + build_when_matched_update_predicate, +) + + def test_build_merge_predicate(): columns = ["column1", "column2"] expected_output = """ diff --git a/tests/helpers/test_separator_indices.py b/tests/helpers/test_separator_indices.py index c3c59a0..8d94aa4 100644 --- a/tests/helpers/test_separator_indices.py +++ b/tests/helpers/test_separator_indices.py @@ -1,4 +1,4 @@ -from msfabricutils.helpers import _separator_indices +from msfabricutils.common import _separator_indices def test_seperator_indices_simple_string(): diff --git a/tests/helpers/test_string_normalization.py b/tests/helpers/test_string_normalization.py index 676a555..dee79cf 100644 --- a/tests/helpers/test_string_normalization.py +++ b/tests/helpers/test_string_normalization.py @@ -1,6 +1,7 @@ -from msfabricutils.helpers.string_normalization import to_snake_case, character_translation +from msfabricutils.common.string_normalization import character_translation, to_snake_case from msfabricutils.etl import get_default_config + def test_to_snake_case(): assert to_snake_case("CustomerID") == "customer_id" assert to_snake_case("IDNumber") == "id_number" @@ -15,6 +16,7 @@ def test_to_snake_case(): assert to_snake_case("__modified_at") == "__modified_at" assert to_snake_case("__valid_from") == "__valid_from" assert to_snake_case("__valid_to") == "__valid_to" + assert to_snake_case("this-contains_ ALLKinds OfWord_Boundaries") == "this_contains_all_kinds_of_word_boundaries" def test_special_character_translation(): assert character_translation("Profit&Loss", {"&": "_and_"}) == "Profit_and_Loss" From 124e67f584bf4486a2510c042f939c86d090b4d0 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:36:52 +0100 Subject: [PATCH 14/16] ruff format --- docs/gen_ref_pages.py | 5 +- .../common/string_normalization.py | 4 +- src/msfabricutils/core/auth.py | 51 +++++++++-------- src/msfabricutils/core/generic.py | 10 ++-- src/msfabricutils/core/lakehouse.py | 12 ++-- src/msfabricutils/core/workspace.py | 10 ++-- src/msfabricutils/etl/config.py | 55 ++++++++++++++----- .../helpers/get_incremental_column_value.py | 9 ++- .../etl/helpers/merge_helpers.py | 10 ++-- src/msfabricutils/etl/sinks/__init__.py | 4 +- src/msfabricutils/etl/sinks/delta_table.py | 30 ++++++---- src/msfabricutils/etl/sources/delta_table.py | 13 +---- src/msfabricutils/etl/sources/parquet.py | 9 +-- .../etl/transforms/transforms.py | 37 +++++++------ .../test_quote_identifier.py} | 3 +- .../test_separator_indices.py | 0 .../test_string_normalization.py | 12 +++- tests/{ => etl}/helpers/test_merge_helpers.py | 14 +++-- tests/etl/test_end_to_end.py | 23 ++++---- tests/etl/transforms/test_transforms.py | 13 ++--- 20 files changed, 178 insertions(+), 146 deletions(-) rename tests/{helpers/test_escape_object_name.py => common/test_quote_identifier.py} (89%) rename tests/{helpers => common}/test_separator_indices.py (100%) rename tests/{helpers => common}/test_string_normalization.py (88%) rename tests/{ => etl}/helpers/test_merge_helpers.py (89%) diff --git a/docs/gen_ref_pages.py b/docs/gen_ref_pages.py index 755ecc9..8d720bf 100644 --- a/docs/gen_ref_pages.py +++ b/docs/gen_ref_pages.py @@ -1,6 +1,7 @@ """Generate the code reference pages.""" from pathlib import Path + import mkdocs_gen_files nav = mkdocs_gen_files.Nav() @@ -26,11 +27,11 @@ with mkdocs_gen_files.open(full_doc_path, "w") as fd: identifier = ".".join(parts) - #print(f"# {identifier}", file=fd) + # print(f"# 
{identifier}", file=fd) print("::: " + identifier, file=fd) mkdocs_gen_files.set_edit_path(full_doc_path, path) # Generate navigation summary with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file: - nav_file.writelines(nav.build_literate_nav()) \ No newline at end of file + nav_file.writelines(nav.build_literate_nav()) diff --git a/src/msfabricutils/common/string_normalization.py b/src/msfabricutils/common/string_normalization.py index 1f8ed52..2c11e11 100644 --- a/src/msfabricutils/common/string_normalization.py +++ b/src/msfabricutils/common/string_normalization.py @@ -3,7 +3,7 @@ def to_snake_case(text: str) -> str: """Convert a string to snake case. - + Args: text (str): The string to convert to snake case. Can be converted from PascalCase, camelCase, kebab-case, or mixed case. Non-alphanumeric characters are converted to underscores. @@ -26,7 +26,7 @@ def to_snake_case(text: str) -> str: def character_translation(text: str, translation_map: dict[str, str]) -> str: """Translate characters in a string using a translation map. - + Args: text (str): The string to translate. translation_map (dict[str, str]): A dictionary mapping characters to their replacements. diff --git a/src/msfabricutils/core/auth.py b/src/msfabricutils/core/auth.py index 14586bb..8c8e349 100644 --- a/src/msfabricutils/core/auth.py +++ b/src/msfabricutils/core/auth.py @@ -4,35 +4,37 @@ def get_access_token(audience: str) -> str: """ Retrieves an access token for a given audience. - - This function attempts to obtain an access token for a given audience. - It first checks if the code is running in a Microsoft Fabric notebook environment - and attempts to use the `notebookutils` library to get the token. If the library - is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK + + This function attempts to obtain an access token for a given audience. + It first checks if the code is running in a Microsoft Fabric notebook environment + and attempts to use the `notebookutils` library to get the token. If the library + is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. """ try: import notebookutils # type: ignore + token = notebookutils.credentials.getToken(audience) except ModuleNotFoundError: token = DefaultAzureCredential().get_token(f"{audience}/.default").token return token + def get_onelake_access_token() -> str: """ Retrieves an access token for OneLake storage. - This function attempts to obtain an access token for accessing Azure storage. - It first checks if the code is running in a Microsoft Fabric notebook environment - and attempts to use the `notebookutils` library to get the token. If the library - is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK + This function attempts to obtain an access token for accessing Azure storage. + It first checks if the code is running in a Microsoft Fabric notebook environment + and attempts to use the `notebookutils` library to get the token. If the library + is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. Returns: The access token used for authenticating requests to Azure OneLake storage. - """ + """ audience = "https://storage.azure.com" return get_access_token(audience) @@ -41,15 +43,15 @@ def get_fabric_bearer_token() -> str: """ Retrieves a bearer token for Fabric (Power BI) API. 
- This function attempts to obtain a bearer token for authenticating requests to the - Power BI API. It first checks if the code is running in a Microsoft Fabric - notebook environment and tries to use the `notebookutils` library to get the token. - If the library is not available, it falls back to using the `DefaultAzureCredential` + This function attempts to obtain a bearer token for authenticating requests to the + Power BI API. It first checks if the code is running in a Microsoft Fabric + notebook environment and tries to use the `notebookutils` library to get the token. + If the library is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. Returns: The bearer token used for authenticating requests to the Fabric (Power BI) API. - """ + """ audience = "https://analysis.windows.net/powerbi/api" return get_access_token(audience) @@ -58,15 +60,15 @@ def get_azure_devops_access_token() -> str: """ Retrieves a bearer token for Fabric (Power BI) API. - This function attempts to obtain a bearer token for authenticating requests to the - Power BI API. It first checks if the code is running in a Microsoft Fabric - notebook environment and tries to use the `notebookutils` library to get the token. - If the library is not available, it falls back to using the `DefaultAzureCredential` + This function attempts to obtain a bearer token for authenticating requests to the + Power BI API. It first checks if the code is running in a Microsoft Fabric + notebook environment and tries to use the `notebookutils` library to get the token. + If the library is not available, it falls back to using the `DefaultAzureCredential` from the Azure SDK to fetch the token. Returns: The bearer token used for authenticating requests to the Azure Fabric (Power BI) API. - """ + """ audience = "499b84ac-1321-427f-aa17-267ca6975798" return get_access_token(audience) @@ -75,8 +77,8 @@ def get_storage_options() -> dict[str, str]: """ Retrieves storage options including a bearer token for OneLake storage. - This function calls `get_onelake_access_token` to obtain a bearer token - and returns a dictionary containing the token and a flag indicating + This function calls `get_onelake_access_token` to obtain a bearer token + and returns a dictionary containing the token and a flag indicating whether to use the Fabric endpoint. Returns: @@ -92,7 +94,4 @@ def get_storage_options() -> dict[str, str]: {'bearer_token': 'your_token_here', 'use_fabric_endpoint': 'true'} ``` """ - return { - "bearer_token": get_onelake_access_token(), - "use_fabric_endpoint": "true" - } \ No newline at end of file + return {"bearer_token": get_onelake_access_token(), "use_fabric_endpoint": "true"} diff --git a/src/msfabricutils/core/generic.py b/src/msfabricutils/core/generic.py index 7651118..1f5abe4 100644 --- a/src/msfabricutils/core/generic.py +++ b/src/msfabricutils/core/generic.py @@ -7,9 +7,9 @@ def get_paginated(endpoint: str, data_key: str) -> list[dict]: """ Retrieves paginated data from the specified API endpoint. - This function makes repeated GET requests to the specified endpoint of the - Fabric REST API, handling pagination automatically. It uses a bearer token - for authentication and retrieves data from each page, appending the results + This function makes repeated GET requests to the specified endpoint of the + Fabric REST API, handling pagination automatically. It uses a bearer token + for authentication and retrieves data from each page, appending the results to a list. 
Pagination continues until no `continuationToken` is returned. Args: @@ -48,8 +48,8 @@ def get_page(endpoint: str) -> list[dict]: """ Retrieves data from a specified API endpoint. - This function makes a GET request to the specified endpoint of the Azure Fabric API, - using a bearer token for authentication. It returns the JSON response as a list of + This function makes a GET request to the specified endpoint of the Azure Fabric API, + using a bearer token for authentication. It returns the JSON response as a list of dictionaries containing the data returned by the API. Args: diff --git a/src/msfabricutils/core/lakehouse.py b/src/msfabricutils/core/lakehouse.py index d0a6704..f1e19d8 100644 --- a/src/msfabricutils/core/lakehouse.py +++ b/src/msfabricutils/core/lakehouse.py @@ -5,8 +5,8 @@ def get_workspace_lakehouses(workspace_id: str) -> list[dict]: """ Retrieves lakehouses for a specified workspace. - This function fetches a list of lakehouses from a specified workspace using the - `get_paginated` function. It constructs the appropriate endpoint and retrieves + This function fetches a list of lakehouses from a specified workspace using the + `get_paginated` function. It constructs the appropriate endpoint and retrieves paginated data associated with the workspace ID. Args: @@ -21,7 +21,7 @@ def get_workspace_lakehouses(workspace_id: str) -> list[dict]: lakehouses = get_workspace_lakehouses("12345678-1234-1234-1234-123456789012") ``` - """ + """ endpoint = f"workspaces/{workspace_id}/lakehouses" data_key = "value" @@ -32,8 +32,8 @@ def get_workspace_lakehouse_tables(workspace_id: str, lakehouse_id: str) -> list """ Retrieves tables for a specified lakehouse within a workspace. - This function fetches a list of tables from a specific lakehouse within a given workspace - using the `get_paginated` function. It constructs the appropriate endpoint and retrieves + This function fetches a list of tables from a specific lakehouse within a given workspace + using the `get_paginated` function. It constructs the appropriate endpoint and retrieves paginated data associated with the workspace and lakehouse IDs. Args: @@ -52,7 +52,7 @@ def get_workspace_lakehouse_tables(workspace_id: str, lakehouse_id: str) -> list "beefbeef-beef-beef-beef-beefbeefbeef" ) ``` - """ + """ endpoint = f"workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables" data_key = "data" diff --git a/src/msfabricutils/core/workspace.py b/src/msfabricutils/core/workspace.py index 39c0ca7..fbeb680 100644 --- a/src/msfabricutils/core/workspace.py +++ b/src/msfabricutils/core/workspace.py @@ -7,8 +7,8 @@ def get_workspaces() -> list[dict[str, Any]]: """ Retrieves a list of workspaces. - This function fetches a list of workspaces using the `get_paginated` function. - It constructs the appropriate endpoint and retrieves the paginated data associated + This function fetches a list of workspaces using the `get_paginated` function. + It constructs the appropriate endpoint and retrieves the paginated data associated with workspaces. Returns: @@ -20,7 +20,7 @@ def get_workspaces() -> list[dict[str, Any]]: workspaces = get_workspaces() ``` - """ + """ endpoint = "workspaces" data_key = "value" @@ -31,7 +31,7 @@ def get_workspace(workspace_id: str) -> dict[str, Any]: """ Retrieves details of a specified workspace. - This function fetches the details of a specific workspace by using the `get_page` + This function fetches the details of a specific workspace by using the `get_page` function. 
It constructs the appropriate endpoint based on the provided workspace ID. Args: @@ -46,7 +46,7 @@ def get_workspace(workspace_id: str) -> dict[str, Any]: workspace = get_workspace("12345678-1234-1234-1234-123456789012") ``` - """ + """ endpoint = f"workspaces/{workspace_id}" return get_page(endpoint) diff --git a/src/msfabricutils/etl/config.py b/src/msfabricutils/etl/config.py index a300619..59f9101 100644 --- a/src/msfabricutils/etl/config.py +++ b/src/msfabricutils/etl/config.py @@ -15,15 +15,17 @@ class IncrementalColumn: Attributes: name (str): The name of the incremental column. data_type (pl.DataType): The data type of the incremental column. - + Example: ```python incremental_column = IncrementalColumn("batch_id", pl.Int64) ``` """ + name: str data_type: pl.DataType + @dataclass class AuditColumn: """ @@ -32,7 +34,7 @@ class AuditColumn: Attributes: name (str): The name of the audit column. default_value (pl.Expr): The default value expression for the audit column. - + Example: ```python audit_column = AuditColumn( @@ -41,9 +43,11 @@ class AuditColumn: ) ``` """ + name: str default_value: pl.Expr + @dataclass() class Config: """ @@ -59,6 +63,7 @@ class Config: character_translation_map (dict[str, str]): A mapping of special characters to their translations. normalization_strategy (Callable[[str], str]): A function that takes a column name and returns the normalized name. """ + incremental_column: IncrementalColumn column_created_at: AuditColumn column_modified_at: AuditColumn @@ -71,11 +76,21 @@ class Config: def __init__(self): # TODO: Change to `__run_id` self.incremental_column = IncrementalColumn("batch_id", pl.Int64) - self.column_created_at = AuditColumn("__created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))) - self.column_modified_at = AuditColumn("__modified_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))) - self.column_deleted_at = AuditColumn("__deleted_at", pl.lit(None).cast(pl.Datetime("us", "UTC"))) - self.column_valid_from = AuditColumn("__valid_from", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))) - self.column_valid_to = AuditColumn("__valid_to", pl.lit(None).cast(pl.Datetime("us", "UTC"))) + self.column_created_at = AuditColumn( + "__created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")) + ) + self.column_modified_at = AuditColumn( + "__modified_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")) + ) + self.column_deleted_at = AuditColumn( + "__deleted_at", pl.lit(None).cast(pl.Datetime("us", "UTC")) + ) + self.column_valid_from = AuditColumn( + "__valid_from", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")) + ) + self.column_valid_to = AuditColumn( + "__valid_to", pl.lit(None).cast(pl.Datetime("us", "UTC")) + ) self.character_translation_map = { " ": "_", "-": "_", @@ -107,7 +122,9 @@ def __init__(self): "$": "_dollar", "~": "_approximate", } - self.normalization_strategy = lambda name: to_snake_case(character_translation(name, self.character_translation_map)) + self.normalization_strategy = lambda name: to_snake_case( + character_translation(name, self.character_translation_map) + ) def get_static_audit_columns(self) -> list[AuditColumn]: """ @@ -115,7 +132,7 @@ def get_static_audit_columns(self) -> list[AuditColumn]: Returns: A list containing the static audit columns. 
- + Example: ```python static_columns = config.get_static_audit_columns() @@ -125,14 +142,14 @@ def get_static_audit_columns(self) -> list[AuditColumn]: self.column_created_at, self.column_valid_from, ] - + def get_dynamic_audit_columns(self) -> list[AuditColumn]: """ Returns a list of dynamic audit columns, namely the `modified_at` and `valid_to` columns. Returns: A list containing the dynamic audit columns. - + Example: ```python dynamic_columns = config.get_dynamic_audit_columns() @@ -164,7 +181,15 @@ def get_audit_columns(self) -> list[AuditColumn]: self.column_valid_to, ] -def create_config(incremental_column: IncrementalColumn, created_at: AuditColumn, modified_at: AuditColumn, deleted_at: AuditColumn, valid_from: AuditColumn, valid_to: AuditColumn) -> Config: + +def create_config( + incremental_column: IncrementalColumn, + created_at: AuditColumn, + modified_at: AuditColumn, + deleted_at: AuditColumn, + valid_from: AuditColumn, + valid_to: AuditColumn, +) -> Config: """ Creates a new Config instance with the provided audit and incremental columns. @@ -178,7 +203,7 @@ def create_config(incremental_column: IncrementalColumn, created_at: AuditColumn Returns: A new instance of the Config class. - + Example: ```python incremental_column = IncrementalColumn("batch_id", pl.Int64) @@ -210,10 +235,10 @@ def get_default_config() -> Config: Returns: A default instance of the Config class. - + Example: ```python default_config = get_default_config() ``` """ - return Config() \ No newline at end of file + return Config() diff --git a/src/msfabricutils/etl/helpers/get_incremental_column_value.py b/src/msfabricutils/etl/helpers/get_incremental_column_value.py index 0f5bc90..603b6ae 100644 --- a/src/msfabricutils/etl/helpers/get_incremental_column_value.py +++ b/src/msfabricutils/etl/helpers/get_incremental_column_value.py @@ -28,5 +28,10 @@ def get_incremental_column_value(table_uri: str, incremental_column: str) -> int if not DeltaTable.is_deltatable(table_uri, storage_options=storage_options): return 0 - return pl.scan_delta(table_uri, storage_options=storage_options).select(pl.col(incremental_column)).max().collect().item() - + return ( + pl.scan_delta(table_uri, storage_options=storage_options) + .select(pl.col(incremental_column)) + .max() + .collect() + .item() + ) diff --git a/src/msfabricutils/etl/helpers/merge_helpers.py b/src/msfabricutils/etl/helpers/merge_helpers.py index a09f769..9d7beef 100644 --- a/src/msfabricutils/etl/helpers/merge_helpers.py +++ b/src/msfabricutils/etl/helpers/merge_helpers.py @@ -5,7 +5,7 @@ def build_merge_predicate(columns: list[str]) -> str: """ Constructs a SQL merge predicate based on the provided column names. - This function generates a string that represents the condition for merging + This function generates a string that represents the condition for merging records based on equality of the specified columns. Args: @@ -36,7 +36,7 @@ def build_when_matched_update_predicate(columns: list[str]) -> str: """ Constructs a SQL predicate for when matched update conditions. - This function generates a string that represents the conditions for updating + This function generates a string that represents the conditions for updating records when a match is found based on the specified columns. 
Args: @@ -52,7 +52,7 @@ def build_when_matched_update_predicate(columns: list[str]) -> str: \"\"\" ( (target."id" != source."id") - OR (target."id" IS NULL AND source."id" IS NOT NULL) + OR (target."id" IS NULL AND source."id" IS NOT NULL) OR (target."id" IS NOT NULL AND source."id" IS NULL) ) OR ... \"\"\" @@ -75,7 +75,7 @@ def build_when_matched_update_columns(columns: list[str]) -> dict[str, str]: """ Constructs a mapping of columns to be updated when a match is found. - This function generates a dictionary where the keys are the target column + This function generates a dictionary where the keys are the target column names and the values are the corresponding source column names. Args: @@ -95,6 +95,6 @@ def build_when_matched_update_columns(columns: list[str]) -> dict[str, str]: ``` """ return { - f"target.{quote_identifier(column)}": f"source.{quote_identifier(column)}" + f"target.{quote_identifier(column)}": f"source.{quote_identifier(column)}" for column in columns } diff --git a/src/msfabricutils/etl/sinks/__init__.py b/src/msfabricutils/etl/sinks/__init__.py index 0b9a247..6b716ed 100644 --- a/src/msfabricutils/etl/sinks/__init__.py +++ b/src/msfabricutils/etl/sinks/__init__.py @@ -1,5 +1,3 @@ from .delta_table import upsert_scd_type_1 -__all__ = ( - "upsert_scd_type_1", -) +__all__ = ("upsert_scd_type_1",) diff --git a/src/msfabricutils/etl/sinks/delta_table.py b/src/msfabricutils/etl/sinks/delta_table.py index 546f2be..a93256f 100644 --- a/src/msfabricutils/etl/sinks/delta_table.py +++ b/src/msfabricutils/etl/sinks/delta_table.py @@ -10,7 +10,13 @@ from msfabricutils.etl.types import PolarsFrame -def upsert_scd_type_1(table_uri: str, df: PolarsFrame, primary_key_columns: str | list[str], config: Config | None = None, exclude_columns: str | list[str] | None = None) -> dict[str: str]: +def upsert_scd_type_1( + table_uri: str, + df: PolarsFrame, + primary_key_columns: str | list[str], + config: Config | None = None, + exclude_columns: str | list[str] | None = None, +) -> dict[str:str]: """ Upserts dataframe into a Delta table using Slowly Changing Dimension (SCD) Type 1. 
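A minimal sketch of what the merge helpers above feed into this SCD Type 1 upsert, using only the functions added earlier in this series (illustrative only; the generated fragments carry the surrounding whitespace of their f-string templates, so they are trimmed here for readability):

```python
# Sketch: inspect the SQL fragments the SCD Type 1 upsert is built from.
from msfabricutils.etl.helpers.merge_helpers import (
    build_merge_predicate,
    build_when_matched_update_columns,
    build_when_matched_update_predicate,
)

# Rows are matched on equality of the primary key column(s).
print(build_merge_predicate(["id"]).strip())
# (target."id" = source."id")

# A matched row is only rewritten when a tracked column actually changed,
# counting NULL -> value and value -> NULL as changes.
print(build_when_matched_update_predicate(["name"]).strip())

# SCD Type 1 keeps no history: matched rows are simply overwritten.
print(build_when_matched_update_columns(["name"]))
# {'target."name"': 'source."name"'}
```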
@@ -52,10 +58,9 @@ def upsert_scd_type_1(table_uri: str, df: PolarsFrame, primary_key_columns: str primary_key_columns = [primary_key_columns] primary_key_columns = [config.normalization_strategy(column) for column in primary_key_columns] - if isinstance(exclude_columns, str): exclude_columns = [exclude_columns] - + if isinstance(df, pl.LazyFrame): df = df.collect() @@ -69,13 +74,16 @@ def upsert_scd_type_1(table_uri: str, df: PolarsFrame, primary_key_columns: str merge_predicate = build_merge_predicate(primary_key_columns) predicate_update_columns = [ - column for column in df.column_names - if column not in primary_key_columns + exclude_columns + static_audit_columns + dynamic_audit_columns + column + for column in df.column_names + if column + not in primary_key_columns + exclude_columns + static_audit_columns + dynamic_audit_columns ] when_matched_update_predicates = build_when_matched_update_predicate(predicate_update_columns) update_columns = [ - column for column in df.column_names + column + for column in df.column_names if column not in primary_key_columns + exclude_columns + static_audit_columns ] @@ -83,16 +91,14 @@ def upsert_scd_type_1(table_uri: str, df: PolarsFrame, primary_key_columns: str table_merger = ( dt.merge( df, - source_alias = "source", - target_alias = "target", - predicate = merge_predicate, + source_alias="source", + target_alias="target", + predicate=merge_predicate, ) .when_matched_update( - predicate = when_matched_update_predicates, - updates = when_matched_update_columns + predicate=when_matched_update_predicates, updates=when_matched_update_columns ) .when_not_matched_insert_all() ) return table_merger.execute() - diff --git a/src/msfabricutils/etl/sources/delta_table.py b/src/msfabricutils/etl/sources/delta_table.py index dc1a991..6128578 100644 --- a/src/msfabricutils/etl/sources/delta_table.py +++ b/src/msfabricutils/etl/sources/delta_table.py @@ -18,7 +18,7 @@ def source_delta(table_uri: str, eager: bool = False) -> PolarsFrame: Example: ```python from msfabricutils.etl import source_delta - + workspace_id = "12345678-1234-1234-1234-123456789012" lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef" table_name = "my-delta-table" @@ -32,13 +32,6 @@ def source_delta(table_uri: str, eager: bool = False) -> PolarsFrame: storage_options = get_storage_options() if table_uri.startswith("abfss://") else None if eager: - return pl.read_delta( - source=table_uri, - storage_options=storage_options - ) - - return pl.scan_delta( - source=table_uri, - storage_options=storage_options - ) + return pl.read_delta(source=table_uri, storage_options=storage_options) + return pl.scan_delta(source=table_uri, storage_options=storage_options) diff --git a/src/msfabricutils/etl/sources/parquet.py b/src/msfabricutils/etl/sources/parquet.py index d327d08..b1a8fed 100644 --- a/src/msfabricutils/etl/sources/parquet.py +++ b/src/msfabricutils/etl/sources/parquet.py @@ -45,14 +45,9 @@ def source_parquet(table_uri: str, eager: bool = False) -> PolarsFrame: if eager: return pl.read_parquet( - source=table_uri, - hive_partitioning=True, - storage_options=storage_options + source=table_uri, hive_partitioning=True, storage_options=storage_options ) return pl.scan_parquet( - source=table_uri, - hive_partitioning=True, - storage_options=storage_options + source=table_uri, hive_partitioning=True, storage_options=storage_options ) - diff --git a/src/msfabricutils/etl/transforms/transforms.py b/src/msfabricutils/etl/transforms/transforms.py index 2179dd0..3d8ceb9 100644 --- 
a/src/msfabricutils/etl/transforms/transforms.py +++ b/src/msfabricutils/etl/transforms/transforms.py @@ -1,4 +1,3 @@ - import polars as pl from polars.exceptions import ColumnNotFoundError @@ -29,19 +28,21 @@ def add_audit_columns_transform(df: PolarsFrame, config: Config) -> PolarsFrame: ``` """ - + audit_columns = config.get_audit_columns() df = df.with_columns( - [ - audit_column.default_value.alias(audit_column.name) - for audit_column in audit_columns - ] + [audit_column.default_value.alias(audit_column.name) for audit_column in audit_columns] ) return df -def deduplicate_transform(df: PolarsFrame, primary_key_columns: str | list[str] | None = None, deduplication_order_columns: str | list[str] | None = None, deduplication_order_descending: bool | list[bool] = True) -> PolarsFrame: +def deduplicate_transform( + df: PolarsFrame, + primary_key_columns: str | list[str] | None = None, + deduplication_order_columns: str | list[str] | None = None, + deduplication_order_descending: bool | list[bool] = True, +) -> PolarsFrame: """ Removes duplicate rows from the DataFrame based on primary key columns. @@ -65,10 +66,10 @@ def deduplicate_transform(df: PolarsFrame, primary_key_columns: str | list[str] deduped_df = deduplicate_transform(df, primary_key_columns=["id"]) ``` """ - + if isinstance(primary_key_columns, str): primary_key_columns = [primary_key_columns] - + # Temporary fix start # See GitHub issue: https://github.com/pola-rs/polars/issues/20209 # TODO: Remove this once the issue is fixed. @@ -82,12 +83,16 @@ def deduplicate_transform(df: PolarsFrame, primary_key_columns: str | list[str] if primary_key_columns: for column in primary_key_columns: if column not in columns: - raise ColumnNotFoundError(f"unable to find column `{column}`. Valid columns: {columns}") - + raise ColumnNotFoundError( + f"unable to find column `{column}`. 
Valid columns: {columns}" + ) + # Temporary fix end if deduplication_order_columns: - df = df.sort(deduplication_order_columns, descending=deduplication_order_descending, nulls_last=True) + df = df.sort( + deduplication_order_columns, descending=deduplication_order_descending, nulls_last=True + ) df = df.unique(subset=primary_key_columns, keep="first") @@ -122,14 +127,12 @@ def normalize_column_names_transform(df: PolarsFrame, config: Config) -> PolarsF else: columns = df.schema.names() - column_mapping = { - old_name: config.normalization_strategy(old_name) - for old_name in columns - } + column_mapping = {old_name: config.normalization_strategy(old_name) for old_name in columns} df = df.rename(column_mapping) return df + # def filter_source(df: PolarsFrame, filter: Callable[[PolarsFrame], PolarsFrame]) -> PolarsFrame: -# return filter(df) \ No newline at end of file +# return filter(df) diff --git a/tests/helpers/test_escape_object_name.py b/tests/common/test_quote_identifier.py similarity index 89% rename from tests/helpers/test_escape_object_name.py rename to tests/common/test_quote_identifier.py index cca79f8..87670e1 100644 --- a/tests/helpers/test_escape_object_name.py +++ b/tests/common/test_quote_identifier.py @@ -6,5 +6,4 @@ def test_quote_character(): assert quote_identifier("my_object", "'") == "'my_object'" assert quote_identifier('"my_object"') == '"my_object"' assert quote_identifier("'''my_object'''", "'") == "'my_object'" - assert quote_identifier("") == '""' - + assert quote_identifier("") == '""' diff --git a/tests/helpers/test_separator_indices.py b/tests/common/test_separator_indices.py similarity index 100% rename from tests/helpers/test_separator_indices.py rename to tests/common/test_separator_indices.py diff --git a/tests/helpers/test_string_normalization.py b/tests/common/test_string_normalization.py similarity index 88% rename from tests/helpers/test_string_normalization.py rename to tests/common/test_string_normalization.py index dee79cf..72f86a7 100644 --- a/tests/helpers/test_string_normalization.py +++ b/tests/common/test_string_normalization.py @@ -16,7 +16,11 @@ def test_to_snake_case(): assert to_snake_case("__modified_at") == "__modified_at" assert to_snake_case("__valid_from") == "__valid_from" assert to_snake_case("__valid_to") == "__valid_to" - assert to_snake_case("this-contains_ ALLKinds OfWord_Boundaries") == "this_contains_all_kinds_of_word_boundaries" + assert ( + to_snake_case("this-contains_ ALLKinds OfWord_Boundaries") + == "this_contains_all_kinds_of_word_boundaries" + ) + def test_special_character_translation(): assert character_translation("Profit&Loss", {"&": "_and_"}) == "Profit_and_Loss" @@ -28,7 +32,6 @@ def test_special_character_translation(): def test_default_normalization_strategy(): - config = get_default_config() def combined_normalization(text: str) -> str: @@ -41,4 +44,7 @@ def combined_normalization(text: str) -> str: assert combined_normalization("Profit\\Loss") == "profit_or_loss" assert combined_normalization("Profit(Loss") == "profit_loss" assert combined_normalization("Profit)Loss") == "profit_loss" - assert combined_normalization("Growth% and Loss + EBIDTA") == "growth_percent_and_loss_plus_ebidta" \ No newline at end of file + assert ( + combined_normalization("Growth% & Loss + EBIDTA") + == "growth_percent_and_loss_plus_ebidta" + ) diff --git a/tests/helpers/test_merge_helpers.py b/tests/etl/helpers/test_merge_helpers.py similarity index 89% rename from tests/helpers/test_merge_helpers.py rename to 
tests/etl/helpers/test_merge_helpers.py index e243197..10f58b6 100644 --- a/tests/helpers/test_merge_helpers.py +++ b/tests/etl/helpers/test_merge_helpers.py @@ -16,10 +16,12 @@ def test_build_merge_predicate(): """ actual_output = build_merge_predicate(columns) - assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub(r"\s+", " ", expected_output.strip()) + assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub( + r"\s+", " ", expected_output.strip() + ) -def test_build_when_matched_update_predicate(): +def test_build_when_matched_update_predicate(): column_names = ["column1", "column2", "column3"] expected_output = """ ( @@ -42,7 +44,9 @@ def test_build_when_matched_update_predicate(): """ actual_output = build_when_matched_update_predicate(column_names) - assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub(r"\s+", " ", expected_output.strip()) + assert re.sub(r"\s+", " ", actual_output.strip()) == re.sub( + r"\s+", " ", expected_output.strip() + ) def test_build_when_matched_update_columns(): @@ -50,8 +54,8 @@ def test_build_when_matched_update_columns(): expected_output = { 'target."column1"': 'source."column1"', 'target."column2"': 'source."column2"', - 'target."column3"': 'source."column3"' + 'target."column3"': 'source."column3"', } actual_output = build_when_matched_update_columns(column_names) - assert actual_output == expected_output \ No newline at end of file + assert actual_output == expected_output diff --git a/tests/etl/test_end_to_end.py b/tests/etl/test_end_to_end.py index 6fd5fcf..9792c42 100644 --- a/tests/etl/test_end_to_end.py +++ b/tests/etl/test_end_to_end.py @@ -17,16 +17,17 @@ @freeze_time("2024-01-01") def test_end_to_end(tmp_path): - source_table_path = str(tmp_path / "source_table") target_table_path = str(tmp_path / "target_table") # Create a source table - source_df = pl.DataFrame({ - "ID": [1, 2, 3, 1, 2], - "FirstName": ["Alice", "Bob", "Charlie", "AliceS", "BobB"], - "batch_id": [1, 1, 1, 2, 2] - }) + source_df = pl.DataFrame( + { + "ID": [1, 2, 3, 1, 2], + "FirstName": ["Alice", "Bob", "Charlie", "AliceS", "BobB"], + "batch_id": [1, 1, 1, 2, 2], + } + ) source_df.write_delta(source_table_path) # Get the default config @@ -42,7 +43,9 @@ def test_end_to_end(tmp_path): filtered_df = source_df.filter(pl.col("batch_id") > incremental_column_value) # Deduplicate the source dataframe - deduped_df = deduplicate_transform(filtered_df, primary_key_columns="ID", deduplication_order_columns="batch_id") + deduped_df = deduplicate_transform( + filtered_df, primary_key_columns="ID", deduplication_order_columns="batch_id" + ) # Normalize the column names normalized_df = normalize_column_names_transform(deduped_df, config) @@ -76,9 +79,9 @@ def test_end_to_end(tmp_path): "__deleted_at": pl.Datetime(time_zone="UTC"), "__valid_to": pl.Datetime(time_zone="UTC"), "__valid_from": pl.Datetime(time_zone="UTC"), - } - ) + }, + ) print(target_df) - + assert_frame_equal(target_df, expected_df, check_row_order=False) diff --git a/tests/etl/transforms/test_transforms.py b/tests/etl/transforms/test_transforms.py index d38303a..aac702d 100644 --- a/tests/etl/transforms/test_transforms.py +++ b/tests/etl/transforms/test_transforms.py @@ -1,4 +1,3 @@ - from datetime import datetime, timezone import polars as pl @@ -12,12 +11,8 @@ @freeze_time("2024-12-06 12:00:00") def test_add_audit_columns(): config = get_default_config() - - df = pl.DataFrame( - { - "data": [1, 2, 3] - } - ) + + df = pl.DataFrame({"data": [1, 2, 3]}) actual_df = add_audit_columns_transform(df, config) 
@@ -34,8 +29,8 @@ def test_add_audit_columns(): }, schema_overrides={ "__deleted_at": pl.Datetime(time_zone="UTC"), - "__valid_to": pl.Datetime(time_zone="UTC") - } + "__valid_to": pl.Datetime(time_zone="UTC"), + }, ) assert_frame_equal(actual_df, expected_df) From 98757786ef29a1aa5c95f89ff4d30559fbdc4f07 Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:39:33 +0100 Subject: [PATCH 15/16] add polars dependency --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index cd8f6f9..efd8f86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,8 @@ dependencies = [ "duckdb>=1.1.3", "deltalake>=0.22.0", "sqlglot>=25.32.1", + "polars-lts-cpu==1.16.0 ; sys_platform == 'darwin'", + "polars==1.16.0 ; sys_platform == 'win32' or sys_platform == 'linux'", ] [project.urls] From f14f1607d05a93d6315836715d63a3a39a48b0fc Mon Sep 17 00:00:00 2001 From: jsj Date: Sun, 8 Dec 2024 16:40:04 +0100 Subject: [PATCH 16/16] ruff format --- src/msfabricutils/core/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msfabricutils/core/__init__.py b/src/msfabricutils/core/__init__.py index 5a17bc9..3cac2ff 100644 --- a/src/msfabricutils/core/__init__.py +++ b/src/msfabricutils/core/__init__.py @@ -8,5 +8,5 @@ "get_workspace_lakehouses", "get_workspace_lakehouse_tables", "get_onelake_access_token", - "get_fabric_bearer_token" + "get_fabric_bearer_token", )
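For illustration only, not part of the patch series: a small smoke-test sketch of the core helpers whose `__all__` tuple the last commit reformats. The workspace ID is a placeholder, and running it assumes valid Azure credentials or a Fabric notebook session.

```python
# Placeholder workspace ID; requires Azure credentials or a Fabric notebook session.
from msfabricutils.core import get_onelake_access_token, get_workspace_lakehouses

token = get_onelake_access_token()
print(f"got OneLake token of length {len(token)}")

lakehouses = get_workspace_lakehouses("12345678-1234-1234-1234-123456789012")
for lakehouse in lakehouses:
    print(lakehouse)  # each entry is a dict describing one lakehouse in the workspace
```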