From fbf2fe8e4452d4000b8accb08ece1c717197f908 Mon Sep 17 00:00:00 2001 From: jnadal Date: Fri, 9 Aug 2024 10:59:28 +0200 Subject: [PATCH] added on_schema_change possibility --- CHANGELOG.md | 3 ++- .../macros/materializations/incremental/incremental.sql | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eddb3010..d8d7eb55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ -## New versio +## New version - Fix session provisioning timeout and delay handling +- Add on_schema_change possibility ## v1.8.1 - Fix typo in README.md diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 7c0da215..50c43380 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -11,21 +11,26 @@ {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} {% endif %} + {%- set unique_key = config.get('unique_key', none) -%} {% if unique_key is none and file_format == 'hudi' %} {{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }} {% endif %} + {%- set partition_by = config.get('partition_by', none) -%} {%- set custom_location = config.get('custom_location', default='empty') -%} {%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%} {%- set table_properties = config.get('table_properties', default='empty') -%} {% set target_relation = this %} + {%- set existing_relation = load_relation(this) -%} {% set existing_relation_type = adapter.get_table_type(target_relation) %} {% set tmp_relation = make_temp_relation(target_relation, '_tmp') %} {% set is_incremental = 'False' %} {% set lf_tags_config = config.get('lf_tags_config') %} {% set lf_grants = config.get('lf_grants') %} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + {% call statement() %} set spark.sql.autoBroadcastJoinThreshold=-1 {% endcall %} @@ -54,7 +59,7 @@ {% set build_sql = create_table_as(False, target_relation, sql) %} {% endif %} {% elif existing_relation_type == 'view' or should_full_refresh() %} - {{ drop_relation(target_relation) }} + {{ drop_relation(target_relation) }} {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} @@ -74,6 +79,8 @@ {{ glue__create_tmp_table_as(tmp_relation, sql) }} {% set is_incremental = 'True' %} {% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} + + {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%} {% endif %} {% endif %}