-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo-easy-v1.sql
190 lines (168 loc) · 5.24 KB
/
demo-easy-v1.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
-- backend: flink
-- config: easy_sql.etl_type=streaming
-- config: flink.pipeline.jars=/opt/flink/lib/userlib/paimon-flink-1.16-0.5-20230515.002018-12.jar;/opt/flink/lib/userlib/flink-sql-connector-postgres-cdc-2.3.0.jar;/opt/flink/lib/userlib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-- config: flink.execution.checkpointing.interval=10 s
-- target=action
-- Register a Paimon catalog whose warehouse lives on the local filesystem path below.
CREATE CATALOG paimon WITH (
    'type'      = 'paimon',
    'warehouse' = 'file:///opt/flink/paimon'
);
-- target=action
-- Make the Paimon catalog current so the unqualified table names that follow resolve into it.
USE CATALOG paimon;
-- target=action
-- ODS layer: orders landed from the CDC source, partitioned by purchase day (dd, yyyy-MM-dd)
-- and purchase hour of day (hh).
CREATE TABLE IF NOT EXISTS ods_orders (
    order_id           STRING,
    product_id         STRING,
    customer_id        STRING,
    purchase_timestamp TIMESTAMP_LTZ,
    dd                 STRING,
    hh                 INT,
    -- processing-time attribute, used by the product lookup join downstream
    pts AS PROCTIME(),
    -- event-time attribute: purchase_timestamp truncated to millisecond precision
    ts AS CAST(purchase_timestamp AS TIMESTAMP_LTZ(3)),
    -- allow purchases to arrive up to 5 seconds out of order
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    -- the key includes the partition columns dd and hh
    -- NOTE(review): presumably required by Paimon for partitioned primary-key tables
    PRIMARY KEY (order_id, dd, hh) NOT ENFORCED
) PARTITIONED BY (dd, hh)
WITH (
    'changelog-producer' = 'input'
);
-- target=action
-- register a PostgreSQL table 'orders' in Flink SQL
-- Temporary CDC source over public.orders, read through logical decoding with pgoutput
-- and its own replication slot.
-- NOTE(review): connection credentials are hard-coded demo values
CREATE TEMPORARY TABLE IF NOT EXISTS orders_source (
    order_id           STRING,
    product_id         STRING,
    customer_id        STRING,
    purchase_timestamp TIMESTAMP_LTZ,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'            = 'postgres-cdc',
    'hostname'             = 'postgres',
    'port'                 = '5432',
    'username'             = 'postgres',
    'password'             = 'postgres',
    'database-name'        = 'postgres',
    'schema-name'          = 'public',
    'table-name'           = 'orders',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name'            = 'orders'
);
-- target=action
-- TODO: set config from env
-- set 'pipeline.name' = 'ods_orders';
-- Copy orders into the ODS table, deriving the partition columns from purchase_timestamp:
-- dd is the purchase day formatted as yyyy-MM-dd and hh is the hour of day as INT.
-- Explicit column lists keep the mapping stable if either schema gains columns
-- (the computed columns pts and ts of ods_orders are omitted from the target list).
INSERT INTO ods_orders (order_id, product_id, customer_id, purchase_timestamp, dd, hh)
SELECT
    order_id,
    product_id,
    customer_id,
    purchase_timestamp,
    DATE_FORMAT(purchase_timestamp, 'yyyy-MM-dd') AS dd,
    CAST(HOUR(purchase_timestamp) AS INT) AS hh
FROM orders_source;
-- target=action
-- ODS layer: product dimension keyed by product_id. ts holds the source change time,
-- filled from the CDC op_ts metadata by the products insert.
CREATE TABLE IF NOT EXISTS ods_products (
    product_id     STRING,
    product_name   STRING,
    sale_price     INT,
    product_rating DOUBLE,
    ts             TIMESTAMP_LTZ(3),
    -- no watermark is declared, so ts is a plain column rather than an event-time attribute
    -- WATERMARK FOR ts AS ts,
    -- processing-time attribute
    pts AS PROCTIME(),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input'
);
-- target=action
-- Temporary CDC source over public.products, read through logical decoding with pgoutput.
-- ts is a VIRTUAL metadata column exposing the op_ts of each change.
-- The primary key matches ods_products and mirrors orders_source, so the planner knows
-- the changelog is keyed by product_id.
-- NOTE(review): assumes public.products is unique on product_id - confirm.
-- NOTE(review): connection credentials are hard-coded demo values
CREATE TEMPORARY TABLE IF NOT EXISTS products_source (
    product_id     STRING,
    product_name   STRING,
    sale_price     INT,
    product_rating DOUBLE,
    ts             TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector'            = 'postgres-cdc',
    'hostname'             = 'postgres',
    'port'                 = '5432',
    'username'             = 'postgres',
    'password'             = 'postgres',
    'database-name'        = 'postgres',
    'schema-name'          = 'public',
    'table-name'           = 'products',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name'            = 'products'
);
-- target=action
-- set 'pipeline.name' = 'ods_products';
-- Replicate products into the ODS table, carrying the CDC op_ts metadata through as ts.
-- Explicit column lists keep the mapping stable if either schema gains columns
-- (the computed column pts of ods_products is omitted from the target list).
INSERT INTO ods_products (product_id, product_name, sale_price, product_rating, ts)
SELECT
    product_id,
    product_name,
    sale_price,
    product_rating,
    ts
