-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnector.py
79 lines (65 loc) · 3.13 KB
/
connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Configures a Kafka Connector for Postgres Station data"""
# extract station information from our PostgreSQL database into Kafka
import json
import logging
import requests
logger = logging.getLogger(__name__)
KAFKA_CONNECT_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "stations"
def configure_connector():
"""Starts and configures the Kafka Connect connector"""
logging.debug("creating or updating kafka connect connector...")
resp = requests.get(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}")
if resp.status_code == 200:
logging.debug("connector already created skipping recreation")
return
# TODO: Complete the Kafka Connect Config below.
# Directions: Use the JDBC Source Connector to connect to Postgres. Load the `stations` table
# using incrementing mode, with `stop_id` as the incrementing column name.
# Make sure to think about what an appropriate topic prefix would be, and how frequently Kafka
# Connect should run this connector (hint: not very often!)
return
# TODO: Complete the Kafka Connect Config below.
# Directions: Use the JDBC Source Connector to connect to Postgres. Load the `stations` table
# using incrementing mode, with `stop_id` as the incrementing column name.
# Make sure to think about what an appropriate topic prefix would be, and how frequently Kafka
# Connect should run this connector (hint: not very often!)
# See: https://docs.confluent.io/current/connect/references/restapi.html
# See: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html
logger.info("connector code not completed skipping connector creation")
resp = requests.post(
KAFKA_CONNECT_URL,
headers={"Content-Type": "application/json"},
data=json.dumps({
"name": CONNECTOR_NAME,
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"batch.max.rows": "500",
# TODO
"connection.url": "jdbc:postgresql://localhost:5432/cta",
# TODO
"connection.user": "cta_admin",
# TODO
"connection.password": "chicago",
# TODO whitelist : only works with `stations` table, blacklist : works with all tables in database
"table.whitelist": "stations",
# TODO
"mode": "incrementing",
# TODO
"incrementing.column.name": "stop_id",
# TODO
"topic.prefix": "stations.",
# TODO
"poll.interval.ms": "900000",
}
}),
)
## Ensure a healthy response was given
resp.raise_for_status()
logging.debug("connector created successfully")
if __name__ == "__main__":
configure_connector()