Skip to content

Commit

Permalink
add concrete catalog integration config
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Jan 6, 2025
1 parent 66d0588 commit e305778
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 34 deletions.
84 changes: 56 additions & 28 deletions dbt/adapters/snowflake/catalog.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,57 @@
from typing import Dict, Optional

import textwrap

from dbt.adapters.base import BaseRelation
from dbt.adapters.base.catalog import ExternalCatalogIntegration


class SnowflakeExternalCatalogIntegration(ExternalCatalogIntegration):

def relation_exists(self, relation: BaseRelation) -> bool:
response, result = self._connection_manager.execute(f"DESCRIBE ICEBERG TABLE {relation.render()}")
if result and result.rows:
return True
return False

def _exists(self) -> bool:
if not self._exists:
response, result = self._connection_manager.execute(
f"DESCRIBE CATALOG INTEGRATION {self.external_catalog.name}")
if result and result.rows:
self._exists = True
else:
self._exists = False
return self._exists

def refresh_relation(self, relation: BaseRelation) -> None:
self._connection_manager.execute(f"ALTER ICEBERG TABLE {relation.render()} REFRESH")

def create_relation(self, relation: BaseRelation) -> None:
self._connection_manager.execute(f"CREATE ICEBERG TABLE {relation.render()}"
f"EXTERNAL_VOLUME '{self.external_catalog.configuration.external_volume.name}'"
f"CATALOG='{self.external_catalog.name}'")
from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig


class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.managed

def render_ddl_predicates(self, relation: RelationConfig) -> str:
"""
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
:param relation:
:return:
"""
base_location: str = f"_dbt/{relation.schema}/{relation.name}"

if sub_path := relation.config.get("base_location_subpath"):
base_location += f"/{sub_path}"

iceberg_ddl_predicates: str = f"""
external_volume = '{self.external_volume}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)


class SnowflakeGlueCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.glue
auto_refresh: str = "FALSE"
replace_invalid_characters: str = "FALSE"

def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None:
if adapter_configs:
if "auto_refresh" in adapter_configs:
self.auto_refresh = adapter_configs["auto_refresh"]
if "replace_invalid_characters" in adapter_configs:
self.replace_invalid_characters = adapter_configs["replace_invalid_characters"]

def render_ddl_predicates(self, relation: BaseRelation) -> str:
ddl_predicate = f"""create or replace iceberg table {relation.render()}
external_volume = '{self.external_volume}
catalog = '{self.name}'
"""
if self.namespace:
ddl_predicate += "CATALOG_NAMESPACE = '{self.namespace}'"
if self.auto_refresh:
ddl_predicate += f"REPLACE_INVALID_CHARACTERS = {self.auto_refresh}"
if self.replace_invalid_characters:
ddl_predicate += f"AUTO_REFRESH = {self.replace_invalid_characters}"
return ddl_predicate
9 changes: 9 additions & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.meta import available
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.contracts.catalog import CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.snowflake.catalog import (
SnowflakeManagedIcebergCatalogIntegration,
SnowflakeGlueCatalogIntegration,
)
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
Expand Down Expand Up @@ -62,6 +67,10 @@ class SnowflakeAdapter(SQLAdapter):
ConnectionManager = SnowflakeConnectionManager

AdapterSpecificConfigs = SnowflakeConfig
CatalogIntegrations = {
CatalogIntegrationType.iceberg_managed: SnowflakeManagedIcebergCatalogIntegration,
CatalogIntegrationType.glue: SnowflakeGlueCatalogIntegration,
}

CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
Expand Down
38 changes: 33 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from dataclasses import dataclass
from typing import Optional, Dict, Any, TYPE_CHECKING
from typing import Optional, Dict, Any, TYPE_CHECKING, Union

from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate

Expand All @@ -37,6 +40,29 @@ def default(cls) -> Self:
return cls("ON_CREATE")


def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]:
if isinstance(catalog_info, Dict):
catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info)
else:
catalog_config = SnowflakeCatalogConfig.parse_relation_config(catalog_info) # type: ignore
if catalog_config.table_format != TableFormat.default():
catalog_name = "snowflake_managed"
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_config.name,
table_format=catalog_config.table_format,
catalog_type=CatalogIntegrationType.managed,
external_volume=catalog_config.external_volume,
)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name
else:
return None


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -60,12 +86,13 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
catalog: Optional[str]
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
catalog = _setup_catalog_integration(config_dict["catalog"])
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -75,7 +102,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
"catalog": catalog,
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}
Expand All @@ -84,14 +111,15 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
catalog = _setup_catalog_integration(relation_config)
config_dict = {
"name": relation_config.identifier,
"schema_name": relation_config.schema,
"database_name": relation_config.database,
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
"catalog": catalog,
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
{%- set catalog_integration = adapter.get_catalog_integration(model.catalog) -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# install latest changes in dbt-core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@catalogs-parsing#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git
Expand Down

0 comments on commit e305778

Please sign in to comment.