-
Notifications
You must be signed in to change notification settings - Fork 0
/
snowflakehandler.py
131 lines (105 loc) · 3.72 KB
/
snowflakehandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import pandas as pd
import snowflake.connector as snf
import logging
from argparse import ArgumentParser
# Root logger setup: report INFO and above, and prefix every record with its
# timestamp and severity level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
class SnowflakeQueryRunner:
    """Run SQL queries against Snowflake and return the results as DataFrames.

    Intended call order: ``connect_to_snowflake()``, any number of
    ``execute_query()`` calls, then ``close_connection()``.

    Attributes:
        connection_params (dict): Keyword arguments forwarded unchanged to
            ``snowflake.connector.connect``.
        connection: The open connection object, or ``None`` while disconnected.
    """

    def __init__(self, connection_params: dict) -> None:
        """Store the connection parameters; no connection is opened here.

        Args:
            connection_params (dict): Snowflake connection parameters.

        Raises:
            TypeError: If ``connection_params`` is not a dictionary.
        """
        # Raise explicitly instead of using ``assert``: assertions are removed
        # under ``python -O``, which would silently disable this check.
        if not isinstance(connection_params, dict):
            raise TypeError("connection_params must be a dictionary.")
        self.connection_params = connection_params
        self.connection = None

    def connect_to_snowflake(self) -> None:
        """Open the Snowflake connection and keep it on ``self.connection``.

        Raises:
            Exception: Whatever the connector raised; the failure is logged
                and then re-raised unchanged.
        """
        try:
            self.connection = snf.connect(**self.connection_params)
        except Exception as e:
            logging.error(f"Failed to connect to Snowflake: {e}")
            # Bare ``raise`` keeps the original traceback intact.
            raise
        logging.info("Connected to Snowflake.")

    def execute_query(self, query: str) -> "pd.DataFrame | None":
        """Execute a query and return its full result set as a DataFrame.

        Args:
            query (str): The SQL query to execute.

        Returns:
            The result rows with columns named after the cursor description,
            or ``None`` if there is no open connection or the query failed
            (the failure is logged, not raised).

        Raises:
            TypeError: If ``query`` is not a string.
        """
        if not isinstance(query, str):
            raise TypeError("query must be a string.")
        if self.connection is None:
            # Give a clear reason instead of an AttributeError on ``None``.
            logging.error("Failed to execute query: not connected to Snowflake.")
            return None
        try:
            cursor = self.connection.cursor()
            try:
                # Per the Snowflake parameter reference, 0 means "no maximum",
                # so the result set is not truncated by a session row cap.
                cursor.execute("alter session set rows_per_resultset = 0")
                cursor.execute(query)
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                query_id = cursor.sfqid
            finally:
                # Release the cursor on success and on failure alike.
                cursor.close()
            logging.info(f"Snowflake Query ID: {query_id}")
            return pd.DataFrame(rows, columns=columns)
        except Exception as e:
            # Best-effort contract: callers test the result for ``None``.
            logging.error(f"Failed to execute query: {e}")
            return None

    def close_connection(self) -> None:
        """Close the Snowflake connection if one is open; otherwise do nothing."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            # Drop the handle so repeated calls are harmless and a later
            # execute_query() reports "not connected" rather than using a
            # closed connection.
            self.connection = None
        logging.info("Disconnected from Snowflake.")
def main(argv=None):
    """Parse command-line options, run one query on Snowflake and print it.

    Args:
        argv (list[str] | None): Argument strings to parse. ``None`` (the
            default) makes argparse read them from ``sys.argv``.
    """
    parser = ArgumentParser(description="Snowflake Query Runner")
    parser.add_argument(
        "--query",
        type=str,
        required=True,
        help="The SQL query to execute on Snowflake.",
    )
    parser.add_argument(
        "--email_id", type=str, required=True, help="Your email ID for authentication."
    )
    parser.add_argument(
        "--schema", type=str, required=True, help="Snowflake schema name."
    )
    parser.add_argument(
        "--database", type=str, required=True, help="Snowflake database name."
    )
    parser.add_argument(
        "--warehouse", type=str, required=True, help="Snowflake warehouse name."
    )
    parser.add_argument(
        "--role", type=str, required=True, help="Snowflake role for access."
    )
    # Optional so existing invocations keep working; the default is the value
    # that used to be hard-coded in the connection parameters.
    parser.add_argument(
        "--account",
        type=str,
        default="xxxxxxxxxx.us-east-1",
        help="Snowflake account identifier.",
    )
    args = parser.parse_args(argv)

    connection_params = {
        "user": args.email_id,
        "account": args.account,
        "schema": args.schema,
        "database": args.database,
        "warehouse": args.warehouse,
        "role": args.role,
        # Browser-based SSO login, per the Snowflake connector documentation.
        "authenticator": "externalbrowser",
    }

    runner = SnowflakeQueryRunner(connection_params)
    try:
        runner.connect_to_snowflake()
        df = runner.execute_query(args.query)
        # execute_query() returns None when the query failed (already logged).
        if df is not None:
            print(df)
    finally:
        # Always release the connection, even if printing fails or the run
        # is interrupted; a no-op when connecting never succeeded.
        runner.close_connection()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()