Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh_mode and initialize as dynamic table options #1081

Merged
merged 19 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7ba40c9
feature: Update Dynamic table parameters on materialization
HenkvanDyk Jan 30, 2024
17eb974
refactor: Update doc strings
HenkvanDyk Jan 31, 2024
c92522e
feature: changelog
HenkvanDyk Jan 31, 2024
addaa63
Merge branch 'main' into feature/ADAP-1076
HenkvanDyk Feb 9, 2024
932a58f
Merge branch 'main' into feature/ADAP-1076
mikealfare Jun 7, 2024
13dc7de
Merge remote-tracking branch 'HenkvanDyk/feature/ADAP-1076' into dyna…
mikealfare Jun 12, 2024
6076e3a
add new attributes to the describe dynamic table query
mikealfare Jun 12, 2024
1c2f222
incorporate new attributes into the changeset
mikealfare Jun 12, 2024
c2905cc
add the new config to the dynamic table fixture
mikealfare Jun 13, 2024
f44ebf6
fix the test runner to inspect the object in the database for assertions
mikealfare Jun 13, 2024
761a991
add new attributes to the create and replace statements
mikealfare Jun 13, 2024
c2bf2b3
add a method to create the config on the relation
mikealfare Jun 13, 2024
08ddd42
remove initialize from describe query
mikealfare Jun 13, 2024
405420f
Merge branch 'main' into dynamic-table-parameters
mikealfare Jun 13, 2024
8606770
update changelog entry
mikealfare Jun 13, 2024
c7781a4
remove comment from relation parsing since it's handled via persist_docs
mikealfare Jun 14, 2024
99ba501
Merge branch 'main' into dynamic-table-parameters
mikealfare Jun 17, 2024
3db741b
add self to changelog
mikealfare Jun 17, 2024
657d498
Merge branch 'main' into dynamic-table-parameters
mikealfare Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240131-125318.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support refresh_mode and initialize parameters for dynamic tables
time: 2024-01-31T12:53:18.111616Z
custom:
Author: HenkvanDyk,mikealfare
Issue: "1076"
28 changes: 27 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
from typing import FrozenSet, Optional, Type

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import RelationConfigChangeAction, RelationResults
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeQuotePolicy,
Expand All @@ -21,6 +27,9 @@ class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
SnowflakeRelationType.DynamicTable: SnowflakeDynamicTableConfig,
}
renameable_relations: FrozenSet[SnowflakeRelationType] = field(
default_factory=lambda: frozenset(
{
Expand Down Expand Up @@ -52,6 +61,17 @@ def DynamicTable(cls) -> str:
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
return SnowflakeRelationType

@classmethod
def from_config(cls, config: RelationConfig) -> RelationConfigBase:
relation_type: str = config.config.materialized

if relation_config := cls.relation_configs.get(relation_type):
return relation_config.from_relation_config(config)

raise DbtRuntimeError(
f"from_config() is not supported for the provided relation type: {relation_type}"
)

@classmethod
def dynamic_table_config_changeset(
cls, relation_results: RelationResults, relation_config: RelationConfig
Expand All @@ -77,6 +97,12 @@ def dynamic_table_config_changeset(
)
)

if new_dynamic_table.refresh_mode != existing_dynamic_table.refresh_mode:
config_change_collection.refresh_mode = SnowflakeDynamicTableRefreshModeConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.refresh_mode,
)

if config_change_collection.has_changes:
return config_change_collection
return None
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
Expand Down
48 changes: 47 additions & 1 deletion dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase

if TYPE_CHECKING:
import agate


class RefreshMode(StrEnum):
AUTO = "AUTO"
FULL = "FULL"
INCREMENTAL = "INCREMENTAL"

@classmethod
def default(cls) -> Self:
return cls("AUTO")


class Initialize(StrEnum):
ON_CREATE = "ON_CREATE"
ON_SCHEDULE = "ON_SCHEDULE"

@classmethod
def default(cls) -> Self:
return cls("ON_CREATE")


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -22,6 +43,8 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
- query: the query behind the table
- target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- snowflake_warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table
- refresh_mode: specifies the refresh type for the dynamic table
- initialize: specifies the behavior of the initial refresh of the dynamic table

There are currently no non-configurable parameters.
"""
Expand All @@ -32,6 +55,8 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
Expand All @@ -44,6 +69,8 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
Expand All @@ -60,6 +87,12 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
config_dict.update(refresh_mode=refresh_mode.upper())

if initialize := relation_config.config.extra.get("initialize"):
config_dict.update(initialize=initialize.upper())

return config_dict

@classmethod
Expand All @@ -73,6 +106,8 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}

return config_dict
Expand All @@ -96,10 +131,20 @@ def requires_full_refresh(self) -> bool:
return False


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableRefreshModeConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return True


@dataclass
class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
Expand All @@ -111,9 +156,10 @@ def requires_full_refresh(self) -> bool:
if self.snowflake_warehouse
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
12 changes: 10 additions & 2 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}

{%- set dynamic_table = relation.from_config(config.model) -%}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the RelationConfig allows us to easily incorporate input validation for refresh_mode and initialize. If we're doing that, we should pull all attribution from this class, hence updating target_lag and snowflake_warehouse.


create dynamic table {{ relation }}
target_lag = '{{ config.get("target_lag") }}'
warehouse = {{ config.get("snowflake_warehouse") }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
as (
{{ sql }}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"database_name",
"text",
"target_lag",
"warehouse"
"warehouse",
"refresh_mode"
from table(result_scan(last_query_id()))
{%- endset %}
{% set _dynamic_table = run_query(_dynamic_table_sql) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
{% macro snowflake__get_replace_dynamic_table_sql(relation, sql) %}
{% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%}

{%- set dynamic_table = relation.from_config(config.model) -%}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as for snowflake__get_create_dyanmic_table_sql.

create or replace dynamic table {{ relation }}
target_lag = '{{ config.get("target_lag") }}'
warehouse = {{ config.get("snowflake_warehouse") }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
as (
{{ sql }}
)
;
{{ snowflake__refresh_dynamic_table(relation) }}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now handled via the initialize parameter. Since initialize defaults to ON_CREATE, this is consistent with the current behavior.

{% endmacro %}
{%- endmacro %}
1 change: 1 addition & 0 deletions tests/functional/adapter/dynamic_table_tests/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MY_SEED,
)
from tests.functional.adapter.dynamic_table_tests.utils import (
query_refresh_mode,
query_relation_type,
query_target_lag,
query_warehouse,
Expand All @@ -28,6 +29,7 @@ class SnowflakeDynamicTableChanges:
def check_start_state(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "2 minutes"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"
assert query_refresh_mode(project, dynamic_table) == "INCREMENTAL"

@staticmethod
def change_config_via_alter(project, dynamic_table):
Expand Down Expand Up @@ -57,13 +59,13 @@ def check_state_alter_change_is_applied_downstream(project, dynamic_table):

@staticmethod
def change_config_via_replace(project, dynamic_table):
# dbt-snowflake does not currently monitor any changes that trigger a full refresh
pass
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace("refresh_mode='INCREMENTAL'", "refresh_mode='FULL'")
set_model_file(project, dynamic_table, new_model)

@staticmethod
def check_state_replace_change_is_applied(project, dynamic_table):
# dbt-snowflake does not currently monitor any changes that trigger a full refresh
pass
assert query_refresh_mode(project, dynamic_table) == "FULL"

@staticmethod
def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]:
Expand Down
5 changes: 5 additions & 0 deletions tests/functional/adapter/dynamic_table_tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
return config.get("warehouse")


def query_refresh_mode(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(project, dynamic_table)
return config.get("refresh_mode")


def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row:
with get_connection(project.adapter):
macro_results = project.adapter.execute_macro(
Expand Down
Loading