-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_client.py
62 lines (48 loc) · 2.37 KB
/
mqtt_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import paho.mqtt.client as mqtt
import logging
import os
from core.logging_utils import configure_logging
# Import-time side effect: set up logging via the project's helper as soon as
# this module is imported, before the class below is defined.
configure_logging()
class MqttClientWrapper:
    """Convenience wrapper around a paho-mqtt ``Client``.

    Connection settings that are not passed explicitly are read from the
    environment: ``MQTT_SERVER``, ``MQTT_USERNAME``, ``MQTT_PASSWORD`` and
    ``DEVICE_ID`` (used to build the client id).

    Note that constructing an instance connects to the broker immediately:
    ``__init__`` calls ``setup_client()``, which calls ``connect()``.
    """

    # Keepalive interval, in seconds, passed to ``Client.connect``.
    KEEPALIVE_SECONDS = 60

    def __init__(self, client_id_suffix=None, server=None, port=1883, username=None, password=None):
        """Create the underlying client, register callbacks and connect.

        Args:
            client_id_suffix: Optional suffix appended to the generated
                client id; ignored when empty or ``None``.
            server: Broker host; falls back to the ``MQTT_SERVER`` env var.
            port: Broker port.
            username: Falls back to the ``MQTT_USERNAME`` env var.
            password: Falls back to the ``MQTT_PASSWORD`` env var.

        Raises:
            ValueError: If no server is configured.
            Exception: Whatever the underlying ``connect`` call raises.
        """
        self.client_id = self._build_client_id(client_id_suffix)
        self.server = server or os.environ.get('MQTT_SERVER')
        self.port = port
        self.username = username or os.environ.get('MQTT_USERNAME')
        self.password = password or os.environ.get('MQTT_PASSWORD')
        logging.info(f"Initializing MQTT client with client_id: {self.client_id}")

        # paho-mqtt >= 2.0 requires an explicit callback API version. The
        # callbacks defined on this class use the version-1 signatures, so
        # request VERSION1 when the installed paho exposes the enum and fall
        # back to the legacy constructor call on paho-mqtt 1.x.
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is not None:
            self.client = mqtt.Client(callback_api.VERSION1, client_id=self.client_id)
        else:
            self.client = mqtt.Client(client_id=self.client_id)
        self.setup_client()

    @staticmethod
    def _build_client_id(client_id_suffix=None):
        """Return ``mqtt-client-<DEVICE_ID>[-<suffix>]`` for this process."""
        base = f"mqtt-client-{os.environ.get('DEVICE_ID', 'default')}"
        if client_id_suffix:
            return f"{base}-{client_id_suffix}"
        return base

    def setup_client(self):
        """Attach the callbacks, set credentials if present, then connect."""
        logging.debug("Setting up MQTT client")
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        # Credentials are only applied when both username and password are set.
        if self.username and self.password:
            self.client.username_pw_set(self.username, self.password)
        self.connect()

    def connect(self):
        """Connect to the configured broker.

        Raises:
            ValueError: If no server was given and ``MQTT_SERVER`` is unset.
            Exception: Any error raised by the underlying ``connect`` call is
                logged and re-raised unchanged.
        """
        if not self.server:
            # Fail with an actionable message instead of paho's "Invalid host.".
            error = ValueError(
                "MQTT server is not configured: pass server=... or set MQTT_SERVER"
            )
            logging.error(f"Error connecting to MQTT server: {error}")
            raise error
        try:
            self.client.connect(self.server, self.port, self.KEEPALIVE_SECONDS)
        except Exception as e:
            logging.error(f"Error connecting to MQTT server: {e}")
            raise

    def on_connect(self, client, userdata, flags, rc):
        """Log the outcome of a connection attempt (version-1 callback)."""
        # A result code of 0 means the broker accepted the connection; any
        # other value is a refusal and must not be reported as success.
        if rc == 0:
            logging.info(f"Connected to MQTT server with result code {rc}")
        else:
            logging.error(f"Connection to MQTT server refused with result code {rc}")
        # You can subscribe to topics here if there are any generic subscriptions

    def on_disconnect(self, client, userdata, rc):
        """Log a disconnect, flagging unexpected ones (version-1 callback)."""
        # rc == 0 means the disconnect was requested via disconnect().
        if rc == 0:
            logging.info(f"Disconnected from MQTT server with result code {rc}")
        else:
            logging.warning(f"Unexpected disconnect from MQTT server with result code {rc}")

    def on_message(self, client, userdata, message):
        """Log an incoming message."""
        # Default message handler, can be overridden by setting client.on_message to another function
        logging.info(f"Message received: {message.payload} on topic {message.topic}")

    def subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` and return the client's ``(result, mid)`` tuple.

        Args:
            topic: Topic (or topic specification) passed to the client.
            qos: Requested quality-of-service level; defaults to 0.
        """
        outcome = self.client.subscribe(topic, qos)
        # Only report success when the client actually accepted the request.
        if outcome[0] == mqtt.MQTT_ERR_SUCCESS:
            logging.info(f"Subscribed to topic: {topic}")
        else:
            logging.error(f"Failed to subscribe to topic: {topic} (result code {outcome[0]})")
        return outcome

    def publish(self, topic, payload, qos=0, retain=False):
        """Publish ``payload`` on ``topic``.

        Returns:
            Whatever the underlying client's ``publish`` returns, so callers
            can inspect or wait on delivery.
        """
        return self.client.publish(topic, payload, qos, retain)

    def loop_start(self):
        """Start the client's background network loop (pairs with ``loop_stop``)."""
        self.client.loop_start()

    def loop_forever(self):
        """Run the client's network loop in the calling thread."""
        self.client.loop_forever()

    def loop_stop(self):
        """Stop the client's background network loop."""
        self.client.loop_stop()

    def disconnect(self):
        """Disconnect from the broker."""
        self.client.disconnect()