-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt2sql.py
140 lines (127 loc) · 5.77 KB
/
mqtt2sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""
Raspberry Pi Sensors (DHT22 / AM2302)
Subscribe MQTT, get Sensors measurement data and save to SQL Database!
https://github.com/Tob1as/rpi-sensors
"""
import sys
import os
import logging
import time
import mariadb
import json
import paho.mqtt.client as mqtt
import ssl
# Variables
LOGLEVEL = str(os.environ.get('LOGLEVEL', 'INFO').upper())
DB_HOST = str(os.environ.get('DB_HOST', 'localhost'))
DB_PORT = int(os.environ.get('DB_PORT', 3306))
DB_DATABASE = str(os.environ.get('DB_DATABASE', ''))
DB_USER = str(os.environ.get('DB_USER', ''))
DB_PASSWORD = str(os.environ.get('DB_PASSWORD', ''))
MQTT_HOST = str(os.environ.get('MQTT_HOST', 'vernemq'))
MQTT_PORT = int(os.environ.get('MQTT_PORT', 1883))
#MQTT_SSL_ENABLED = bool(os.environ.get('MQTT_SSL_ENABLED', False))
MQTT_SSL_ENABLED = bool(eval(str(os.environ.get('MQTT_SSL_ENABLED', 'False')).title())) # convert to boolean, because env are only str!
MQTT_USER = str(os.environ.get('MQTT_USER', ''))
MQTT_PASSWORD = str(os.environ.get('MQTT_PASSWORD', ''))
MQTT_CLIENT_ID = str(os.environ.get('MQTT_CLIENTNAME', f'sensor-mqtt2sql'))
MQTT_TOPIC = str(os.environ.get('MQTT_TOPIC', 'sensors'))
FILE_SAVE_ENABLED = bool(eval(str(os.environ.get('FILE_SAVE_ENABLED', 'False')).title())) # convert to boolean, because env are only str!
FILE_BASE_PATH= str(os.environ.get('FILE_BASE_PATH', './'))
# Logging
logging.root.handlers = []
logging.basicConfig(format='%(asctime)s %(name)-12s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', encoding='utf-8', stream=sys.stdout, level=LOGLEVEL)
logger = logging.getLogger(__name__)
# current time
def currenttime():
t = time.localtime()
current_time = time.strftime("%Y-%m-%d %H:%M:%S", t)
return current_time
def save_to_sql(sensor_id, temperature_f, temperature_c, humidity, date_time):
# Connect to MariaDB Platform
try:
conn = mariadb.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_DATABASE,
user=DB_USER,
password=DB_PASSWORD
)
except mariadb.Error as e:
logger.error(f"Error connecting to MariaDB Platform: {e}")
#sys.exit(1)
# Get Cursor
cur = conn.cursor()
#insert information
try:
#cur.execute("INSERT INTO measurements (sensor_id,temperature_f,temperature,humidity) VALUES (?,?,?,?)", (sensor_id,temperature_f,temperature_c,humidity))
cur.execute("INSERT INTO measurements (sensor_id,temperature_f,temperature,humidity,date_time) VALUES (?,?,?,?,?)", (sensor_id,temperature_f,temperature_c,humidity,date_time))
except mariadb.Error as e:
logger.error(f"Error: {e}")
conn.commit()
logger.debug(f"Database - Last Inserted ID: {cur.lastrowid}")
conn.close()
def save2file(mqtt_topic, mqtt_msq):
mqtt_topic_f = mqtt_topic.replace("/", "+")
current_time=currenttime().replace(":", "-").replace(" ", "+")
filename=FILE_BASE_PATH+current_time+"++"+mqtt_topic_f+".json"
file = open(filename, 'w', encoding='utf-8')
file.write(mqtt_msq)
file.close()
logger.debug("Save to File: %s", filename)
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.debug("Connected to MQTT Broker!")
else:
logger.debug("Connect to MQTT Broker failed! result code: %d", rc)
#def on_message(client, userdata, message):
# logger.debug("Message Recieved: "+message.payload.decode())
client = mqtt.Client(MQTT_CLIENT_ID)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
client.on_connect = on_connect
if MQTT_SSL_ENABLED == True:
client.tls_set(ca_certs=None, cert_reqs=ssl.CERT_NONE) # for valid check: (ca_certs=/etc/ssl/certs/ca-certificates.crt, cert_reqs=ssl.CERT_REQUIRED)
client.tls_insecure_set(True) # for valid check: False
client.connect(MQTT_HOST, MQTT_PORT)
return client
def on_message(client, userdata, message):
msg = message.payload.decode()
logger.debug("Received message '" + str(msg) + "' on topic '" + message.topic + "' with QoS " + str(message.qos))
msg_values = json.loads(msg)
sensor_id = msg_values['sensor_id']
temperature_f = msg_values['temperature_f']
temperature_c = msg_values['temperature_c']
humidity = msg_values['humidity']
date_time = msg_values['date']
logger.info("Data: SensorID=%s ; Temperature %sF / %s°C ; Humidity: %s%% ; Datetime='%s'" % (sensor_id, temperature_f, temperature_c, humidity, date_time))
# Save the values to the Database
if (DB_DATABASE !='' and DB_USER !='' and DB_PASSWORD !=''):
save_to_sql(sensor_id, temperature_f, temperature_c, humidity, date_time)
else:
logger.debug("Data not save in Database!")
if FILE_SAVE_ENABLED == True:
save2file(message.topic, msg)
else:
logger.debug("Data not save to File!")
if __name__ == "__main__":
logger.info("Sensor Service started!")
logger.info("Database - set Host to \"%s\", Port to \"%s\", DB to \"%s\" and User to \"%s\"" % (DB_HOST, DB_PORT, DB_DATABASE, DB_USER))
logger.info("MQTT - set Host to \"%s\", Port to \"%s\", Topic to \"%s\" and User to \"%s\" with Client-ID=%s and SSL=%s" % (MQTT_HOST, MQTT_PORT, MQTT_TOPIC, MQTT_USER, MQTT_CLIENT_ID, MQTT_SSL_ENABLED))
logger.info("File - set SAVE_ENABLED to \"%s\" and BASE_PATH to \"%s\"" % (FILE_SAVE_ENABLED,FILE_BASE_PATH))
try:
try:
client = connect_mqtt()
client.on_message = on_message
client.subscribe(MQTT_TOPIC, qos=0)
client.loop_forever()
except Exception as error:
logger.error('%s' % error)
except KeyboardInterrupt:
logger.info('KeyboardInterrupt')
try:
sys.exit(0)
except SystemExit:
os._exit(0)