diff --git a/sayn/database/__init__.py b/sayn/database/__init__.py
index 0bfc6b24..245c33bd 100644
--- a/sayn/database/__init__.py
+++ b/sayn/database/__init__.py
@@ -637,6 +637,7 @@ def create_table(
         db=None,
         select=None,
         replace=False,
+        temporary=False,
         **ddl,
     ):
         db_name = db or ""
@@ -823,7 +824,12 @@ def merge_query(
             tmp_db = db
 
         create_or_replace = self.create_table(
-            tmp_table, tmp_schema, tmp_db, select=select, replace=True
+            tmp_table,
+            tmp_schema,
+            tmp_db,
+            select=select,
+            replace=True,
+            temporary=True,
         )
 
         merge = self.merge_tables(
diff --git a/sayn/database/snowflake.py b/sayn/database/snowflake.py
index aa34f86a..4a4f664e 100644
--- a/sayn/database/snowflake.py
+++ b/sayn/database/snowflake.py
@@ -80,3 +80,59 @@ def _load_data_batch(self, table, data, schema, db):
                     temp_file_name=fname,
                 )
             )
+
+    def create_table(
+        self,
+        table,
+        schema=None,
+        db=None,
+        select=None,
+        replace=False,
+        temporary=False,
+        **ddl,
+    ):
+        """Render the Snowflake DDL that creates ``table``.
+
+        Renders the ``snowflake_create_table.sql`` template; ``replace`` and
+        ``temporary`` are forwarded to the template unchanged.
+        """
+        db_name = db or ""
+        schema_name = schema or ""
+        full_name = fully_qualify(table, schema, db)
+        if (
+            db_name in self._requested_objects
+            and schema_name in self._requested_objects[db_name]
+            and table in self._requested_objects[db_name][schema_name]
+        ):
+            db_info = self._requested_objects[db_name][schema_name][table]
+            object_type = db_info.get("type")
+            table_exists = bool(object_type == "table")
+            view_exists = bool(object_type == "view")
+        else:
+            db_info = dict()
+            table_exists = True
+            view_exists = True
+
+        template = self._jinja_env.get_template("snowflake_create_table.sql")
+
+        return template.render(
+            table_name=table,
+            full_name=full_name,
+            view_exists=view_exists,
+            table_exists=table_exists,
+            select=select,
+            replace=replace,
+            temporary=temporary,
+            can_replace_table=self.feature("CAN REPLACE TABLE"),
+            needs_cascade=self.feature("NEEDS CASCADE"),
+            cannot_specify_ddl_select=self.feature("CANNOT SPECIFY DDL IN SELECT"),
+            all_columns_have_type=len(
+                [c for c in ddl.get("columns", dict()) if c.get("type") is not None]
+            ),
+            **ddl,
+        )
+
+
+def fully_qualify(name, schema=None, db=None):
+    """Return ``name`` prefixed with ``db.`` and ``schema.`` when they are not None."""
+    return f"{db+'.' if db is not None else ''}{schema+'.' if schema is not None else ''}{name}"
diff --git a/sayn/database/templates/create_table.sql b/sayn/database/templates/create_table.sql
index 8509470f..a8d3fd52 100644
--- a/sayn/database/templates/create_table.sql
+++ b/sayn/database/templates/create_table.sql
@@ -1,14 +1,14 @@
 {%- if not replace %}
 CREATE TABLE IF NOT EXISTS {{ full_name }}
 {%- elif replace and can_replace_table %}
-{%- if table_exists %}
+    {%- if table_exists %}
 CREATE OR REPLACE TABLE {{ full_name }}
-{%- elif view_exists %}
+    {%- elif view_exists %}
 DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
 CREATE TABLE {{ full_name }}
-{%- else %}
+    {%- else %}
 CREATE OR REPLACE TABLE {{ full_name }}
-{%- endif %}
+    {%- endif %}
 {%- elif replace and not can_replace_table %}
 {% if table_exists %}
 DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
