Skip to content

Commit

Permalink
Adding minimal write option for Delta format
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremynadal33 committed Aug 6, 2024
1 parent 726ae4c commit 829de45
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## New versio
## New version
- Fix session provisioning timeout and delay handling
- Add write options for Delta format

## v1.8.1
- Fix typo in README.md
Expand Down
9 changes: 7 additions & 2 deletions dbt/adapters/glue/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ def delta_update_manifest(self, target_relation, custom_location, partition_by):
self._update_additional_location(target_relation, location)

@available
def delta_create_table(self, target_relation, request, primary_key, partition_key, custom_location):
def delta_create_table(self, target_relation, request, primary_key, partition_key, custom_location, delta_create_table_write_options=None):
session, client = self.get_connection()
logger.debug(request)

Expand All @@ -636,6 +636,11 @@ def delta_create_table(self, target_relation, request, primary_key, partition_ke
location = f"{session.credentials.location}/{target_relation.schema}/{target_relation.name}"
else:
location = custom_location

options_string = ""
if delta_create_table_write_options:
for key, value in delta_create_table_write_options.items():
options_string += f'.option("{key}", "{value}")'

create_table_query = f"""
CREATE TABLE {table_name}
Expand All @@ -647,7 +652,7 @@ def delta_create_table(self, target_relation, request, primary_key, partition_ke
custom_glue_code_for_dbt_adapter
spark.sql("""
{request}
""").write.format("delta").mode("overwrite")'''
""").write.format("delta").mode("overwrite"){options_string}'''

write_data_footer = f'''.save("{location}")
SqlWrapper2.execute("""select 1""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
{%- set custom_location = config.get('custom_location', default='empty') -%}
{%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%}
{%- set table_properties = config.get('table_properties', default='empty') -%}
{%- set delta_create_table_write_options = config.get('write_options', default={}) -%}

{% set target_relation = this %}
{% set existing_relation_type = adapter.get_table_type(target_relation) %}
Expand All @@ -45,7 +46,7 @@
{% endif %}
{% if existing_relation_type is none %}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
Expand All @@ -56,7 +57,7 @@
{% elif existing_relation_type == 'view' or should_full_refresh() %}
{{ drop_relation(target_relation) }}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
Expand Down

0 comments on commit 829de45

Please sign in to comment.