-- fivetran_platform__audit_table.sql
{{ config(
    materialized='incremental',
    unique_key='unique_table_sync_key',
    partition_by={
        'field': 'sync_start',
        'data_type': 'timestamp',
        'granularity': 'day'
    } if target.type == 'bigquery' else ['sync_start_day'],
    incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
    file_format='parquet'
) }}
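
-- grab sync-level and table-write log events, extracting the target table name from the JSON message payload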
with sync_log as (

    select
        *,
        {{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name
    from {{ ref('stg_fivetran_platform__log') }}
    where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')

    {% if is_incremental() %}
    -- capture the latest sync timestamp in a call statement instead of a subquery to optimize BigQuery costs on incremental runs
    {%- call statement('max_sync_start', fetch_result=True) -%}
        select date(max(sync_start)) from {{ this }}
    {%- endcall -%}

    -- load the result of the above query into a variable
    {%- set query_result = load_result('max_sync_start') -%}

    -- the result is returned as a table of rows, so pull out the single value we need
    {%- set max_sync_start = query_result['data'][0][0] -%}

    -- only compare the new batch of data against the latest sync already stored in this model
    and date(created_at) >= '{{ max_sync_start }}'
    {% endif %}
),
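
-- connector status rollup, used to attach connector and destination attributes to each log event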
connector as (

    select *
    from {{ ref('fivetran_platform__connector_status') }}
),
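
-- join connector and destination names onto each log event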
add_connector_info as (

    select
        sync_log.*,
        connector.connector_name,
        connector.destination_id,
        connector.destination_name
    from sync_log
    left join connector
        on connector.connector_id = sync_log.connector_id
),
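
-- use window functions to bracket each event with its surrounding table-write and sync boundary timestamps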
sync_timestamps as (

    select
        connector_id,
        connector_name,
        table_name,
        event_subtype,
        destination_id,
        destination_name,
        created_at as write_to_table_start,
        min(case when event_subtype = 'write_to_table_end' then created_at else null end)
            over (partition by connector_id, table_name order by created_at rows between current row and unbounded following) as write_to_table_end,
        max(case when event_subtype = 'sync_start' then created_at else null end)
            over (partition by connector_id order by created_at rows between unbounded preceding and current row) as sync_start,
        min(case when event_subtype = 'sync_end' then created_at else null end)
            over (partition by connector_id order by created_at rows between current row and unbounded following) as sync_end, -- coalesce with next_sync_start
        min(case when event_subtype = 'sync_start' then created_at else null end)
            over (partition by connector_id order by created_at rows between current row and unbounded following) as next_sync_start
    from add_connector_info
),

-- this will be the base for every record in the final CTE
limit_to_table_starts as (

    select *
    from sync_timestamps
    where event_subtype = 'write_to_table_start'
),
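
-- parse the records_modified events to get per-table, per-operation row counts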
records_modified_log as (

    select
        connector_id,
        created_at,
        {{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name,
        {{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }} as schema_name,
        {{ fivetran_utils.json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
        cast({{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count
    from sync_log
    where event_subtype = 'records_modified'
),
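
-- aggregate the modified-row counts for each table, confined to the bounds of a single sync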
sum_records_modified as (

    select
        limit_to_table_starts.connector_id,
        limit_to_table_starts.connector_name,
        limit_to_table_starts.table_name,
        limit_to_table_starts.destination_id,
        limit_to_table_starts.destination_name,
        limit_to_table_starts.write_to_table_start,
        limit_to_table_starts.write_to_table_end,
        limit_to_table_starts.sync_start,
        case when limit_to_table_starts.sync_end > limit_to_table_starts.next_sync_start then null else limit_to_table_starts.sync_end end as sync_end,
        sum(case when records_modified_log.operation_type = 'REPLACED_OR_INSERTED' then records_modified_log.row_count else 0 end) as sum_rows_replaced_or_inserted,
        sum(case when records_modified_log.operation_type = 'UPDATED' then records_modified_log.row_count else 0 end) as sum_rows_updated,
        sum(case when records_modified_log.operation_type = 'DELETED' then records_modified_log.row_count else 0 end) as sum_rows_deleted
    from limit_to_table_starts
    left join records_modified_log on
        limit_to_table_starts.connector_id = records_modified_log.connector_id
        and limit_to_table_starts.table_name = records_modified_log.table_name

        -- confine it to one sync
        and records_modified_log.created_at > limit_to_table_starts.sync_start
        and records_modified_log.created_at < coalesce(limit_to_table_starts.sync_end, limit_to_table_starts.next_sync_start)
    {{ dbt_utils.group_by(n=9) }}
),
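
-- add the surrogate key used for incremental uniqueness and the day-grain column used for partitioning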
final as (

    select
        *,
        {{ dbt_utils.generate_surrogate_key(['connector_id', 'destination_id', 'table_name', 'write_to_table_start']) }} as unique_table_sync_key, -- for incremental materialization
        {{ dbt.date_trunc('day', 'sync_start') }} as sync_start_day -- for partitioning in databricks
    from sum_records_modified
)

select *
from final