feat(etl): created starter pack for ETL #23

Merged · 16 commits · Dec 8, 2024
Changes from all commits
5 changes: 4 additions & 1 deletion .gitignore
@@ -132,4 +132,7 @@ dmypy.json
cython_debug/

# Ruff
.ruff_cache

# MacOS
**/.DS_Store
3 changes: 3 additions & 0 deletions docs/common/fabric-duckdb-connection.md
@@ -0,0 +1,3 @@
# Fabric DuckDB Connection

::: msfabricutils.common.fabric_duckdb_connection
5 changes: 5 additions & 0 deletions docs/common/utilities.md
@@ -0,0 +1,5 @@
# Utilities

::: msfabricutils.common.quote_identifier
::: msfabricutils.common.separator_indices
::: msfabricutils.common.string_normalization
3 changes: 3 additions & 0 deletions docs/core/authentication.md
@@ -0,0 +1,3 @@
# Authentication

::: msfabricutils.core.auth
4 changes: 4 additions & 0 deletions docs/core/fabric-api.md
@@ -0,0 +1,4 @@
# Fabric API

::: msfabricutils.core.lakehouse
::: msfabricutils.core.workspace
3 changes: 3 additions & 0 deletions docs/etl/index.md
@@ -0,0 +1,3 @@
# ETL

The `msfabricutils.etl` module provides a set of functions and classes to facilitate ETL (Extract, Transform, Load) operations in the Microsoft Fabric environment.
3 changes: 3 additions & 0 deletions docs/etl/sinks.md
@@ -0,0 +1,3 @@
# Sinks

::: msfabricutils.etl.sinks
4 changes: 4 additions & 0 deletions docs/etl/sources.md
@@ -0,0 +1,4 @@
# Sources

::: msfabricutils.etl.sources

3 changes: 3 additions & 0 deletions docs/etl/transforms.md
@@ -0,0 +1,3 @@
# Transforms

::: msfabricutils.etl.transforms
5 changes: 3 additions & 2 deletions docs/gen_ref_pages.py
@@ -1,6 +1,7 @@
"""Generate the code reference pages."""

from pathlib import Path

import mkdocs_gen_files

nav = mkdocs_gen_files.Nav()
@@ -26,11 +27,11 @@

with mkdocs_gen_files.open(full_doc_path, "w") as fd:
    identifier = ".".join(parts)
    #print(f"# {identifier}", file=fd)
    # print(f"# {identifier}", file=fd)
    print("::: " + identifier, file=fd)

mkdocs_gen_files.set_edit_path(full_doc_path, path)

# Generate navigation summary
with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file:
    nav_file.writelines(nav.build_literate_nav())
62 changes: 20 additions & 42 deletions docs/index.md
@@ -1,55 +1,33 @@
# MSFabricUtils

A collection of **Spark-free** Python utilities for working with Microsoft Fabric in the Python Notebook experience.
A collection of **Spark-free** and **local-first** Python utilities for working with Microsoft Fabric Lakehouses locally or in the Fabric Python Notebook experience.

## Features

### Local development first
- Aim to provide a local development support "within" Fabric
- **ETL Utilities** - Extract, Transform, Load data from and to Microsoft Fabric Lakehouses. The utilities can be configured to fit different needs, but their defaults are opinionated toward what we believe are sensible choices for most use cases
- **Fabric API** - Access Fabric APIs from Python, such as workspaces and lakehouses
- **Local development first** - Aims to provide a local development experience for Microsoft Fabric solutions
- **DuckDB Connection**
    - Seamless integration between DuckDB and Microsoft Fabric Lakehouses
    - Cross-lakehouse and cross-workspace querying
    - Delta Lake writing features

### DuckDB Connection
- Seamless integration between DuckDB and Microsoft Fabric Lakehouses
- Cross-lakehouse and cross-workspace querying
- Delta Lake writing features
## Core dependencies

### Ideas for improvements
- ETL/ELT Helpers
- Lakehouse Management Tools
- Got an idea? Add an issue on [github](https://www.github.com/mrjsj/msfabricutils/issues)
MSFabricUtils is built on top of modern, high-performance Python libraries:

## Installation
- **[delta-rs](https://delta-io.github.io/delta-rs)** - A native Rust implementation of Delta Lake, providing fast and reliable Delta Lake operations without the need for a Spark cluster
- **[Polars](https://pola.rs)** - A lightning-fast DataFrame library written in Rust, offering superior performance for data manipulation tasks
- **[DuckDB](https://duckdb.org)** - An embedded analytical database engine, enabling SQL queries at blazing speed

```bash
pip install msfabricutils
```
These dependencies were chosen specifically to:

## Quick Start
- Provide Spark-like functionality without the overhead of a Spark cluster
- Enable high-performance data processing on a single machine
- Support both local development and cloud deployment scenarios
- Maintain compatibility with Delta Lake format used in Microsoft Fabric
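
As a minimal sketch of how this stack fits together (plain Polars, delta-rs, and DuckDB only, not msfabricutils' own API):

```python
import duckdb
import polars as pl

# Write a Delta table with Polars; delta-rs does the work, no Spark cluster involved
df = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
df.write_delta("./customers")

# Read it back and let DuckDB query the Polars frame directly via replacement scans
customers = pl.read_delta("./customers")
print(duckdb.sql("SELECT name FROM customers WHERE id = 1").pl())
```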

Ensure you are working in a Python Notebook:

![Select Python Notebook](images/select-python-notebooks.png)

```python
from msfabricutils import FabricDuckDBConnection, get_onelake_access_token

#Initialize connection
access_token = get_onelake_access_token()

conn = FabricDuckDBConnection(access_token=access_token)

#Register lakehouses
conn.register_workspace_lakehouses(
    workspace_id = "your-workspace-id",
    lakehouses = ["sales", "marketing"]
)

# Query across lakehouses
df = conn.sql("""
    SELECT
        *
    FROM sales.customers
    JOIN marketing.segments USING (customer_id)
""").df()
```

## Ideas for improvements
Got an idea? Add an issue on [github](https://www.github.com/mrjsj/msfabricutils/issues)!

99 changes: 99 additions & 0 deletions docs/usage/etl.md
@@ -0,0 +1,99 @@
# ETL

The ETL module provides a set of helper functions for extracting, transforming, and loading data in a Fabric Lakehouse.



## Basic extract, transform, load

```python
from msfabricutils.etl import (
    source_parquet,
    upsert_scd_type_1
)
import polars as pl

# Create a source parquet file
df = pl.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"]
})
df.write_parquet("source.parquet")

# Read the source parquet file
source_df = source_parquet("source.parquet")

# Upsert to a target table
upsert_scd_type_1("target_table", source_df)
```



## Advanced example

Consider an example where:

- The source needs to be loaded incrementally
- The source has duplicate rows
- The target table needs audit columns, such as `created_at` and `updated_at`
- The target table should have normalized column names

The following code reads the source delta table, deduplicates the rows, normalizes the column names, adds audit columns, and upserts to the target table.
The result is that the rows for `AliceS` and `BobB` overwrite the earlier `Alice` and `Bob` rows in the target table.

```python
import polars as pl

from msfabricutils.etl import (
    get_default_config,
    source_delta,
    get_incremental_column_value,
    deduplicate_transform,
    normalize_column_names_transform,
    add_audit_columns_transform,
    upsert_scd_type_1,
)

source_table_path = "source_table"
target_table_path = "target_table"

# Create a source table
source_df = pl.DataFrame({
    "ID": [1, 2, 3, 1, 2],
    "FirstName": ["Alice", "Bob", "Charlie", "AliceS", "BobB"],
    "batch_id": [1, 1, 1, 2, 2]
})
source_df.write_delta(source_table_path)

# Get the default config
config = get_default_config()

# Read the source delta table
source_df = source_delta(source_table_path)

# Get the incremental column value
incremental_column_value = get_incremental_column_value(target_table_path, "batch_id")

# Filter the source dataframe to only the rows with a batch_id greater than the incremental column value
filtered_df = source_df.filter(pl.col("batch_id") > incremental_column_value)

# Deduplicate the source dataframe
deduped_df = deduplicate_transform(filtered_df, primary_key_columns="ID", deduplication_order_columns="batch_id")

# Normalize the column names
normalized_df = normalize_column_names_transform(deduped_df, config)

# Add audit columns
audit_df = add_audit_columns_transform(normalized_df, config)

# Upsert to a target table
upsert_scd_type_1(target_table_path, audit_df, primary_key_columns="ID", config=config)
```

| id | first_name | batch_id | __created_at | __modified_at | __deleted_at | __valid_from | __valid_to |
| --- | ---------- | -------- | ----------------- | ----------------- | ----------------- | ----------------------- | ----------------- |
| i64 | str | i64 | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] | datetime[μs, UTC] |
| 3 | Charlie | 1 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null |
| 2 | BobB | 2 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null |
| 1 | AliceS | 2 | 2024-01-01 00:00:00 UTC | 2024-01-01 00:00:00 UTC | null | 2024-01-01 00:00:00 UTC | null |
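
To sanity-check the result locally, the target table can be read back with Polars (a small sketch; the audit column names follow the default config shown above):

```python
import polars as pl

# Inspect the upserted target table, including the __created_at/__modified_at audit columns
result = pl.read_delta(target_table_path)
print(result.sort("id"))
```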

11 changes: 11 additions & 0 deletions docs/usage/fabric-api.md
@@ -0,0 +1,11 @@
# Fabric API

A collection of helper functions for working with the Fabric API.

## List workspaces

```python
from msfabricutils.core import get_workspaces

get_workspaces()
```
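
The return shape of `get_workspaces` is not documented in this PR; assuming it mirrors the Fabric REST API payload (a list of dicts with `id` and `displayName` keys), a lookup could look like this sketch:

```python
from msfabricutils.core import get_workspaces

# Assumption: each workspace is a dict with "id" and "displayName" keys,
# mirroring the Fabric REST API response; adjust if the helper returns another shape
workspaces = get_workspaces()
sales = next((ws for ws in workspaces if ws["displayName"] == "sales"), None)
print(sales["id"] if sales else "workspace not found")
```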
7 changes: 7 additions & 0 deletions docs/usage/installation.md
@@ -0,0 +1,7 @@
# Installation

The `msfabricutils` package can be installed using pip:

```bash
pip install msfabricutils
```
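
This PR also defines optional dependency groups in `pyproject.toml` (`docs` and `dev`), so contributors can install the tooling with pip extras, e.g. `pip install msfabricutils[dev]`.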
45 changes: 33 additions & 12 deletions mkdocs.yml
@@ -18,15 +18,22 @@ theme:
      toggle:
        icon: material/brightness-4
        name: Switch to light mode
  navigation_depth: 3
  features:
    - navigation.instant
    - navigation.tracking
    - navigation.sections
    - navigation.instant
    - navigation.expand
    - navigation.top
    - navigation.tabs
    - navigation.indexes
    - navigation.tabs.sticky
    - navigation.footer
    - content.tabs.link
    - content.code.annotate
    - content.code.copy

plugins:
  - search
  - search:
      lang: en
  - gen-files:
      scripts:
        - docs/gen_ref_pages.py
@@ -36,31 +43,45 @@ plugins:
      python:
        paths: [src]
        options:
          show_root_toc_entry: true
          docstring_style: google
          show_source: true
          show_root_heading: true
          show_source: false
          show_root_heading: false
          filters: ["!^_", "^__init__$"]
          show_object_full_path: false
          heading_level: 1
          members_order: source
          separate_signature: true
          show_signature_annotations: true
          docstring_section_style: table

nav:
  - Home: index.md
  - Home:
      - index.md
  - Usage:
      - usage/installation.md
      - usage/etl.md
      - usage/fabric-api.md
  - API Reference:
      - FabricDuckDBConnection: reference/msfabricutils/fabric_duckdb_connection.md
      - Common:
          - common/fabric-duckdb-connection.md
          - common/utilities.md
      - Core:
          - Authentication: reference/msfabricutils/core/auth.md
          - Utilities:
              - Lakehouse: reference/msfabricutils/core/lakehouse.md
              - Workspace: reference/msfabricutils/core/workspace.md
          - core/authentication.md
          - core/fabric-api.md
      - ETL:
          - etl/index.md
          - etl/sources.md
          - etl/transforms.md
          - etl/sinks.md

# Formatting options
markdown_extensions:
  - admonition
  - pymdownx.details
  - attr_list
  - pymdownx.superfences
  - tables
  - pymdownx.tabbed:
      alternate_style: true
  - pymdownx.snippets:
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -26,6 +26,8 @@ dependencies = [
"duckdb>=1.1.3",
"deltalake>=0.22.0",
"sqlglot>=25.32.1",
"polars-lts-cpu==1.16.0 ; sys_platform == 'darwin'",
"polars==1.16.0 ; sys_platform == 'win32' or sys_platform == 'linux'",
]

[project.urls]
@@ -41,13 +43,16 @@ docs = [
dev = [
    "pytest>=6.2.5",
    "ruff>=0.8.1",
    "freezegun>=1.5.1",
]

[tool.ruff]
line-length = 100

[tool.ruff.lint]
extend-select = ["Q001", "I001"]

[tool.pytest.ini_options]
pythonpath = [
    "src",
]

2 changes: 1 addition & 1 deletion src/msfabricutils/__init__.py
@@ -1,4 +1,4 @@
from msfabricutils.fabric_duckdb_connection import FabricDuckDBConnection
from msfabricutils.common.fabric_duckdb_connection import FabricDuckDBConnection
from msfabricutils.core import (
    get_fabric_bearer_token,
    get_onelake_access_token,
10 changes: 10 additions & 0 deletions src/msfabricutils/common/__init__.py
@@ -0,0 +1,10 @@
from .quote_identifier import quote_identifier
from .separator_indices import _separator_indices
from .string_normalization import character_translation, to_snake_case

__all__ = (
    "_separator_indices",
    "quote_identifier",
    "to_snake_case",
    "character_translation",
)
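
As a rough usage sketch for the new common helpers (behaviour inferred from the function names; the exact signatures live in `msfabricutils.common`):

```python
from msfabricutils.common import quote_identifier, to_snake_case

# Assumed behaviour, inferred from the names in this PR:
# quote_identifier should wrap a SQL identifier in quotes,
# to_snake_case should normalize a column name like the ETL normalize transform
print(quote_identifier("Sales LH"))  # e.g. "Sales LH" with quoting applied
print(to_snake_case("FirstName"))    # e.g. first_name
```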