From 28fdcf7563cc28f7d089ab426c160c24f5305bb1 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Wed, 24 Jan 2024 15:56:47 +0900 Subject: [PATCH] Revert several commits to fix the unit and integration tests (#319) * Revert "Bump black from 23.11.0 to 23.12.0 (#303)" This reverts commit ee2aa88663fd51ffc1ad1a1dbe04321206453386. * Revert "Update versions" This reverts commit f6a377e9f67ef26ca29ae4138638cd5b7062dad4. * Revert "Adds minimal model contract enforcement for glue adapter (#297)" This reverts commit 419234625aa377e9f16d2ff40fb2aeb9c92ad83f. * Revert "Fix wrong role create db#285 (#286)" This reverts commit 1416de86faea5b61ebd2b5ac16f862d8ec39f32e. * Revert "Glue session: enable users to fix their glue_session_id name and re-use it (#301)" This reverts commit d2d31f109c292efdeba2c1b588cd1cbc60f32a71. * Revert "Revert "Update versions"" This reverts commit 2b5e65614ebc1e7e6d44ac1a1fe458d299cc0f25. * Revert "Revert "Bump black from 23.11.0 to 23.12.0 (#303)"" This reverts commit 9a2b09419c3dea549c8a58c569623b0b2e4bae90. * Update CHANGELOG --- .gitignore | 1 - CHANGELOG.md | 3 + README.md | 4 +- dbt/adapters/glue/column.py | 16 --- dbt/adapters/glue/connections.py | 11 +- dbt/adapters/glue/credentials.py | 2 +- dbt/adapters/glue/gluedbapi/connection.py | 136 ++++++++++------------ dbt/adapters/glue/gluedbapi/cursor.py | 1 - dbt/adapters/glue/impl.py | 17 ++- dbt/include/glue/macros/adapters.sql | 11 -- 10 files changed, 76 insertions(+), 126 deletions(-) delete mode 100644 dbt/adapters/glue/column.py diff --git a/.gitignore b/.gitignore index ed252bee..d8c22280 100644 --- a/.gitignore +++ b/.gitignore @@ -26,7 +26,6 @@ var/ *.egg *.mypy_cache/ logs/ -.venv # PyInstaller # Usually these files are written by a python script from a template diff --git a/CHANGELOG.md b/CHANGELOG.md index 72c320a8..2a2d4dfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## next version +- Revert "Adds limited model contract enforcement" +- Revert "glue_session_id is automatically created or re-used when user provides it" +- Revert "Fix wrong role create db" ## v1.7.1 - Remove unnecessary parameter for Delta Lake from readme diff --git a/README.md b/README.md index 9169b046..be733eec 100644 --- a/README.md +++ b/README.md @@ -241,8 +241,8 @@ The table below describes all the options. | seed_format | By default `parquet`, can be Spark format compatible like `csv` or `json` | no | | seed_mode | By default `overwrite`, the seed data will be overwritten, you can set it to `append` if you just want to add new data in your dataset | no | | default_arguments | The map of key value pairs parameters belonging to the session. More information on [Job parameters used by AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html). Ex: `--enable-continuous-cloudwatch-log=true,--enable-continuous-log-filter=true` | no | -| glue_session_id | re-use a glue-session to run multiple dbt run commands. Will create a new glue-session using glue_session_id if it does not exists yet. | no | -| glue_session_reuse | re-use the glue-session to run multiple dbt run commands: If set to true, the glue session will not be closed for re-use. If set to false, the session will be closed. The glue session will close after idle_timeout time is expired after idle_timeout time | no | +| glue_session_id | re-use the glue-session to run multiple dbt run commands: set a glue session id you need to use | no | +| glue_session_reuse | re-use the glue-session to run multiple dbt run commands: If set to true, the glue session will not be closed for re-use. If set to false, the session will be closed | no | | datalake_formats | The ACID datalake format that you want to use if you are doing merge, can be `hudi`, `ìceberg` or `delta` |no| ## Configs diff --git a/dbt/adapters/glue/column.py b/dbt/adapters/glue/column.py deleted file mode 100644 index 222daea6..00000000 --- a/dbt/adapters/glue/column.py +++ /dev/null @@ -1,16 +0,0 @@ -from dataclasses import dataclass -from typing import ClassVar, Dict - -from dbt.adapters.base.column import Column - - -@dataclass -class GlueColumn(Column): - # Overwriting dafult string types to support glue - # TODO: Convert to supported glue types as needed - # Please ref: https://github.com/dbt-athena/dbt-athena/blob/main/dbt/adapters/athena/column.py - TYPE_LABELS: ClassVar[Dict[str, str]] = { - "STRING": "STRING", - "TEXT": "STRING", - "VARCHAR": "STRING" - } diff --git a/dbt/adapters/glue/connections.py b/dbt/adapters/glue/connections.py index 57867ead..1399a509 100644 --- a/dbt/adapters/glue/connections.py +++ b/dbt/adapters/glue/connections.py @@ -29,15 +29,6 @@ class GlueConnectionManager(SQLConnectionManager): TYPE = "glue" GLUE_CONNECTIONS_BY_KEY: Dict[str, GlueConnection] = {} - @classmethod - def data_type_code_to_name(cls, type_code: str) -> str: - """ - Get the string representation of the data type from the metadata. Dbt performs a - query to retrieve the types of the columns in the SQL query. Then these types are compared - to the types in the contract config, simplified because they need to match what is returned - by metadata (we are only interested in the broader type, without subtypes nor granularity). - """ - return type_code.split("(")[0].split("<")[0].upper() @classmethod def open(cls, connection): @@ -112,7 +103,7 @@ def get_result_from_cursor(cls, cursor: GlueCursor, limit: Optional[int]) -> aga data: List[Any] = [] column_names: List[str] = [] if cursor.description is not None: - column_names = [col[0] for col in cursor.description] + column_names = [col[0] for col in cursor.description()] if limit: rows = cursor.fetchmany(limit) else: diff --git a/dbt/adapters/glue/credentials.py b/dbt/adapters/glue/credentials.py index 0ada1166..fb55e92e 100644 --- a/dbt/adapters/glue/credentials.py +++ b/dbt/adapters/glue/credentials.py @@ -29,7 +29,7 @@ class GlueCredentials(Credentials): seed_mode: Optional[str] = "overwrite" default_arguments: Optional[str] = None iceberg_glue_commit_lock_table: Optional[str] = "myGlueLockTable" - use_interactive_session_role_for_api_calls: bool = True + use_interactive_session_role_for_api_calls: bool = False lf_tags: Optional[str] = None glue_session_id: Optional[str] = None glue_session_reuse: Optional[bool] = False diff --git a/dbt/adapters/glue/gluedbapi/connection.py b/dbt/adapters/glue/gluedbapi/connection.py index 607df201..7b904d8a 100644 --- a/dbt/adapters/glue/gluedbapi/connection.py +++ b/dbt/adapters/glue/gluedbapi/connection.py @@ -70,103 +70,89 @@ def _connect(self): return self.session_id - def _create_session(self): - args = { + def _start_session(self): + logger.debug("GlueConnection _start_session called") + + if self.credentials.glue_session_id: + logger.debug(f"The existing session {self.credentials.glue_session_id} is used") + try: + self._session = self.client.get_session( + Id=self.credentials.glue_session_id, + RequestOrigin='string' + ) + except Exception as e: + logger.error( + f"Got an error when attempting to open a GlueSession : {e}" + ) + raise dbterrors.FailedToConnectError(str(e)) + + self._session_create_time = time.time() + else: + args = { "--enable-glue-datacatalog": "true" } - if (self._create_session_config["default_arguments"] is not None): - args.update(self._string_to_dict(self._create_session_config["default_arguments"].replace(' ', ''))) - - if (self._create_session_config["extra_jars"] is not None): - args["--extra-jars"] = f"{self._create_session_config['extra_jars']}" + if (self._create_session_config["default_arguments"] is not None): + args.update(self._string_to_dict(self._create_session_config["default_arguments"].replace(' ', ''))) - if (self._create_session_config["conf"] is not None): - args["--conf"] = f"{self._create_session_config['conf']}" + if (self._create_session_config["extra_jars"] is not None): + args["--extra-jars"] = f"{self._create_session_config['extra_jars']}" - if (self._create_session_config["extra_py_files"] is not None): - args["--extra-py-files"] = f"{self._create_session_config['extra_py_files']}" + if (self._create_session_config["conf"] is not None): + args["--conf"] = f"{self._create_session_config['conf']}" - additional_args = {} - additional_args["NumberOfWorkers"] = self._create_session_config["workers"] - additional_args["WorkerType"] = self._create_session_config["worker_type"] - additional_args["IdleTimeout"] = self._create_session_config["idle_timeout"] - additional_args["Timeout"] = self._create_session_config["query_timeout_in_minutes"] - additional_args["RequestOrigin"] = 'dbt-glue' + if (self._create_session_config["extra_py_files"] is not None): + args["--extra-py-files"] = f"{self._create_session_config['extra_py_files']}" - if (self._create_session_config['glue_version'] is not None): - additional_args["GlueVersion"] = f"{self._create_session_config['glue_version']}" + additional_args = {} + additional_args["NumberOfWorkers"] = self._create_session_config["workers"] + additional_args["WorkerType"] = self._create_session_config["worker_type"] + additional_args["IdleTimeout"] = self._create_session_config["idle_timeout"] + additional_args["Timeout"] = self._create_session_config["query_timeout_in_minutes"] + additional_args["RequestOrigin"] = 'dbt-glue' - if (self._create_session_config['security_configuration'] is not None): - additional_args["SecurityConfiguration"] = f"{self._create_session_config['security_configuration']}" + if (self._create_session_config['glue_version'] is not None): + additional_args["GlueVersion"] = f"{self._create_session_config['glue_version']}" - if (self._create_session_config["connections"] is not None): - additional_args["Connections"] = {"Connections": list(set(self._create_session_config["connections"].split(',')))} + if (self._create_session_config['security_configuration'] is not None): + additional_args["SecurityConfiguration"] = f"{self._create_session_config['security_configuration']}" - if (self._create_session_config["tags"] is not None): - additional_args["Tags"] = self._string_to_dict(self._create_session_config["tags"]) + if (self._create_session_config["connections"] is not None): + additional_args["Connections"] = {"Connections": list(set(self._create_session_config["connections"].split(',')))} - if (self.credentials.datalake_formats is not None): - args["--datalake-formats"] = f"{self.credentials.datalake_formats}" + if (self._create_session_config["tags"] is not None): + additional_args["Tags"] = self._string_to_dict(self._create_session_config["tags"]) + if (self.credentials.datalake_formats is not None): + args["--datalake-formats"] = f"{self.credentials.datalake_formats}" - if self.credentials.glue_session_id: - new_id = self.credentials.glue_session_id - else: session_uuid = uuid.uuid4() session_uuid_str = str(session_uuid) session_prefix = self._create_session_config["role_arn"].partition('/')[2] or self._create_session_config["role_arn"] new_id = f"{session_prefix}-dbt-glue-{session_uuid_str}" - if self._session_id_suffix: - new_id = f"{new_id}-{self._session_id_suffix}" + if self._session_id_suffix: + new_id = f"{new_id}-{self._session_id_suffix}" - try: - logger.debug(f"A new session {new_id} is created") - self._session = self.client.create_session( - Id=new_id, - Role=self._create_session_config["role_arn"], - DefaultArguments=args, - Command={ - "Name": "glueetl", - "PythonVersion": "3" - }, - **additional_args) - except Exception as e: - logger.error( - f"Got an error when attempting to open a GlueSession : {e}" - ) - raise dbterrors.FailedToConnectError(str(e)) - - self._session_create_time = time.time() - - def _start_session(self): - logger.debug("GlueConnection _start_session called") - - if self.credentials.glue_session_id: - logger.debug(f"Fetching session {self.credentials.glue_session_id}") try: - self._session = self.client.get_session( - Id=self.credentials.glue_session_id, - RequestOrigin='string' - ) - logger.debug(f"{self.session_id} in {self.state} state") - - if self.state in [GlueSessionState.TIMEOUT, GlueSessionState.STOPPED, GlueSessionState.FAILED]: - logger.debug(f"Deleting the session {self.credentials.glue_session_id} in order to create it back") - self.client.delete_session(Id=self.credentials.glue_session_id) - logger.debug(f"Creating the session {self.credentials.glue_session_id}") - self._create_session() - + logger.debug(f"A new session {new_id} is created") + self._session = self.client.create_session( + Id=new_id, + Role=self._create_session_config["role_arn"], + DefaultArguments=args, + Command={ + "Name": "glueetl", + "PythonVersion": "3" + }, + **additional_args) except Exception as e: - logger.error(f"Session does not exists or could not be fetched : {e}") - logger.debug(f"Creating the session {self.credentials.glue_session_id}") - self._create_session() - + logger.error( + f"Got an error when attempting to open a GlueSession : {e}" + ) + raise dbterrors.FailedToConnectError(str(e)) + self._session_create_time = time.time() - else: - self._create_session() - + def _init_session(self): logger.debug("GlueConnection _init_session called for session_id : " + self.session_id) statement = GlueStatement(client=self.client, session_id=self.session_id, code=SQLPROXY) diff --git a/dbt/adapters/glue/gluedbapi/cursor.py b/dbt/adapters/glue/gluedbapi/cursor.py index 942cbb98..aff0c8bd 100644 --- a/dbt/adapters/glue/gluedbapi/cursor.py +++ b/dbt/adapters/glue/gluedbapi/cursor.py @@ -206,7 +206,6 @@ def __next__(self): raise StopIteration return item - @property def description(self): logger.debug("GlueCursor description called") if self.response: diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py index f99aafc7..358b3a74 100644 --- a/dbt/adapters/glue/impl.py +++ b/dbt/adapters/glue/impl.py @@ -11,9 +11,9 @@ from dbt.adapters.base import available from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.base.column import Column from dbt.adapters.sql import SQLAdapter from dbt.adapters.glue import GlueConnectionManager -from dbt.adapters.glue.column import GlueColumn from dbt.adapters.glue.gluedbapi import GlueConnection from dbt.adapters.glue.relation import SparkRelation from dbt.adapters.glue.lakeformation import ( @@ -33,7 +33,6 @@ class GlueAdapter(SQLAdapter): ConnectionManager = GlueConnectionManager Relation = SparkRelation - Column = GlueColumn relation_type_map = {'EXTERNAL_TABLE': 'table', 'MANAGED_TABLE': 'table', @@ -87,15 +86,15 @@ def get_connection(self): RoleSessionName="dbt" ) credentials = assumed_role_object['Credentials'] - glue_client = boto3.client("glue", region_name=glueSession.credentials.region, - aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - aws_session_token=credentials['SessionToken']) - return glueSession, glue_client + session = boto3.Session( + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'] + ) - glue_client = boto3.client("glue", region_name=glueSession.credentials.region) + client = boto3.client("glue", region_name=glueSession.credentials.region) - return glueSession, glue_client + return glueSession, client def list_schemas(self, database: str) -> List[str]: session, client = self.get_connection() diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index eecf822e..99e20a48 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -66,13 +66,6 @@ {{ create_temporary_view(relation, sql) }} {%- else -%} create table {{ relation }} - {% set contract_config = config.get('contract') %} - {% if contract_config.enforced %} - {{ get_assert_columns_equivalent(sql) }} - {#-- This does not enforce contstraints and needs to be a TODO #} - {#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #} - {#-- you do not specify the columns #} - {% endif %} {{ glue__file_format_clause() }} {{ partition_cols(label="partitioned by") }} {{ clustered_cols(label="clustered by") }} @@ -131,10 +124,6 @@ {% endmacro %} {% macro glue__create_view_as(relation, sql) -%} - {%- set contract_config = config.get('contract') -%} - {%- if contract_config.enforced -%} - {{ get_assert_columns_equivalent(sql) }} - {%- endif -%} DROP VIEW IF EXISTS {{ relation }} dbt_next_query create view {{ relation }}