-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_view_query.py
56 lines (43 loc) · 1.51 KB
/
import_view_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import shutil
from google.cloud import bigquery
# Root directory that exported view queries are written under.
QUERY_DIR = 'query'

# Comma-separated dataset IDs whose views are exported (required).
try:
    _raw_datasets = os.environ['DATASETS']
except KeyError:
    # `from None` hides the internal KeyError so the user only sees the
    # actionable configuration message.
    raise RuntimeError("Environment Variable `DATASETS` must be set for view query import.") from None

# Strip whitespace around each entry and drop empty ones, so values such as
# "a, b" or "a,b," do not yield invalid dataset IDs like " b" or "".
DATASET_ID_LIST = [dataset_id.strip() for dataset_id in _raw_datasets.split(',') if dataset_id.strip()]
if not DATASET_ID_LIST:
    raise RuntimeError("Environment Variable `DATASETS` must list at least one dataset for view query import.")

# Optional: when set (non-empty), only views with this table ID are exported.
TABLE_ID = os.environ.get('TABLE')

client = bigquery.Client()
def main():
    """Export the SQL of the BigQuery views in ``DATASET_ID_LIST`` to files.

    Each view's query is written to ``{QUERY_DIR}/{dataset_id}/{table_id}.sql``.

    When ``TABLE_ID`` (the ``TABLE`` environment variable) is set, only views
    with that table ID are exported and other existing files are left alone.
    Otherwise every dataset's export directory is removed first, so files for
    views that were deleted in BigQuery do not linger.

    All view definitions are fetched before anything is deleted or written.
    An API error part-way through therefore leaves the existing files
    untouched instead of half-wiped.
    """
    pending = []  # (filepath, view_query) pairs, written after all fetches succeed
    for dataset_id in DATASET_ID_LIST:
        for item in client.list_tables(dataset_id):
            if item.table_type != 'VIEW':
                continue
            if TABLE_ID and item.table_id != TABLE_ID:
                continue
            # The items yielded by list_tables() do not carry the view SQL;
            # fetch the full table resource to read view_query.
            table = client.get_table(item)
            filepath = os.path.join(QUERY_DIR, table.dataset_id, f"{table.table_id}.sql")
            pending.append((filepath, table.view_query))

    if TABLE_ID and not pending:
        print(f"No view named `{TABLE_ID}` found in datasets: {', '.join(DATASET_ID_LIST)}")

    if not TABLE_ID:
        remove_old_queries()
    for filepath, query in pending:
        save_query(filepath, query)
def remove_old_queries(dataset_ids=None, query_dir=None):
    """Delete the exported query directory of each dataset.

    This runs before a full sync so that ``.sql`` files for views that no
    longer exist in BigQuery do not linger (the "phantom cache").

    Args:
        dataset_ids: Dataset IDs whose directories are deleted. Defaults to
            ``DATASET_ID_LIST``. Entries may be bare (``dataset``) or
            project-qualified (``project.dataset``). Only the last dotted
            component is used, because exported files are stored under the
            bare ``table.dataset_id``.
        query_dir: Root directory of the exported queries. Defaults to
            ``QUERY_DIR``.
    """
    if dataset_ids is None:
        dataset_ids = DATASET_ID_LIST
    if query_dir is None:
        query_dir = QUERY_DIR
    for dataset_id in dataset_ids:
        # Without this, a qualified "project.dataset" would target a directory
        # that is never written, and stale files would survive.
        bare_dataset_id = dataset_id.rsplit('.', 1)[-1]
        dirpath = os.path.join(query_dir, bare_dataset_id)
        print(f"Delete all queries: {dirpath}")
        # Best effort: a missing directory (first run) is not an error.
        shutil.rmtree(dirpath, ignore_errors=True)
def save_query(filepath: str, query: str) -> None:
    """Write a view's SQL to ``filepath``, replacing any existing file.

    Parent directories are created as needed. The file is written as UTF-8,
    so non-ASCII characters in the SQL are stored identically on every
    platform instead of depending on the locale's default encoding.

    Args:
        filepath: Destination ``.sql`` path.
        query: SQL text to write.
    """
    print(f"Sync new query from bq: {filepath}")
    dirname = os.path.dirname(filepath)
    # os.makedirs('') raises, so skip it when filepath has no directory part.
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    # Remove first so the path ends up as a freshly created file (an existing
    # symlink is replaced rather than written through); a missing file is fine.
    try:
        os.remove(filepath)
    except FileNotFoundError:
        pass
    with open(filepath, 'w', encoding='utf-8') as file:
        file.write(query)
# Script entry point: run the export only when executed directly, not on import.
if __name__ == "__main__":
    main()