-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathie_databus.py
191 lines (158 loc) · 6.01 KB
/
ie_databus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import json
from dataclasses import dataclass
from threading import Event, Lock
from typing import Dict, Any, Optional
import paho.mqtt.client as mqtt
@dataclass
class Tag:
    """Object for organizing and providing easy access to the data received
    from MQTT

    Only ``name``, ``id`` and ``data_type`` are required. The remaining
    fields are copied from a data message when it carries them and default to
    ``None`` otherwise, so a ``Tag`` can be built from header information
    alone.

    Attributes
    ----------
    name : str
        The name of the PLC tag
    id : str
        The ID of the PLC tag
    data_type : str
        The original data type of the val attribute
    qc : int, optional
        The ``qc`` field of the data message, if present (presumably a
        quality code -- confirm against the connector documentation)
    qx : int, optional
        The ``qx`` field of the data message, if present (presumably an
        extended quality code -- confirm against the connector documentation)
    ts : str, optional
        The timestamp of when this data was received
    val : float, optional
        The current value of the PLC tag
    """

    name: str
    id: str
    data_type: str
    # The fields below are documented as optional, so they default to None
    # instead of forcing every caller to spell out all seven arguments.
    qc: Optional[int] = None
    qx: Optional[int] = None
    ts: Optional[str] = None
    val: Optional[float] = None