diff --git a/sayn/database/templates/snowflake_create_table.sql b/sayn/database/templates/snowflake_create_table.sql
new file mode 100644
index 00000000..4a2fca6f
--- /dev/null
+++ b/sayn/database/templates/snowflake_create_table.sql
@@ -0,0 +1,83 @@
+{%- if temporary %}
+CREATE TEMP TABLE {{ full_name }}
+{%- elif not replace %}
+CREATE TABLE IF NOT EXISTS {{ full_name }}
+{%- elif replace and can_replace_table %}
+    {%- if table_exists %}
+CREATE OR REPLACE TABLE {{ full_name }}
+    {%- elif view_exists %}
+DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
+CREATE TABLE {{ full_name }}
+    {%- else %}
+CREATE OR REPLACE TABLE {{ full_name }}
+    {%- endif %}
+{%- elif replace and not can_replace_table %}
+    {% if table_exists %}
+DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
+    {% elif view_exists %}
+DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
+    {% endif %}
+
+CREATE TABLE {{ full_name }}
+{%- endif %}
+
+{%- if select is undefined or select is none %}
+{%- if columns is defined and columns|length > 0 and all_columns_have_type %}
+(
+
+{%- for col_def in columns %}
+    {{ col_def['name'] }} {{ col_def['type'] }}
+    {{- ' UNIQUE' if col_def.get('unique')}}
+    {{- ' NOT NULL' if col_def.get('not_null')}}
+    {{- ',' if not loop.last else ''}}
+{%- endfor %}
+)
+{%- endif %}
+{%- endif %}
+
+{%- block table_attributes %}
+{%- if partition is defined and partition is not none %}
+PARTITION BY {{ partition }}
+{% endif %}
+
+{%- if cluster is defined and cluster is not none %}
+CLUSTER BY ({{ cluster|join(', ') }})
+{% endif %}
+
+{%- if distribution is defined and distribution is not none %}
+DISTSTYLE {{ distribution['type'] }}
+{% if distribution['type'] == 'KEY' %}DISTKEY({{ distribution['column'] }}){% endif %}
+{% endif %}
+
+{%- if sorting is defined and sorting is not none %}
+{{ sorting['type']+' ' if sorting['type'] else '' }}SORTKEY({{ sorting['columns']|join(', ') }})
+{% endif %}
+{% endblock -%}
+
+{%- if temporary %}
+DATA_RETENTION_TIME_IN_DAYS = 0
+{% endif -%}
+
+{%- if select is defined and select is not none %}
+
+AS
+{{ select }}
+
+{% endif -%}
+;
+
+{% block indexes %}
+{% if indexes is defined %}
+    {% for name, idx_def in indexes.items() %}
+CREATE INDEX {{ table_name }}_{{ name }} ON {{ full_name }}({{ ', '.join(idx_def['columns']) }});
+    {% endfor %}
+{% endif %}
+{% endblock %}
+
+{% block permissions %}
+{% if permissions is defined %}
+    {% for role, priv in permissions.items() %}
+GRANT {{ priv }} ON {{ full_name }} TO {{ role }};
+    {% endfor %}
+{% endif %}
+{% endblock %}
diff --git a/sayn/database/templates/snowflake_load_batch.sql b/sayn/database/templates/snowflake_load_batch.sql
index 53bd92e1..b8850b32 100644
--- a/sayn/database/templates/snowflake_load_batch.sql
+++ b/sayn/database/templates/snowflake_load_batch.sql
@@ -1,7 +1,7 @@
 {% set file_format = full_table_name + '_csv_format' %}
 {% set stage = full_table_name + '_csv_stage' %}
 
-CREATE OR REPLACE FILE FORMAT {{ file_format }}
+CREATE OR REPLACE TEMP FILE FORMAT {{ file_format }}
     TYPE = 'CSV'
     FIELD_DELIMITER = '\t'
     SKIP_HEADER = 1
@@ -11,7 +11,7 @@ CREATE OR REPLACE FILE FORMAT {{ file_format }}
     ESCAPE_UNENCLOSED_FIELD = '\\'
 ;
 
-CREATE OR REPLACE stage {{ stage }}
+CREATE OR REPLACE TEMP stage {{ stage }}
 file_format = {{ file_format }};
 
 PUT file://{{ temp_file_directory }}/{{ temp_file_name }} @{{ stage }} auto_compress=true;
diff --git a/sayn/tasks/copy.py b/sayn/tasks/copy.py
index 20d3da22..0d007b23 100644
--- a/sayn/tasks/copy.py
+++ b/sayn/tasks/copy.py
@@ -432,12 +432,15 @@ def execute(self, execute, debug, is_full_load, limit=None):
             load_schema = self.tmp_schema
             if is_full_load or self.mode == "full":
                 steps.append("Move Table")
+                is_temporary = False
             else:
                 steps.append("Merge Tables")
+                is_temporary = True
         else:
             load_db = self.database
             load_table = self.table
             load_schema = self.schema
+            is_temporary = False
 
         self.set_run_steps(steps)
 
@@ -458,7 +461,12 @@ def execute(self, execute, debug, is_full_load, limit=None):
             create_ddl["columns"] = [c for c in self.columns["columns"]]
 
         query = self.target_db.create_table(
-            load_table, schema=load_schema, db=load_db, replace=True, **create_ddl
+            load_table,
+            schema=load_schema,
+            db=load_db,
+            replace=True,
+            temporary=is_temporary,
+            **create_ddl,
         )
         if debug:
             self.write_compilation_output(query, "create_table")