Skip to content

Commit

Permalink
added on_schema_change possibility
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremynadal33 committed Aug 9, 2024
1 parent 726ae4c commit d7ed9c2
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@
{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{% endif %}

{%- set unique_key = config.get('unique_key', none) -%}
{% if unique_key is none and file_format == 'hudi' %}
{{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }}
{% endif %}

{%- set partition_by = config.get('partition_by', none) -%}
{%- set custom_location = config.get('custom_location', default='empty') -%}
{%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%}
{%- set table_properties = config.get('table_properties', default='empty') -%}

{% set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}
{% set existing_relation_type = adapter.get_table_type(target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation, '_tmp') %}
{% set is_incremental = 'False' %}
{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_grants = config.get('lf_grants') %}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{% call statement() %}
set spark.sql.autoBroadcastJoinThreshold=-1
{% endcall %}
Expand Down Expand Up @@ -54,7 +59,7 @@
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% endif %}
{% elif existing_relation_type == 'view' or should_full_refresh() %}
{{ drop_relation(target_relation) }}
{{ drop_relation(target_relation) }}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
Expand All @@ -74,6 +79,8 @@
{{ glue__create_tmp_table_as(tmp_relation, sql) }}
{% set is_incremental = 'True' %}
{% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}

{%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{% endif %}
{% endif %}

Expand Down

0 comments on commit d7ed9c2

Please sign in to comment.