From 4e2616664a1011c65e164b819409596235d0ddc3 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 30 Oct 2023 15:22:44 +0100 Subject: [PATCH 1/8] first commit --- .github/PULL_REQUEST_TEMPLATE.md | 25 ++++++++++++++++--- dbt/include/glue/macros/adapters.sql | 3 ++- .../incremental/incremental.sql | 23 ++++++++--------- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index bd489d07..5a70b9b0 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,4 +1,4 @@ -resolves # +resolves #247 +Fixing [full refresh with iceberg issue](https://github.com/aws-samples/dbt-glue/issues/247). + +In the `incremental.sql` file, the materialization checked whether the relation was iceberg before checking for full refresh. +See [athena connector implementation](https://github.com/dbt-athena/dbt-athena/blob/main/dbt/include/athena/macros/materializations/models/incremental/incremental.sql) for reference. + +We fix it by reordering the conditions: we first check whether the user has passed the full refresh flag (i.e. `-f` or `--full-refresh`). + +To check if the user used the flag, we use the dbt core macro [should_full_refresh()](https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/include/global_project/macros/materializations/configs.sql#L6) instead of the currently implemented *full_refresh_mode* variable. Since we don't need the *full_refresh_mode* variable anymore, we remove this variable from the code. + +When the full refresh condition is true, we check whether the model uses the iceberg file format. If so, we first drop the table using the `glue__drop_relation` macro in `adapters.sql`. +For this macro to work, we slightly modify it to quote the identifiers with backticks, which avoids an `analysisexception: spark_catalog requires a single-part namespace` error. 
+Finally, we write the table with the `iceberg_write` function, which re-creates the table from scratch since it no longer exists. + +TL;DR: +- We reorder the conditions so that full refresh is checked before the iceberg file format. +- We check whether full refresh is enabled with a dbt-core macro instead of a custom implementation. +- If full refresh is enabled and the table is iceberg, we drop the table. +- We re-create the table using `iceberg_write`. ### Checklist -- [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) -- [ ] I have run this code in development and it appears to resolve the stated issue +- [X] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) +- [X] I have run this code in development and it appears to resolve the stated issue - [ ] This PR includes tests, or tests are not required/relevant for this PR - [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-glue next" section. 
diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index 380eb241..20a1d8de 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -31,13 +31,14 @@ drop {{ rel_type }} if exists {{ relation }} {%- elif rel_type is not none and rel_type == 'iceberg_table' %} {%- set default_catalog = 'glue_catalog' -%} - drop table if exists {{ default_catalog }}.{{ relation }} + drop table if exists `{{default_catalog}}.{{ relation.schema }}`.`{{ relation.table }}` {%- else -%} drop table if exists {{ relation }} {%- endif %} {%- endcall %} {% endmacro %} + {% macro glue__make_temp_relation(base_relation, suffix) %} {% set tmp_identifier = base_relation.identifier ~ suffix %} {% set tmp_relation = base_relation.incorporate(path={"schema": base_relation.schema, "identifier": tmp_identifier}) -%} diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 5c1c3e2e..1f20c4d9 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -20,10 +20,6 @@ {%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%} {%- set table_properties = config.get('table_properties', default='empty') -%} - - {%- set full_refresh_config = config.get('full_refresh', default=False) -%} - {%- set full_refresh_mode = (flags.FULL_REFRESH == 'True' or full_refresh_config == 'True') -%} - {% set target_relation = this %} {% set existing_relation_type = adapter.get_table_type(target_relation) %} {% set tmp_relation = make_temp_relation(target_relation, '_tmp') %} @@ -41,12 +37,6 @@ {%- set hudi_options = config.get('hudi_options', default={}) -%} {{ adapter.hudi_merge_table(target_relation, sql, unique_key, partition_by, custom_location, hudi_options, substitute_variables) }} {% set build_sql = "select * from " + 
target_relation.schema + "." + target_relation.identifier + " limit 1 "%} - {% elif file_format == 'iceberg' %} - {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }} - {% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%} - {%- if expire_snapshots == 'True' -%} - {%- set result = adapter.iceberg_expire_snapshots(target_relation) -%} - {%- endif -%} {% else %} {% if strategy == 'insert_overwrite' and partition_by %} {% call statement() %} @@ -60,14 +50,23 @@ {% else %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% endif %} - {% elif existing_relation_type == 'view' or full_refresh_mode %} - {{ drop_relation(target_relation) }} + {% elif existing_relation_type == 'view' or should_full_refresh() %} + {{ glue__drop_relation(target_relation) }} {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} + {% elif file_format == 'iceberg' %} + {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }} + {% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%} {% else %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% endif %} + {% elif file_format == 'iceberg' %} + {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }} + {% set build_sql = "select * from glue_catalog." + target_relation.schema + "." 
+ target_relation.identifier + " limit 1 "%} + {%- if expire_snapshots == 'True' -%} + {%- set result = adapter.iceberg_expire_snapshots(target_relation) -%} + {%- endif -%} {% else %} {{ glue__create_tmp_table_as(tmp_relation, sql) }} {% set is_incremental = 'True' %} From caeca5e836b9b0a17d78960af3e23b146872f06f Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 30 Oct 2023 15:35:10 +0100 Subject: [PATCH 2/8] revert pr template --- .github/PULL_REQUEST_TEMPLATE.md | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 5a70b9b0..6eff0707 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,4 +1,4 @@ -resolves #247 +resolves # ### Checklist -- [X] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) -- [X] I have run this code in development and it appears to resolve the stated issue +- [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) +- [ ] I have run this code in development and it appears to resolve the stated issue - [ ] This PR includes tests, or tests are not required/relevant for this PR - [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-glue next" section. -By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. +By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. 
\ No newline at end of file From 80fa1d756306cb900c29ffa5809043e8f2ff4859 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 30 Oct 2023 17:51:51 +0100 Subject: [PATCH 3/8] fix when existing_relation_type is none --- .../glue/macros/materializations/incremental/incremental.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 1f20c4d9..cc9b0093 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -47,6 +47,9 @@ {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} + {% elif file_format == 'iceberg' %} + {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }} + {% set build_sql = "select * from glue_catalog." + target_relation.schema + "." 
+ target_relation.identifier + " limit 1 "%} {% else %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% endif %} From 7f401c7f3b2765a9fb715c3888d345daf1e42799 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Wed, 8 Nov 2023 18:24:50 +0100 Subject: [PATCH 4/8] now correctly delete the table --- dbt/include/glue/macros/adapters.sql | 2 +- .../glue/macros/materializations/incremental/incremental.sql | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index 20a1d8de..fe6351d3 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -31,7 +31,7 @@ drop {{ rel_type }} if exists {{ relation }} {%- elif rel_type is not none and rel_type == 'iceberg_table' %} {%- set default_catalog = 'glue_catalog' -%} - drop table if exists `{{default_catalog}}.{{ relation.schema }}`.`{{ relation.table }}` + drop table if exists {{default_catalog}}.{{ relation.schema }}.{{ relation.table }} {%- else -%} drop table if exists {{ relation }} {%- endif %} diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index cc9b0093..4e0dbf38 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -55,6 +55,7 @@ {% endif %} {% elif existing_relation_type == 'view' or should_full_refresh() %} {{ glue__drop_relation(target_relation) }} + {% do glue__drop_relation(existing_relation) %} {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." 
+ target_relation.identifier + " limit 1 " %} From 7295153c0c1786a1cf8e1cf75b05adcf31605f10 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Wed, 8 Nov 2023 18:30:42 +0100 Subject: [PATCH 5/8] cleaner --- dbt/include/glue/macros/adapters.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index fe6351d3..ec12eaa6 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -31,7 +31,7 @@ drop {{ rel_type }} if exists {{ relation }} {%- elif rel_type is not none and rel_type == 'iceberg_table' %} {%- set default_catalog = 'glue_catalog' -%} - drop table if exists {{default_catalog}}.{{ relation.schema }}.{{ relation.table }} + drop table if exists {{ default_catalog }}.{{ relation }} {%- else -%} drop table if exists {{ relation }} {%- endif %} From 25b6bfee5bce0671d6f6edf8f0b5d77147344c31 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 13 Nov 2023 12:09:29 +0100 Subject: [PATCH 6/8] beauty --- dbt/include/glue/macros/adapters.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index ec12eaa6..380eb241 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -38,7 +38,6 @@ {%- endcall %} {% endmacro %} - {% macro glue__make_temp_relation(base_relation, suffix) %} {% set tmp_identifier = base_relation.identifier ~ suffix %} {% set tmp_relation = base_relation.incorporate(path={"schema": base_relation.schema, "identifier": tmp_identifier}) -%} From 4d12ac1ef7e58fcc91e60e010b60f5ff78348b6f Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 13 Nov 2023 12:11:31 +0100 Subject: [PATCH 7/8] duplicate line of code --- .../glue/macros/materializations/incremental/incremental.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql 
b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 4e0dbf38..cc9b0093 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -55,7 +55,6 @@ {% endif %} {% elif existing_relation_type == 'view' or should_full_refresh() %} {{ glue__drop_relation(target_relation) }} - {% do glue__drop_relation(existing_relation) %} {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} From 7c9c41299c9ab41ba9e17b7de7a6c7a564cc7261 Mon Sep 17 00:00:00 2001 From: "Vincent.PAUWELS" Date: Mon, 13 Nov 2023 14:05:33 +0100 Subject: [PATCH 8/8] use drop_relation instead of glue__drop_relation --- .../glue/macros/materializations/incremental/incremental.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index cc9b0093..7c0da215 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -54,7 +54,7 @@ {% set build_sql = create_table_as(False, target_relation, sql) %} {% endif %} {% elif existing_relation_type == 'view' or should_full_refresh() %} - {{ glue__drop_relation(target_relation) }} + {{ drop_relation(target_relation) }} {% if file_format == 'delta' %} {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}