Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Use Snowflake Temporary tables where possible #249

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sayn/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ def create_table(
db=None,
select=None,
replace=False,
temporary=False,
**ddl,
):
db_name = db or ""
Expand Down Expand Up @@ -823,7 +824,12 @@ def merge_query(
tmp_db = db

create_or_replace = self.create_table(
tmp_table, tmp_schema, tmp_db, select=select, replace=True
tmp_table,
tmp_schema,
tmp_db,
select=select,
replace=True,
temporary=True,
)

merge = self.merge_tables(
Expand Down
52 changes: 51 additions & 1 deletion sayn/database/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

class Snowflake(Database):
def feature(self, feature):
return feature in ("TABLE RENAME CHANGES SCHEMA",)
return feature in ("TABLE RENAME CHANGES SCHEMA")

def create_engine(self, settings):
from snowflake.sqlalchemy import URL
Expand Down Expand Up @@ -80,3 +80,53 @@ def _load_data_batch(self, table, data, schema, db):
temp_file_name=fname,
)
)

def create_table(
self,
table,
schema=None,
db=None,
select=None,
replace=False,
temporary=False,
**ddl,
):
db_name = db or ""
schema_name = schema or ""
full_name = fully_qualify(table, schema, db)
if (
db_name in self._requested_objects
and schema_name in self._requested_objects[db_name]
and table in self._requested_objects[db_name][schema_name]
):
db_info = self._requested_objects[db_name][schema_name][table]
object_type = db_info.get("type")
table_exists = bool(object_type == "table")
view_exists = bool(object_type == "view")
else:
db_info = dict()
table_exists = True
view_exists = True

template = self._jinja_env.get_template("snowflake_create_table.sql")

return template.render(
table_name=table,
full_name=full_name,
view_exists=view_exists,
table_exists=table_exists,
select=select,
replace=True,
temporary=temporary,
can_replace_table=self.feature("CAN REPLACE TABLE"),
needs_cascade=self.feature("NEEDS CASCADE"),
cannot_specify_ddl_select=self.feature("CANNOT SPECIFY DDL IN SELECT"),
all_columns_have_type=len(
[c for c in ddl.get("columns", dict()) if c.get("type") is not None]
),
**ddl,
)


def fully_qualify(name, schema=None, db=None):
return f"{db+'.' if db is not None else ''}{schema+'.' if schema is not None else ''}{name}"
8 changes: 4 additions & 4 deletions sayn/database/templates/create_table.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{%- if not replace %}
CREATE TABLE IF NOT EXISTS {{ full_name }}
{%- elif replace and can_replace_table %}
{%- if table_exists %}
{%- if table_exists %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- elif view_exists %}
{%- elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
CREATE TABLE {{ full_name }}
{%- else %}
{%- else %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- endif %}
{%- endif %}
{%- elif replace and not can_replace_table %}
{% if table_exists %}
DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
Expand Down
83 changes: 83 additions & 0 deletions sayn/database/templates/snowflake_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{%- if temporary %}
CREATE TEMP TABLE {{ full_name }}
{%- elif not replace %}
CREATE TABLE IF NOT EXISTS {{ full_name }}
{%- elif replace and can_replace_table %}
{%- if table_exists %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
CREATE TABLE {{ full_name }}
{%- else %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- endif %}
{%- elif replace and not can_replace_table %}
{% if table_exists %}
DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
{% elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
{% endif %}

CREATE TABLE {{ full_name }}
{%- endif %}

{%- if select is undefined or select is none %}
{%- if columns is defined and columns|length > 0 and all_columns_have_type %}
(

{%- for col_def in columns %}
{{ col_def['name'] }} {{ col_def['type'] }}
{{- ' UNIQUE' if col_def.get('unique')}}
{{- ' NOT NULL' if col_def.get('not_null')}}
{{- ',' if not loop.last else ''}}
{%- endfor %}
)
{%- endif %}
{%- endif %}

{%- block table_attributes %}
{%- if partition is defined and partition is not none %}
PARTITION BY {{ partition }}
{% endif %}

{%- if cluster is defined and cluster is not none %}
CLUSTER BY {{ cluster|join(', ') }}
{% endif %}

{%- if distribution is defined and distribution is not none %}
DISTSTYLE {{ distribution['type'] }}
{% if distribution['type'] == 'KEY' %}DISTKEY({{ distribution['column'] }}){% endif %}
{% endif %}

{%- if sorting is defined and sorting is not none %}
{{ sorting['type']+' ' if sorting['type'] else '' }}SORTKEY({{ sorting['columns']|join(', ') }})
{% endif %}
{% endblock -%}

{%- if temporary %}
DATA_RETENTION_TIME_IN_DAYS = 0
{% endif -%}

{%- if select is defined and select is not none %}

AS
{{ select }}

{% endif -%}
;

{% block indexes %}
{% if indexes is defined %}
{% for name, idx_def in indexes.items() %}
CREATE INDEX {{ table_name }}_{{ name }} ON {{ full_name }}({{ ', '.join(idx_def['columns']) }});
{% endfor %}
{% endif %}
{% endblock %}

{% block permissions %}
{% if permissions is defined %}
{% for role, priv in permissions.items() %}
GRANT {{ priv }} ON {{ full_name }} TO {{ role }};
{% endfor %}
{% endif %}
{% endblock %}
4 changes: 2 additions & 2 deletions sayn/database/templates/snowflake_load_batch.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% set file_format = full_table_name + '_csv_format' %}
{% set stage = full_table_name + '_csv_stage' %}

CREATE OR REPLACE FILE FORMAT {{ file_format }}
CREATE OR REPLACE TEMP FILE FORMAT {{ file_format }}
TYPE = 'CSV'
FIELD_DELIMITER = '\t'
SKIP_HEADER = 1
Expand All @@ -11,7 +11,7 @@ CREATE OR REPLACE FILE FORMAT {{ file_format }}
ESCAPE_UNENCLOSED_FIELD = '\\'
;

CREATE OR REPLACE stage {{ stage }}
CREATE OR REPLACE TEMP stage {{ stage }}
file_format = {{ file_format }};

PUT file://{{ temp_file_directory }}/{{ temp_file_name }} @{{ stage }} auto_compress=true;
Expand Down
10 changes: 9 additions & 1 deletion sayn/tasks/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,15 @@ def execute(self, execute, debug, is_full_load, limit=None):
load_schema = self.tmp_schema
if is_full_load or self.mode == "full":
steps.append("Move Table")
is_temporary = False
else:
steps.append("Merge Tables")
is_temporary = True
else:
load_db = self.database
load_table = self.table
load_schema = self.schema
is_temporary = False

self.set_run_steps(steps)

Expand All @@ -458,7 +461,12 @@ def execute(self, execute, debug, is_full_load, limit=None):
create_ddl["columns"] = [c for c in self.columns["columns"]]

query = self.target_db.create_table(
load_table, schema=load_schema, db=load_db, replace=True, **create_ddl
load_table,
schema=load_schema,
db=load_db,
replace=True,
temporary=is_temporary,
**create_ddl,
)
if debug:
self.write_compilation_output(query, "create_table")
Expand Down
Loading