class IEDatabus:
    """The main object for interfacing with an edge device's IE Databus in Python

    Attributes
    ----------
    write_topic : str
        Used to change which MQTT topic `write_to_tag` publishes to
    _client : mqtt.Client
        The underlying MQTT client that connects to the IE Databus
    _tags : Dict[str, Tag]
        The underlying tag dictionary that is thread protected by a
        `property` and a `Lock`
    _tag_headers : Dict[str, Dict[str, str]]
        A dictionary, keyed by tag ID, that contains the header data
        (data point definitions) received from the IE Databus; this data
        allows for the correct mapping of a tag's ID to its name
    _tags_lock : threading.Lock
        The Lock object that makes the `tags` attribute thread-safe
    _ready_event : threading.Event
        Becomes set once the client receives enough data from the IE Databus
        to populate the `tags` attribute
    """

    def __init__(self, username: str, password: str,
                 host: str = 'ie-databus', port: int = 1883):
        """
        Parameters
        ----------
        username : str
            The username required for connecting to the IE Databus' MQTT broker
        password : str
            The password required for connecting to the IE Databus' MQTT broker
        host : str, optional
            Hostname of the MQTT broker (defaults to ``'ie-databus'``)
        port : int, optional
            TCP port of the MQTT broker (defaults to ``1883``)
        """
        # Set up every attribute the callbacks touch *before* the client is
        # created and connected, so a callback can never observe a
        # half-initialised object.
        self._tags: Dict[str, Tag] = {}
        self._tag_headers: Dict[str, Dict[str, str]] = {}
        self._tags_lock = Lock()
        self._ready_event = Event()

        # public instance attributes
        self.write_topic = 'ie/d/j/simatic/v1/s7c1/dp/w/USC_PLC'

        # mqtt client setup
        self._client = mqtt.Client()
        self._client.username_pw_set(username, password)
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.connect(host, port)

    @property
    def tags(self) -> Dict[str, 'Tag']:
        """A property that provides access to all the PLC tags and their
        values in real time

        Returns
        -------
        Dict[str, Tag]
            A mapping between the name of the tag and the `Tag` object
            associated with it. The dictionary is a shallow copy taken under
            the lock, so the caller may iterate over it freely.
        """
        with self._tags_lock:
            value = self._tags.copy()
        return value

    @tags.setter
    def tags(self, value: Dict[str, 'Tag']):
        """A thread-safe solution to exposing the real-time tag data to this
        API

        Parameters
        ----------
        value : Dict[str, Tag]
            The new tag dictionary to replace the old one with
        """
        with self._tags_lock:
            self._tags = value

    def start(self, timeout: Optional[float] = None):
        """Start listening for incoming MQTT data on the IE Databus

        Blocks until the first tag values have been received.

        Parameters
        ----------
        timeout : float, optional
            Maximum number of seconds to wait for the first tag values.
            ``None`` (the default) waits indefinitely.

        Raises
        ------
        TimeoutError
            If `timeout` is given and no tag data arrived in time. The
            network loop is left running; call `stop` to end it.
        """
        self._client.loop_start()
        if not self._ready_event.wait(timeout):
            raise TimeoutError(
                'No tag data received from the IE Databus within '
                + str(timeout) + ' seconds')

    def stop(self):
        """Stop listening for incoming MQTT data on the IE Databus"""
        self._client.loop_stop()

    def write_to_tag(self, tag: str, data: Any):
        """Writes serializable data to a specific PLC tag

        This method blocks until the data has been published

        Parameters
        ---------
        tag : str
            The name of the PLC tag to write the data to
        data : Any
            The data to send to the specified PLC tag

        Raises
        ------
        KeyError
            If no tag named `tag` has been received yet
        """
        # Only the ID is needed, so look it up directly under the lock
        # instead of copying the whole tag dictionary.
        with self._tags_lock:
            tag_id = self._tags[tag].id
        payload = {'seq': 1, 'vals': [{'id': tag_id, 'val': data}]}
        msg_info = self._client.publish(self.write_topic, json.dumps(payload))
        msg_info.wait_for_publish()

    def _on_connect(self, client, userdata, flags, rc):
        """An override method for connecting to the MQTT broker

        Subscribes to every ``ie/`` topic once the broker accepted the
        connection (``rc == 0``); a refused connection is only reported.
        """
        if rc == 0:
            print('Connected successfully')
            client.subscribe('ie/#')
        else:
            print('Error: ' + str(rc))

    def _on_message(self, client, userdata, msg):
        """An override method for receiving a message from the MQTT broker

        The client subscribes to ``ie/#``, so payloads of different shapes
        arrive here. Messages are dispatched on their content: a payload with
        a ``connections`` key refreshes the tag headers, a payload with a
        ``vals`` key updates the tag values, and anything else (including
        payloads that are not JSON objects) is ignored so that an unexpected
        message cannot raise inside the network loop.
        """
        # Ignore our own write requests.
        if msg.topic == self.write_topic:
            return
        try:
            data = json.loads(msg.payload.decode())
        except ValueError:
            # Covers both undecodable bytes and malformed JSON.
            return
        if not isinstance(data, dict):
            return
        if 'connections' in data:
            self._update_headers(data)
        elif 'vals' in data:
            self._update_tags(data['vals'])

    def _update_headers(self, data):
        """Record every data point definition of a metadata payload by ID"""
        try:
            for connection in data['connections']:
                for group in connection['dataPoints']:
                    for definition in group['dataPointDefinitions']:
                        self._tag_headers[definition['id']] = definition
        except (KeyError, TypeError):
            # Not shaped like a metadata message after all; keep whatever
            # definitions were collected so far.
            return

    def _update_tags(self, values):
        """Merge the values of a data payload into the tag dictionary"""
        if not isinstance(values, list):
            return
        updates = {}
        for value_dict in values:
            if not isinstance(value_dict, dict):
                continue
            header = self._tag_headers.get(value_dict.get('id'))
            if header is None:
                # Value for a tag whose header has not been seen (yet).
                continue
            try:
                updates[header['name']] = Tag(name=header['name'],
                                              id=header['id'],
                                              data_type=header['dataType'],
                                              qc=value_dict.get('qc'),
                                              qx=value_dict.get('qx'),
                                              ts=value_dict.get('ts'),
                                              val=value_dict.get('val'))
            except KeyError:
                # Incomplete header entry; skip this value.
                continue
        if not updates:
            return
        # A data message may carry only a subset of the tags (TODO confirm
        # against the connector's publish behaviour), so merge into the
        # known tags instead of replacing them. A new dict is built so that
        # a dictionary previously handed to the `tags` setter is not mutated.
        with self._tags_lock:
            self._tags = {**self._tags, **updates}
        self._ready_event.set()
if __name__ == '__main__':
    # Demo entry point: connect with the 'edge'/'edge' credentials, block in
    # start() until the first tag data is available, then print one line per
    # tag from a single snapshot of the tag dictionary.
    bus = IEDatabus('edge', 'edge')
    bus.start()
    snapshot = bus.tags
    for key, tag in snapshot.items():
        print(f'{key}: {tag.val}')