From f57894d3e244953b69ea635a8365db9f7257e9d0 Mon Sep 17 00:00:00 2001
From: Jaume Sanjuan
Date: Mon, 23 Sep 2024 18:17:44 +0200
Subject: [PATCH 1/5] allow bigger seeds

---
 dbt/adapters/glue/impl.py  | 55 ++++++++++++++++++++++++++++++--------
 tests/unit/test_adapter.py | 14 ++++++++++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py
index 5dae40c..1ad71ca 100644
--- a/dbt/adapters/glue/impl.py
+++ b/dbt/adapters/glue/impl.py
@@ -534,9 +534,37 @@ def create_csv_table(self, model, agate_table):
         else:
             mode = "False"
 
-        code = f'''
-custom_glue_code_for_dbt_adapter
-csv = {json.loads(f.getvalue())}
+        csv_chunks = self._split_csv_records_into_chunks(json.loads(f.getvalue()))
+        statements = self._map_csv_chunks_to_code(csv_chunks, session, model, mode)
+        try:
+            cursor = session.cursor()
+            for statement in statements:
+                cursor.execute(statement)
+        except DbtDatabaseError as e:
+            raise DbtDatabaseError(msg="GlueCreateCsvFailed") from e
+        except Exception as e:
+            logger.error(e)
+
+    def _map_csv_chunks_to_code(self, csv_chunks: list[list[dict]], session: GlueConnection, model, mode):
+        statements = []
+        for i, csv_chunk in enumerate(csv_chunks):
+            is_first = i == 0
+            is_last = i == len(csv_chunk) - 1
+            code = "custom_glue_code_for_dbt_adapter\n"
+            if is_first:
+                code += f"""
+csv = {csv_chunk}
+"""
+            else:
+                code += f"""
+csv.extend({csv_chunk})
+"""
+            if not is_last:
+                code += f'''
+SqlWrapper2.execute("""select 1""")
+'''
+            else:
+                code += f'''
 df = spark.createDataFrame(csv)
 table_name = '{model["schema"]}.{model["name"]}'
 if (spark.sql("show tables in {model["schema"]}").where("tableName == lower('{model["name"]}')").count() > 0):
@@ -551,20 +579,25 @@ def create_csv_table(self, model, agate_table):
     .saveAsTable(table_name)
 SqlWrapper2.execute("""select * from {model["schema"]}.{model["name"]} limit 1""")
 '''
-        try:
-            session.cursor().execute(code)
-        except DbtDatabaseError as e:
-            raise DbtDatabaseError(msg="GlueCreateCsvFailed") from e
-        except Exception as e:
-            logger.error(e)
+            statements.append(code)
+        return statements
+
+    def _split_csv_records_into_chunks(self, records: list[dict], target_size=60000):
+        chunks = [[]]
+        for record in records:
+            if len(str([*chunks[-1], record])) > target_size:
+                chunks.append([record])
+            else:
+                chunks[-1].append(record)
+        return chunks
 
     def _update_additional_location(self, target_relation, location):
         session, client = self.get_connection()
         table_input = {}
         try:
             table_input = client.get_table(
-                DatabaseName=f'{target_relation.schema}',
-                Name=f'{session.credentials.delta_athena_prefix}_{target_relation.name}',
+                DatabaseName=f"{target_relation.schema}",
+                Name=f"{session.credentials.delta_athena_prefix}_{target_relation.name}",
             ).get("Table", {})
         except client.exceptions.EntityNotFoundException as e:
             logger.debug(e)
diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py
index 590251c..cd229b6 100644
--- a/tests/unit/test_adapter.py
+++ b/tests/unit/test_adapter.py
@@ -1,10 +1,12 @@
 from typing import Any, Dict, Optional
 import unittest
 from unittest import mock
+from unittest.mock import Mock
 from multiprocessing import get_context
 
 from botocore.client import BaseClient
 from moto import mock_aws
+import agate
 
 from dbt.config import RuntimeConfig
 import dbt.flags as flags
@@ -86,3 +88,15 @@ def test_get_table_type(self):
         connection = adapter.acquire_connection("dummy")
         connection.handle  # trigger lazy-load
         self.assertEqual(adapter.get_table_type(target_relation), "iceberg_table")
"iceberg_table") + + def test_create_csv_table_slices_big_datasets(self): + config = self._get_config() + adapter = GlueAdapter(config, get_context("spawn")) + model = {"name": "mock_model", "schema": "mock_schema"} + session_mock = Mock() + adapter.get_connection = lambda: (session_mock, 'mock_client') + test_table = agate.Table([(f'mock_value_{i}',f'other_mock_value_{i}') for i in range(2000)], column_names=['value', 'other_value']) + adapter.create_csv_table(model, test_table) + + # test table is between 120000 and 180000 characters so it should be split three times (max chunk is 60000) + self.assertEqual(session_mock.cursor().execute.call_count, 3) \ No newline at end of file From 16703672aeda594f23a58fbc1c4ca58db777fb64 Mon Sep 17 00:00:00 2001 From: Jaume Sanjuan Date: Mon, 23 Sep 2024 21:54:20 +0200 Subject: [PATCH 2/5] fix typo --- dbt/adapters/glue/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py index 1ad71ca..66c5777 100644 --- a/dbt/adapters/glue/impl.py +++ b/dbt/adapters/glue/impl.py @@ -549,7 +549,7 @@ def _map_csv_chunks_to_code(self, csv_chunks: list[list[dict]], session: GlueCon statements = [] for i, csv_chunk in enumerate(csv_chunks): is_first = i == 0 - is_last = i == len(csv_chunk) - 1 + is_last = i == len(csv_chunks) - 1 code = "custom_glue_code_for_dbt_adapter\n" if is_first: code += f""" From 034dde5624cb6f75f6a09b933517bc37663efa12 Mon Sep 17 00:00:00 2001 From: Jaume Sanjuan Date: Mon, 23 Sep 2024 22:55:00 +0200 Subject: [PATCH 3/5] Update Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4511ae..f763df1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Fix null values handling in seeds - Fix exceptions import for FailedToConnectError and ExecutableError - Fix the case-sensitive comparison on the seed name +- Allow to load big seed files ## v1.8.1 - Fix typo in README.md From f4687a68a7a52aef1a2719761b519b2a10e5f675 Mon Sep 17 00:00:00 2001 From: Jaume Sanjuan Date: Fri, 27 Sep 2024 12:18:04 +0200 Subject: [PATCH 4/5] update changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6ec05..f797ee2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## New version +- Allow to load big seed files + ## v1.8.6 - Fix session provisioning timeout and delay handling - Add write options for Delta format @@ -9,7 +12,6 @@ - Fix null values handling in seeds - Fix exceptions import for FailedToConnectError and ExecutableError - Fix the case-sensitive comparison on the seed name -- Allow to load big seed files ## v1.8.1 - Fix typo in README.md From ca3b153eb07b7a95172f80d22d6bc8978f3ad490 Mon Sep 17 00:00:00 2001 From: Jaume Sanjuan Date: Mon, 30 Sep 2024 10:21:52 +0200 Subject: [PATCH 5/5] fix type hints for python3.8 --- dbt/adapters/glue/impl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py index 66c5777..2ba68cf 100644 --- a/dbt/adapters/glue/impl.py +++ b/dbt/adapters/glue/impl.py @@ -545,7 +545,7 @@ def create_csv_table(self, model, agate_table): except Exception as e: logger.error(e) - def _map_csv_chunks_to_code(self, csv_chunks: list[list[dict]], session: GlueConnection, model, mode): + def _map_csv_chunks_to_code(self, csv_chunks: List[List[dict]], session: GlueConnection, model, mode): statements = [] for i, csv_chunk in enumerate(csv_chunks): is_first = 
@@ -582,7 +582,7 @@ def _map_csv_chunks_to_code(self, csv_chunks: list[list[dict]], session: GlueCon
             statements.append(code)
         return statements
 
-    def _split_csv_records_into_chunks(self, records: list[dict], target_size=60000):
+    def _split_csv_records_into_chunks(self, records: List[dict], target_size=60000):
         chunks = [[]]
         for record in records:
             if len(str([*chunks[-1], record])) > target_size:
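
Below is a standalone, illustrative sketch of the chunking logic this series introduces, written against the final state of the branch (i.e. with the len(csv_chunks) fix from PATCH 2/5 applied). The free-standing function name and the sample records are assumptions made for the sketch, not part of the patches; it only demonstrates why test_create_csv_table_slices_big_datasets expects exactly three cursor.execute() calls for a seed that renders to well over 120000 characters with a 60000-character chunk limit.

# Standalone sketch of _split_csv_records_into_chunks (assumed free-function
# name: split_csv_records_into_chunks). A record that would push the str()
# rendering of the current chunk past target_size starts a new chunk instead.
from typing import List


def split_csv_records_into_chunks(records: List[dict], target_size: int = 60000) -> List[List[dict]]:
    chunks = [[]]
    for record in records:
        if len(str([*chunks[-1], record])) > target_size:
            chunks.append([record])  # current chunk is full: open a new one
        else:
            chunks[-1].append(record)
    return chunks


if __name__ == "__main__":
    # Same shape as the unit test: 2000 two-column rows render to roughly
    # 140000 characters in total, so they split into three chunks, and
    # create_csv_table issues one cursor.execute() per chunk.
    records = [
        {"value": f"mock_value_{i}", "other_value": f"other_mock_value_{i}"}
        for i in range(2000)
    ]
    print(len(split_csv_records_into_chunks(records)))  # -> 3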