From 8c3653b1e4338fe2373d352f3468f34d7c386b51 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 5 Jul 2024 14:27:24 +0100 Subject: [PATCH] [DO NOT MERGE] PoC for fetching metrics from SnowflakeCursor --- dbt/adapters/snowflake/connections.py | 55 ++++++++++++++++----------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index 6e9a5aaba..64931a5c2 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -1,22 +1,34 @@ import base64 import datetime import os - -import pytz import re from contextlib import contextmanager -from dataclasses import dataclass +from dataclasses import dataclass, field from io import StringIO from time import sleep +from typing import Optional, Tuple, Union, Any, List, Iterable, TYPE_CHECKING, Dict -from typing import Optional, Tuple, Union, Any, List, Iterable, TYPE_CHECKING - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization +import pytz import requests import snowflake.connector import snowflake.connector.constants import snowflake.connector.errors +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from dbt.adapters.contracts.connection import AdapterResponse, Connection, Credentials +from dbt.adapters.events.logging import AdapterLogger +from dbt.adapters.events.types import AdapterEventWarning, AdapterEventError +from dbt.adapters.exceptions.connection import FailedToConnectError +from dbt.adapters.sql import SQLConnectionManager +from dbt_common.events.functions import warn_or_error +from dbt_common.exceptions import DbtDatabaseError +from dbt_common.exceptions import ( + DbtInternalError, + DbtRuntimeError, + DbtConfigError, +) +from dbt_common.ui import line_wrap_message, warning_tag +from snowflake.connector.cursor import SnowflakeCursor from snowflake.connector.errors import ( Error, DatabaseError, @@ -29,25 +41,11 @@ OtherHTTPRetryableError, BindUploadError, ) - -from dbt_common.exceptions import ( - DbtInternalError, - DbtRuntimeError, - DbtConfigError, -) -from dbt_common.exceptions import DbtDatabaseError -from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt.adapters.contracts.connection import AdapterResponse, Connection, Credentials -from dbt.adapters.sql import SQLConnectionManager -from dbt.adapters.events.logging import AdapterLogger -from dbt_common.events.functions import warn_or_error -from dbt.adapters.events.types import AdapterEventWarning, AdapterEventError -from dbt_common.ui import line_wrap_message, warning_tag +from snowflake.connector.telemetry import TelemetryField if TYPE_CHECKING: import agate - logger = AdapterLogger("Snowflake") if os.getenv("DBT_SNOWFLAKE_CONNECTOR_DEBUG_LOGGING"): @@ -66,6 +64,15 @@ @dataclass class SnowflakeAdapterResponse(AdapterResponse): query_id: str = "" + metrics: Dict[str, Any] = field(default_factory=dict) + + +class DbtSnowflakeCursor(SnowflakeCursor): + telemetry_job_data: Dict[str, Any] = {} + + def _log_telemetry_job_data(self, telemetry_field: TelemetryField, value: Any) -> None: + self.telemetry_job_data[telemetry_field.name] = value + return super()._log_telemetry_job_data(telemetry_field, value) @dataclass @@ -446,6 +453,7 @@ def get_response(cls, cursor) -> SnowflakeAdapterResponse: rows_affected=cursor.rowcount, code=code, query_id=cursor.sfqid, + metrics=cursor.telemetry_job_data, ) # disable transactional logic by default on Snowflake @@ -497,6 +505,9 @@ def process_results(cls, column_names, rows): # to replace them with sane timezones. return super().process_results(column_names, cls._fix_rows(rows)) + def get_cursor(self, connection: Connection): + return connection.handle.cursor(cursor_class=DbtSnowflakeCursor) + def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None ) -> Tuple[AdapterResponse, "agate.Table"]: