From 5d935eedbac8199e5fbf4022d291abfba8198608 Mon Sep 17 00:00:00 2001 From: Leah Procopi Date: Fri, 24 Jan 2025 11:29:16 -0800 Subject: [PATCH] Feature: Custom Iceberg `base_location_root` (#1289) * update relation & add tests * switch if/else pattern * add change log * add test for dynamic table with path and subpath * modify base_location config * update to base_location_root, include schema and table name in path * resolve linting error (f-string nested double quotes) * resolve linting error (move to new line) * add unit tests & make final edits to functional tests * lint unit test * update changie * update dt test cases for iceberg to be dynamic --------- Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com> --- .../unreleased/Features-20250113-133414.yaml | 6 ++ dbt/adapters/snowflake/impl.py | 1 + dbt/adapters/snowflake/relation.py | 5 +- tests/functional/iceberg/models.py | 63 +++++++++++++++ tests/functional/iceberg/test_table_basic.py | 12 ++- tests/unit/test_iceberg_location.py | 79 +++++++++++++++++++ 6 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 .changes/unreleased/Features-20250113-133414.yaml create mode 100644 tests/unit/test_iceberg_location.py diff --git a/.changes/unreleased/Features-20250113-133414.yaml b/.changes/unreleased/Features-20250113-133414.yaml new file mode 100644 index 000000000..869ed6b17 --- /dev/null +++ b/.changes/unreleased/Features-20250113-133414.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Added support for custom iceberg base_location_root +time: 2025-01-13T13:34:14.326047-08:00 +custom: + Author: LProcopi15 + Issue: "1284" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 7ccff9f8a..6ae8ef183 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -54,6 +54,7 @@ class SnowflakeConfig(AdapterConfig): # extended formats table_format: Optional[str] = None external_volume: Optional[str] = None + base_location_root: 
Optional[str] = None base_location_subpath: Optional[str] = None diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 54db21924..f3ee3e510 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -204,7 +204,10 @@ def get_ddl_prefix_for_alter(self) -> str: return "" def get_iceberg_ddl_options(self, config: RelationConfig) -> str: - base_location: str = f"_dbt/{self.schema}/{self.name}" + # If the base_location_root config is supplied, overwrite the default value ("_dbt/") + base_location: str = ( + f"{config.get('base_location_root', '_dbt')}/{self.schema}/{self.name}" + ) if subpath := config.get("base_location_subpath"): base_location += f"/{subpath}" diff --git a/tests/functional/iceberg/models.py b/tests/functional/iceberg/models.py index 6433f74bf..e6da6aca4 100644 --- a/tests/functional/iceberg/models.py +++ b/tests/functional/iceberg/models.py @@ -23,6 +23,37 @@ select * from {{ ref('first_table') }} """ +_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH = """ +{{ + config( + transient = "true", + materialized = "table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_root="root_path", + ) +}} + +select * from {{ ref('first_table') }} +""" + +_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH = """ +{{ + config( + transient = "true", + materialized = "table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_root="root_path", + base_location_subpath="subpath", + ) +}} + +select * from {{ ref('first_table') }} +""" + _MODEL_BASIC_DYNAMIC_TABLE_MODEL = """ {{ config( materialized='dynamic_table', @@ -36,6 +67,38 @@ select * from {{ ref('first_table') }} """ +_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH = """ +{{ + config( + transient = "transient", + materialized = "dynamic_table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_root="root_path", + ) +}} + 
+select * from {{ ref('first_table') }} +""" + +_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH = """ +{{ + config( + transient = "true", + materialized = "dynamic_table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_root="root_path", + base_location_subpath='subpath', + ) +}} + +select * from {{ ref('first_table') }} +""" + + _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH = """ {{ config( materialized='dynamic_table', diff --git a/tests/functional/iceberg/test_table_basic.py b/tests/functional/iceberg/test_table_basic.py index e835a5fce..faf4b34f7 100644 --- a/tests/functional/iceberg/test_table_basic.py +++ b/tests/functional/iceberg/test_table_basic.py @@ -7,7 +7,11 @@ from tests.functional.iceberg.models import ( _MODEL_BASIC_TABLE_MODEL, _MODEL_BASIC_ICEBERG_MODEL, + _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH, + _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH, _MODEL_BASIC_DYNAMIC_TABLE_MODEL, + _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH, + _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH, _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH, _MODEL_BUILT_ON_ICEBERG_TABLE, _MODEL_TABLE_BEFORE_SWAP, @@ -26,14 +30,18 @@ def models(self): return { "first_table.sql": _MODEL_BASIC_TABLE_MODEL, "iceberg_table.sql": _MODEL_BASIC_ICEBERG_MODEL, + "iceberg_tableb.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH, + "iceberg_tablec.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH, "table_built_on_iceberg_table.sql": _MODEL_BUILT_ON_ICEBERG_TABLE, "dynamic_table.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL, - "dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH, + "dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH, + "dynamic_tablec.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH, + "dynamic_tabled.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH, } def test_iceberg_tables_build_and_can_be_referred(self, project): run_results = run_dbt() - assert len(run_results) == 5 + assert len(run_results) == 9 
class TestIcebergTableTypeBuildsOnExistingTable: diff --git a/tests/unit/test_iceberg_location.py b/tests/unit/test_iceberg_location.py new file mode 100644 index 000000000..dca82b47e --- /dev/null +++ b/tests/unit/test_iceberg_location.py @@ -0,0 +1,79 @@ +import pytest +from dbt.adapters.snowflake.relation import SnowflakeRelation + + +@pytest.fixture +def iceberg_config() -> dict: + """Fixture providing standard Iceberg configuration.""" + return { + "schema": "my_schema", + "identifier": "my_table", + "external_volume": "s3_iceberg_snow", + "base_location_root": "root_path", + "base_location_subpath": "subpath", + } + + +def get_actual_base_location(config: dict[str, str]) -> str: + """Get the actual base location from the configuration by parsing the DDL predicates.""" + + relation = SnowflakeRelation.create( + schema=config["schema"], + identifier=config["identifier"], + ) + + actual_ddl_predicates = relation.get_iceberg_ddl_options(config).strip() + actual_base_location = actual_ddl_predicates.split("base_location = ")[1] + + return actual_base_location + + +def test_iceberg_path_and_subpath(iceberg_config: dict[str, str]): + """Test when base_location_root and base_location_subpath are provided""" + expected_base_location = ( + f"'{iceberg_config['base_location_root']}/" + f"{iceberg_config['schema']}/" + f"{iceberg_config['identifier']}/" + f"{iceberg_config['base_location_subpath']}'" + ).strip() + + assert get_actual_base_location(iceberg_config) == expected_base_location + + +def test_iceberg_only_subpath(iceberg_config: dict[str, str]): + """Test when only base_location_subpath is provided""" + del iceberg_config["base_location_root"] + + expected_base_location = ( + f"'_dbt/" + f"{iceberg_config['schema']}/" + f"{iceberg_config['identifier']}/" + f"{iceberg_config['base_location_subpath']}'" + ).strip() + + assert get_actual_base_location(iceberg_config) == expected_base_location + + +def test_iceberg_only_path(iceberg_config: dict[str, str]): + 
"""Test when only base_location_root is provided""" + del iceberg_config["base_location_subpath"] + + expected_base_location = ( + f"'{iceberg_config['base_location_root']}/" + f"{iceberg_config['schema']}/" + f"{iceberg_config['identifier']}'" + ).strip() + + assert get_actual_base_location(iceberg_config) == expected_base_location + + +def test_iceberg_no_path(iceberg_config: dict[str, str]): + """Test when neither base_location_root nor base_location_subpath is provided""" + del iceberg_config["base_location_root"] + del iceberg_config["base_location_subpath"] + + expected_base_location = ( + f"'_dbt/" f"{iceberg_config['schema']}/" f"{iceberg_config['identifier']}'" + ).strip() + + assert get_actual_base_location(iceberg_config) == expected_base_location