FROM products_source;
-- target=action
-- ods customers: customer dimension keyed by customer_id. ts holds the source change time,
-- filled from the CDC op_ts metadata by the customers insert.
CREATE TABLE IF NOT EXISTS ods_customers (
    customer_id   STRING,
    customer_name STRING,
    ts            TIMESTAMP_LTZ(3),
    -- processing-time attribute
    pts AS PROCTIME(),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input'
);
-- target=action
-- Temporary CDC source over public.customers, read through logical decoding with pgoutput.
-- ts is a VIRTUAL metadata column exposing the op_ts of each change.
-- The primary key matches ods_customers and mirrors orders_source, so the planner knows
-- the changelog is keyed by customer_id.
-- NOTE(review): assumes public.customers is unique on customer_id - confirm.
-- NOTE(review): connection credentials are hard-coded demo values
CREATE TEMPORARY TABLE IF NOT EXISTS customers_source (
    customer_id   STRING,
    customer_name STRING,
    ts            TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector'            = 'postgres-cdc',
    'hostname'             = 'postgres',
    'port'                 = '5432',
    'username'             = 'postgres',
    'password'             = 'postgres',
    'database-name'        = 'postgres',
    'schema-name'          = 'public',
    'table-name'           = 'customers',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name'            = 'customers'
);
-- target=action
-- set 'pipeline.name' = 'ods_customers';
-- Replicate customers into the ODS table, carrying the CDC op_ts metadata through as ts.
-- Explicit column lists keep the mapping stable if either schema gains columns
-- (the computed column pts of ods_customers is omitted from the target list).
INSERT INTO ods_customers (customer_id, customer_name, ts)
SELECT
    customer_id,
    customer_name,
    ts
FROM customers_source;
-- target=action
-- DWD layer: one row per order_id, enriched with product and customer attributes.
-- Two separate inserts each write a subset of the columns (product fields and customer_name),
-- and the partial-update merge engine combines them per order_id. Delete records are ignored
-- (partial-update.ignore-delete).
-- NOTE(review): purchase_timestamp is TIMESTAMP here but TIMESTAMP_LTZ in ods_orders, so the
-- value is converted on insert - confirm the intended time zone.
CREATE TABLE IF NOT EXISTS dwd_order_enriched (
    order_id           STRING,
    product_id         STRING,
    customer_id        STRING,
    purchase_timestamp TIMESTAMP,
    product_name       STRING,
    sale_price         INT,
    customer_name      STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
    'changelog-producer'          = 'lookup',
    'merge-engine'                = 'partial-update',
    'partial-update.ignore-delete' = 'true'
);
-- target=action
-- enriched by order_with_product_at_the_time
-- set 'pipeline.name' = 'enriched by order_with_product_at_the_time';
-- Writes the order and product columns of dwd_order_enriched. customer_name is written by
-- the separate customer insert.
-- This is a processing-time lookup join (FOR SYSTEM_TIME AS OF o.pts). Each order sees the
-- product row current when the order is processed, which can differ from the product state
-- at purchase_timestamp if orders are replayed.
-- The LEFT JOIN keeps orders whose product is not found, with product_name and sale_price NULL.
-- The LOOKUP hint makes the lookup async and retries a miss at a fixed 10s delay, up to 3 attempts.
INSERT INTO dwd_order_enriched (order_id, product_id, customer_id, purchase_timestamp, product_name, sale_price)
SELECT
    /*+ LOOKUP('table'='ods_products', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') */
    o.order_id,
    o.product_id,
    o.customer_id,
    o.purchase_timestamp,
    p.product_name,
    p.sale_price
FROM ods_orders AS o
LEFT JOIN ods_products FOR SYSTEM_TIME AS OF o.pts AS p
    ON o.product_id = p.product_id;
-- target=action
-- enriched by customer
-- set 'pipeline.name' = 'enriched by customer';
-- Writes customer_name for every order whose customer_id matches a row in ods_customers.
-- Only order_id and customer_name are supplied. The partial-update merge engine of
-- dwd_order_enriched combines them with the product columns written by the other insert.
-- Orders without a matching customer get no row from this insert.
-- NOTE(review): regular streaming join with unbounded state, and no state TTL is configured in this file
INSERT INTO dwd_order_enriched (order_id, customer_name)
SELECT
    o.order_id,
    c.customer_name
FROM ods_customers AS c
INNER JOIN ods_orders AS o
    ON c.customer_id = o.customer_id;
-- target=action
-- product_sales_by_hh
-- DWS layer: total sale_price per product and purchase hour of day.
-- hh is the hour of day only, with no date part, so sales from different days share a bucket.
-- The aggregation merge engine sums incoming sales values per (product_id, hh).
-- NOTE(review): summing INT into INT may overflow for large volumes - consider BIGINT
CREATE TABLE IF NOT EXISTS dws_product_sales_by_hh (
    product_id STRING,
    hh         BIGINT,
    sales      INT,
    PRIMARY KEY (product_id, hh) NOT ENFORCED
) WITH (
    'changelog-producer'              = 'lookup',
    'merge-engine'                    = 'aggregation',
    'fields.sales.aggregate-function' = 'sum'
);
-- target=action
-- set 'pipeline.name' = 'dws_product_sales_by_hh';
-- Feed each order sale_price into the aggregation table, which sums it per (product_id, hh).
-- A dwd_order_enriched row written only by the customer insert has no product_id or
-- purchase_timestamp yet. Passing it through would put NULL into both primary-key columns
-- of the sink, and primary-key columns cannot be NULL. Such rows are skipped here.
-- The order is counted when the product insert fills those columns in and the row is updated.
INSERT INTO dws_product_sales_by_hh (product_id, hh, sales)
SELECT
    oe.product_id,
    HOUR(oe.purchase_timestamp) AS hh,
    oe.sale_price AS sales
FROM dwd_order_enriched AS oe
WHERE oe.product_id IS NOT NULL
  AND oe.purchase_timestamp IS NOT NULL;