-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_conn.py
29 lines (22 loc) · 831 Bytes
/
get_conn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake import connector
def get_conn(user, account, private_key_path, passphrase):
with open(private_key_path, 'rb') as f_key:
p_key = serialization.load_pem_private_key(
f_key.read(),
password=passphrase,
backend=default_backend()
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
conn_config = {
'user': user,
'account': account,
'private_key': pkb,
'authenticator': 'snowflake'
}
return connector.connect(**conn_config)