Allow big seeds #447
Changes from 5 commits
@@ -534,9 +534,37 @@ def create_csv_table(self, model, agate_table):
         else:
             mode = "False"
 
-        code = f'''
-custom_glue_code_for_dbt_adapter
-csv = {json.loads(f.getvalue())}
+        csv_chunks = self._split_csv_records_into_chunks(json.loads(f.getvalue()))
+        statements = self._map_csv_chunks_to_code(csv_chunks, session, model, mode)
+        try:
+            cursor = session.cursor()
+            for statement in statements:
+                cursor.execute(statement)
+        except DbtDatabaseError as e:
+            raise DbtDatabaseError(msg="GlueCreateCsvFailed") from e
+        except Exception as e:
+            logger.error(e)
+
+    def _map_csv_chunks_to_code(self, csv_chunks: list[list[dict]], session: GlueConnection, model, mode):
+        statements = []
+        for i, csv_chunk in enumerate(csv_chunks):
+            is_first = i == 0
+            is_last = i == len(csv_chunks) - 1
+            code = "custom_glue_code_for_dbt_adapter\n"
+            if is_first:
+                code += f"""
+csv = {csv_chunk}
+"""
+            else:
+                code += f"""
+csv.extend({csv_chunk})
+"""
+            if not is_last:
+                code += f'''
+SqlWrapper2.execute("""select 1""")
[Review thread on `SqlWrapper2.execute("""select 1""")`]

Reviewer: What is this for?

Author: That's so the cursor execute can retrieve the response. Otherwise it breaks when retrieving the result on

    if self.connection.use_arrow:
        result_bucket = self.response.get("result_bucket")
        result_key = self.response.get("result_key")
        if result_bucket and result_key:
            pdf = get_pandas_dataframe_from_result_file(result_bucket, result_key)
            self.result = pdf.to_dict('records')[0]

or on …; I noticed it being handled in this same way.

Reviewer: ok
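For concreteness, the snippet below mirrors the chunk-statement assembly from the diff for a chunk that is neither first nor last, using a made-up one-record chunk; running it prints the payload whose last line is the `select 1` the reviewer asked about:

# Mirrors the chunk-statement assembly above (hypothetical one-record chunk).
# The rendered payload ends in SqlWrapper2.execute, which gives
# cursor.execute() a result to retrieve for every intermediate chunk.
csv_chunk = [{'value': 'mock_value_1', 'other_value': 'other_mock_value_1'}]
code = "custom_glue_code_for_dbt_adapter\n"
code += f"""
csv.extend({csv_chunk})
"""
code += f'''
SqlWrapper2.execute("""select 1""")
'''
print(code)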
+'''
+            else:
+                code += f'''
 df = spark.createDataFrame(csv)
 table_name = '{model["schema"]}.{model["name"]}'
 if (spark.sql("show tables in {model["schema"]}").where("tableName == lower('{model["name"]}')").count() > 0):
@@ -551,20 +579,25 @@ def create_csv_table(self, model, agate_table):
     .saveAsTable(table_name)
 SqlWrapper2.execute("""select * from {model["schema"]}.{model["name"]} limit 1""")
 '''
-        try:
-            session.cursor().execute(code)
-        except DbtDatabaseError as e:
-            raise DbtDatabaseError(msg="GlueCreateCsvFailed") from e
-        except Exception as e:
-            logger.error(e)
+            statements.append(code)
+        return statements
+
+    def _split_csv_records_into_chunks(self, records: list[dict], target_size=60000):
+        chunks = [[]]
+        for record in records:
+            if len(str([*chunks[-1], record])) > target_size:
+                chunks.append([record])
+            else:
+                chunks[-1].append(record)
+        return chunks
 
     def _update_additional_location(self, target_relation, location):
         session, client = self.get_connection()
         table_input = {}
         try:
             table_input = client.get_table(
-                DatabaseName=f'{target_relation.schema}',
-                Name=f'{session.credentials.delta_athena_prefix}_{target_relation.name}',
+                DatabaseName=f"{target_relation.schema}",
+                Name=f"{session.credentials.delta_athena_prefix}_{target_relation.name}",
             ).get("Table", {})
         except client.exceptions.EntityNotFoundException as e:
             logger.debug(e)
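As a standalone illustration of the new chunking helper, here is the same logic outside the adapter with a deliberately tiny `target_size` so the split is visible (the standalone name and the toy records are illustrative only):

# Same rule as _split_csv_records_into_chunks above: close a chunk once the
# str() of the would-be chunk, i.e. the literal that later gets embedded in
# a generated statement, would exceed target_size characters.
def split_csv_records_into_chunks(records: list[dict], target_size: int = 60000) -> list[list[dict]]:
    chunks = [[]]
    for record in records:
        if len(str([*chunks[-1], record])) > target_size:
            chunks.append([record])
        else:
            chunks[-1].append(record)
    return chunks

records = [{"value": f"mock_value_{i}"} for i in range(10)]
# Each record's repr is 25 characters, so at target_size=80 only two records
# fit per chunk:
print([len(c) for c in split_csv_records_into_chunks(records, target_size=80)])  # [2, 2, 2, 2, 2]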
@@ -1,10 +1,12 @@
 from typing import Any, Dict, Optional
 import unittest
 from unittest import mock
+from unittest.mock import Mock
 from multiprocessing import get_context
 from botocore.client import BaseClient
 from moto import mock_aws
 
+import agate
 from dbt.config import RuntimeConfig
 
 import dbt.flags as flags
@@ -86,3 +88,15 @@ def test_get_table_type(self):
         connection = adapter.acquire_connection("dummy")
         connection.handle  # trigger lazy-load
         self.assertEqual(adapter.get_table_type(target_relation), "iceberg_table")
+
+    def test_create_csv_table_slices_big_datasets(self):
[Review thread on `def test_create_csv_table_slices_big_datasets`]

Reviewer: Can we add another test to use …?

Author: Sorry, I don't get what you mean. What would be the test scenario?

Reviewer: Ah, it was my bad; this test case already covers the required one.
+        config = self._get_config()
+        adapter = GlueAdapter(config, get_context("spawn"))
+        model = {"name": "mock_model", "schema": "mock_schema"}
+        session_mock = Mock()
+        adapter.get_connection = lambda: (session_mock, 'mock_client')
+        test_table = agate.Table([(f'mock_value_{i}', f'other_mock_value_{i}') for i in range(2000)], column_names=['value', 'other_value'])
+        adapter.create_csv_table(model, test_table)
+
+        # test table is between 120000 and 180000 characters so it should be split three times (max chunk is 60000)
+        self.assertEqual(session_mock.cursor().execute.call_count, 3)
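The expected call count can be sanity-checked with rough arithmetic; the sketch below assumes CPython's dict repr, which is also what the chunker measures with str():

# Back-of-the-envelope check of the assertion above: the repr of the 2000
# seed records is ~138000 characters, so a 60000-character target_size
# yields three chunks and therefore three cursor.execute() calls.
records = [
    {"value": f"mock_value_{i}", "other_value": f"other_mock_value_{i}"}
    for i in range(2000)
]
total = len(str(records))
print(total)               # ~138000 for this table
print(-(-total // 60000))  # ceiling division: 3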
[Review thread on `code = "custom_glue_code_for_dbt_adapter\n"`]

Reviewer: Do we need to have this per chunk?

Author: Yes. The way this works is that it slices the data across multiple statement executions, appending the data to an array, with one execute per chunk. Since the cursor implementation differentiates between Python code and SQL by checking whether the code contains "custom_glue_code_for_dbt_adapter" (otherwise wrapping it in the SqlWrapper), we need it on every chunk so each statement is executed as Python.

Reviewer: Makes sense
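A minimal sketch of the dispatch rule the author describes, assuming only what is stated in the thread (`prepare_statement` is a hypothetical name, not the adapter's actual cursor code):

# Statements carrying the marker run on Glue as raw Python; anything else is
# treated as SQL and wrapped in SqlWrapper2 before submission.
MARKER = "custom_glue_code_for_dbt_adapter"

def prepare_statement(code: str) -> str:
    if MARKER in code:
        return code  # chunk statements built by _map_csv_chunks_to_code take this path
    return f'SqlWrapper2.execute("""{code}""")'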