-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathksql.py
67 lines (50 loc) · 1.64 KB
/
ksql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Configures KSQL to combine station and turnstile data"""
import json
import logging
import requests
import topic_check
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Base URL of the KSQL server; execute_statement() posts to its `/ksql` endpoint.
KSQL_URL = "http://localhost:8088"
#
# TODO: Complete the following KSQL statements.
# TODO: For the first statement, create a `turnstile` table from your turnstile topic.
# Make sure to use 'avro' datatype!
# TODO: For the second statement, create a `turnstile_summary` table by selecting from the
# `turnstile` table and grouping on station_id.
# Make sure to cast the COUNT of station id to `count`
# Make sure to set the value format to JSON
# KSQL script sent to the server in a single request. It contains two statements:
#   1. Create the `turnstile` table over the Avro-encoded turnstile topic,
#      keyed by `station_id`.
#   2. Create `turnstile_summary` (JSON-encoded) from `turnstile`, holding the
#      number of rows per `station_id` in a column named `count`.
# The second statement must reference the table by the exact name created in the
# first one, and the WITH property list must not end in a trailing comma.
KSQL_STATEMENT = """
CREATE TABLE turnstile (
station_id INT,
station_name VARCHAR,
line VARCHAR
) WITH (
KAFKA_TOPIC = 'org.chicago.cta.turnstile',
VALUE_FORMAT = 'AVRO',
KEY='station_id'
);
CREATE TABLE turnstile_summary
WITH (VALUE_FORMAT = 'JSON') AS
SELECT station_id , COUNT(station_id) AS count
FROM turnstile
GROUP BY station_id;
"""
def execute_statement(timeout: float = 30) -> None:
    """Executes the KSQL statement against the KSQL API.

    Returns immediately, without contacting the KSQL server, when
    `topic_check.topic_exists("TURNSTILE_SUMMARY")` reports that the topic is
    already present. Otherwise posts `KSQL_STATEMENT` to `{KSQL_URL}/ksql`
    with the stream offset reset set to `earliest`.

    Args:
        timeout: Seconds to wait for the KSQL server before giving up. Passed
            straight to `requests.post`; without it an unresponsive server
            would block the caller indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a non-2XX
            status code.
        requests.exceptions.RequestException: If the request itself fails
            (connection error, timeout, ...).
    """
    # NOTE(review): assumes topic_exists returns a bool -- confirm in topic_check.
    if topic_check.topic_exists("TURNSTILE_SUMMARY"):
        return

    # Use the module logger (not the root logger) so records carry this
    # module's name and respect its configuration.
    logger.debug("executing ksql statement...")

    payload = {
        "ksql": KSQL_STATEMENT,
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
    }
    resp = requests.post(
        f"{KSQL_URL}/ksql",
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        data=json.dumps(payload),
        timeout=timeout,
    )

    # Ensure that a 2XX status code was returned. Log the response body before
    # re-raising so the server's error message is not lost.
    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError:
        logger.error("ksql statement failed: %s", resp.text)
        raise


if __name__ == "__main__":
    execute_statement()