Skip to content

Commit

Permalink
Revert several commits to fix the unit and integration tests (#319)
Browse files Browse the repository at this point in the history
* Revert "Bump black from 23.11.0 to 23.12.0 (#303)"

This reverts commit ee2aa88.

* Revert "Update versions"

This reverts commit f6a377e.

* Revert "Adds minimal model contract enforcement for glue adapter (#297)"

This reverts commit 4192346.

* Revert "Fix wrong role create db#285 (#286)"

This reverts commit 1416de8.

* Revert "Glue session: enable users to fix their glue_session_id name and re-use it (#301)"

This reverts commit d2d31f1.

* Revert "Revert "Update versions""

This reverts commit 2b5e656.

* Revert "Revert "Bump black from 23.11.0 to 23.12.0 (#303)""

This reverts commit 9a2b094.

* Update CHANGELOG
  • Loading branch information
aajisaka authored Jan 24, 2024
1 parent 11e5357 commit 28fdcf7
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 126 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var/
*.egg
*.mypy_cache/
logs/
.venv

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## next version
- Revert "Adds limited model contract enforcement"
- Revert "glue_session_id is automatically created or re-used when user provides it"
- Revert "Fix wrong role create db"

## v1.7.1
- Remove unnecessary parameter for Delta Lake from readme
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ The table below describes all the options.
| seed_format | By default `parquet`, can be Spark format compatible like `csv` or `json` | no |
| seed_mode | By default `overwrite`, the seed data will be overwritten, you can set it to `append` if you just want to add new data in your dataset | no |
| default_arguments | The map of key value pairs parameters belonging to the session. More information on [Job parameters used by AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html). Ex: `--enable-continuous-cloudwatch-log=true,--enable-continuous-log-filter=true` | no |
| glue_session_id | re-use a glue-session to run multiple dbt run commands. Will create a new glue-session using glue_session_id if it does not exists yet. | no |
| glue_session_reuse | re-use the glue-session to run multiple dbt run commands: If set to true, the glue session will not be closed for re-use. If set to false, the session will be closed. The glue session will close after idle_timeout time is expired after idle_timeout time | no |
| glue_session_id | re-use the glue-session to run multiple dbt run commands: set a glue session id you need to use | no |
| glue_session_reuse | re-use the glue-session to run multiple dbt run commands: If set to true, the glue session will not be closed for re-use. If set to false, the session will be closed | no |
| datalake_formats | The ACID datalake format that you want to use if you are doing merge, can be `hudi`, `ìceberg` or `delta` |no|

## Configs
Expand Down
16 changes: 0 additions & 16 deletions dbt/adapters/glue/column.py

This file was deleted.

11 changes: 1 addition & 10 deletions dbt/adapters/glue/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ class GlueConnectionManager(SQLConnectionManager):
TYPE = "glue"
GLUE_CONNECTIONS_BY_KEY: Dict[str, GlueConnection] = {}

@classmethod
def data_type_code_to_name(cls, type_code: str) -> str:
"""
Get the string representation of the data type from the metadata. Dbt performs a
query to retrieve the types of the columns in the SQL query. Then these types are compared
to the types in the contract config, simplified because they need to match what is returned
by metadata (we are only interested in the broader type, without subtypes nor granularity).
"""
return type_code.split("(")[0].split("<")[0].upper()

@classmethod
def open(cls, connection):
Expand Down Expand Up @@ -112,7 +103,7 @@ def get_result_from_cursor(cls, cursor: GlueCursor, limit: Optional[int]) -> aga
data: List[Any] = []
column_names: List[str] = []
if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
column_names = [col[0] for col in cursor.description()]
if limit:
rows = cursor.fetchmany(limit)
else:
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/glue/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class GlueCredentials(Credentials):
seed_mode: Optional[str] = "overwrite"
default_arguments: Optional[str] = None
iceberg_glue_commit_lock_table: Optional[str] = "myGlueLockTable"
use_interactive_session_role_for_api_calls: bool = True
use_interactive_session_role_for_api_calls: bool = False
lf_tags: Optional[str] = None
glue_session_id: Optional[str] = None
glue_session_reuse: Optional[bool] = False
Expand Down
136 changes: 61 additions & 75 deletions dbt/adapters/glue/gluedbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,103 +70,89 @@ def _connect(self):

return self.session_id

def _create_session(self):
args = {
def _start_session(self):
logger.debug("GlueConnection _start_session called")

if self.credentials.glue_session_id:
logger.debug(f"The existing session {self.credentials.glue_session_id} is used")
try:
self._session = self.client.get_session(
Id=self.credentials.glue_session_id,
RequestOrigin='string'
)
except Exception as e:
logger.error(
f"Got an error when attempting to open a GlueSession : {e}"
)
raise dbterrors.FailedToConnectError(str(e))

self._session_create_time = time.time()
else:
args = {
"--enable-glue-datacatalog": "true"
}

if (self._create_session_config["default_arguments"] is not None):
args.update(self._string_to_dict(self._create_session_config["default_arguments"].replace(' ', '')))

if (self._create_session_config["extra_jars"] is not None):
args["--extra-jars"] = f"{self._create_session_config['extra_jars']}"
if (self._create_session_config["default_arguments"] is not None):
args.update(self._string_to_dict(self._create_session_config["default_arguments"].replace(' ', '')))

if (self._create_session_config["conf"] is not None):
args["--conf"] = f"{self._create_session_config['conf']}"
if (self._create_session_config["extra_jars"] is not None):
args["--extra-jars"] = f"{self._create_session_config['extra_jars']}"

if (self._create_session_config["extra_py_files"] is not None):
args["--extra-py-files"] = f"{self._create_session_config['extra_py_files']}"
if (self._create_session_config["conf"] is not None):
args["--conf"] = f"{self._create_session_config['conf']}"

additional_args = {}
additional_args["NumberOfWorkers"] = self._create_session_config["workers"]
additional_args["WorkerType"] = self._create_session_config["worker_type"]
additional_args["IdleTimeout"] = self._create_session_config["idle_timeout"]
additional_args["Timeout"] = self._create_session_config["query_timeout_in_minutes"]
additional_args["RequestOrigin"] = 'dbt-glue'
if (self._create_session_config["extra_py_files"] is not None):
args["--extra-py-files"] = f"{self._create_session_config['extra_py_files']}"

if (self._create_session_config['glue_version'] is not None):
additional_args["GlueVersion"] = f"{self._create_session_config['glue_version']}"
additional_args = {}
additional_args["NumberOfWorkers"] = self._create_session_config["workers"]
additional_args["WorkerType"] = self._create_session_config["worker_type"]
additional_args["IdleTimeout"] = self._create_session_config["idle_timeout"]
additional_args["Timeout"] = self._create_session_config["query_timeout_in_minutes"]
additional_args["RequestOrigin"] = 'dbt-glue'

if (self._create_session_config['security_configuration'] is not None):
additional_args["SecurityConfiguration"] = f"{self._create_session_config['security_configuration']}"
if (self._create_session_config['glue_version'] is not None):
additional_args["GlueVersion"] = f"{self._create_session_config['glue_version']}"

if (self._create_session_config["connections"] is not None):
additional_args["Connections"] = {"Connections": list(set(self._create_session_config["connections"].split(',')))}
if (self._create_session_config['security_configuration'] is not None):
additional_args["SecurityConfiguration"] = f"{self._create_session_config['security_configuration']}"

if (self._create_session_config["tags"] is not None):
additional_args["Tags"] = self._string_to_dict(self._create_session_config["tags"])
if (self._create_session_config["connections"] is not None):
additional_args["Connections"] = {"Connections": list(set(self._create_session_config["connections"].split(',')))}

if (self.credentials.datalake_formats is not None):
args["--datalake-formats"] = f"{self.credentials.datalake_formats}"
if (self._create_session_config["tags"] is not None):
additional_args["Tags"] = self._string_to_dict(self._create_session_config["tags"])

if (self.credentials.datalake_formats is not None):
args["--datalake-formats"] = f"{self.credentials.datalake_formats}"

if self.credentials.glue_session_id:
new_id = self.credentials.glue_session_id
else:
session_uuid = uuid.uuid4()
session_uuid_str = str(session_uuid)
session_prefix = self._create_session_config["role_arn"].partition('/')[2] or self._create_session_config["role_arn"]
new_id = f"{session_prefix}-dbt-glue-{session_uuid_str}"

if self._session_id_suffix:
new_id = f"{new_id}-{self._session_id_suffix}"
if self._session_id_suffix:
new_id = f"{new_id}-{self._session_id_suffix}"

try:
logger.debug(f"A new session {new_id} is created")
self._session = self.client.create_session(
Id=new_id,
Role=self._create_session_config["role_arn"],
DefaultArguments=args,
Command={
"Name": "glueetl",
"PythonVersion": "3"
},
**additional_args)
except Exception as e:
logger.error(
f"Got an error when attempting to open a GlueSession : {e}"
)
raise dbterrors.FailedToConnectError(str(e))

self._session_create_time = time.time()

def _start_session(self):
logger.debug("GlueConnection _start_session called")

if self.credentials.glue_session_id:
logger.debug(f"Fetching session {self.credentials.glue_session_id}")
try:
self._session = self.client.get_session(
Id=self.credentials.glue_session_id,
RequestOrigin='string'
)
logger.debug(f"{self.session_id} in {self.state} state")

if self.state in [GlueSessionState.TIMEOUT, GlueSessionState.STOPPED, GlueSessionState.FAILED]:
logger.debug(f"Deleting the session {self.credentials.glue_session_id} in order to create it back")
self.client.delete_session(Id=self.credentials.glue_session_id)
logger.debug(f"Creating the session {self.credentials.glue_session_id}")
self._create_session()

logger.debug(f"A new session {new_id} is created")
self._session = self.client.create_session(
Id=new_id,
Role=self._create_session_config["role_arn"],
DefaultArguments=args,
Command={
"Name": "glueetl",
"PythonVersion": "3"
},
**additional_args)
except Exception as e:
logger.error(f"Session does not exists or could not be fetched : {e}")
logger.debug(f"Creating the session {self.credentials.glue_session_id}")
self._create_session()

logger.error(
f"Got an error when attempting to open a GlueSession : {e}"
)
raise dbterrors.FailedToConnectError(str(e))

self._session_create_time = time.time()
else:
self._create_session()


def _init_session(self):
logger.debug("GlueConnection _init_session called for session_id : " + self.session_id)
statement = GlueStatement(client=self.client, session_id=self.session_id, code=SQLPROXY)
Expand Down
1 change: 0 additions & 1 deletion dbt/adapters/glue/gluedbapi/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def __next__(self):
raise StopIteration
return item

@property
def description(self):
logger.debug("GlueCursor description called")
if self.response:
Expand Down
17 changes: 8 additions & 9 deletions dbt/adapters/glue/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

from dbt.adapters.base import available
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.column import Column
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.glue import GlueConnectionManager
from dbt.adapters.glue.column import GlueColumn
from dbt.adapters.glue.gluedbapi import GlueConnection
from dbt.adapters.glue.relation import SparkRelation
from dbt.adapters.glue.lakeformation import (
Expand All @@ -33,7 +33,6 @@
class GlueAdapter(SQLAdapter):
ConnectionManager = GlueConnectionManager
Relation = SparkRelation
Column = GlueColumn

relation_type_map = {'EXTERNAL_TABLE': 'table',
'MANAGED_TABLE': 'table',
Expand Down Expand Up @@ -87,15 +86,15 @@ def get_connection(self):
RoleSessionName="dbt"
)
credentials = assumed_role_object['Credentials']
glue_client = boto3.client("glue", region_name=glueSession.credentials.region,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
return glueSession, glue_client
session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)

glue_client = boto3.client("glue", region_name=glueSession.credentials.region)
client = boto3.client("glue", region_name=glueSession.credentials.region)

return glueSession, glue_client
return glueSession, client

def list_schemas(self, database: str) -> List[str]:
session, client = self.get_connection()
Expand Down
11 changes: 0 additions & 11 deletions dbt/include/glue/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@
{{ create_temporary_view(relation, sql) }}
{%- else -%}
create table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{#-- This does not enforce contstraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
{% endif %}
{{ glue__file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
Expand Down Expand Up @@ -131,10 +124,6 @@
{% endmacro %}

{% macro glue__create_view_as(relation, sql) -%}
{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
{%- endif -%}
DROP VIEW IF EXISTS {{ relation }}
dbt_next_query
create view {{ relation }}
Expand Down

0 comments on commit 28fdcf7

Please sign in to comment.