-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsockets.py
162 lines (124 loc) · 4.74 KB
/
sockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from json import dumps, loads
from pprint import pprint
from secrets import token_urlsafe as genUUID
import logging
import websocket
from PySide6.QtCore import QObject, QThread, Signal
from messages import *
class _Messenger(QObject):
    """Base class that converts raw message dicts into message objects.

    A payload handed to :meth:`push` is looked up by its ``'type'`` field in
    :attr:`messageTypes`, wrapped in the matching message class and emitted
    through :attr:`signal`.
    """

    # Emitted by push() with the wrapped message object.
    signal: Signal = Signal(Observation)
    # Listening flag; this class only declares it, it never changes it.
    running: bool = False
    # Assigned by setStation(); there is no default value.
    stationID: int
    # Payload 'type' value -> class used to wrap that payload.
    messageTypes = {
        'rapid_wind': WindMessage,
        'evt_precip': RainStartMessage,
        'evt_strike': LightningMessage,
        'obs_st': TempestMessage,
        'obs_air': AirMessage,
        'obs_sky': SkyMessage,
        'hub_status': HubStatusMessage,
        'device_status': DeviceStatusMessage,
    }

    def testMessage(self):
        """Feed a hard-coded ``obs_st`` sample payload through :meth:`push`."""
        sample = {'serial_number': 'ST-00024322', 'type': 'obs_st', 'hub_sn': 'HB-00040538', 'obs': [[1612817710, 0.0, 0.49, 1.21, 117, 3, 1030.06, 4.16, 42.16, 12326, 0.4, 103, 0.0, 0, 0, 0, 2.825, 1]], 'firmware_revision': 134}
        # Alternative sample for exercising the 'device_status' path:
        # sample = {'serial_number': 'ST-00024322', 'type': 'device_status', 'hub_sn': 'HB-00040538', 'timestamp': 1612834329, 'uptime': 2876646, 'voltage': 2.73, 'firmware_revision': 134, 'rssi': -61, 'hub_rssi': -62, 'sensor_status': 8+16,
        #           'debug': 0}
        self.push(sample)

    def push(self, message: dict):
        """Wrap *message* in its message class and emit it on :attr:`signal`.

        Args:
            message: Decoded payload; its ``'type'`` entry selects the wrapper
                class from :attr:`messageTypes`.

        Payloads whose type is missing or not in :attr:`messageTypes` are
        logged at debug level and otherwise ignored.
        """
        typeName = message.get('type')
        messageType = self.messageTypes.get(typeName)
        if messageType is None:
            # Lazy %-formatting: the original passed the type as a stray
            # argument with no placeholder, which made logging itself fail.
            logging.debug("INVALID MESSAGE TYPE %s", typeName)
            return

        logging.debug("MESSAGE RECEIVED")
        # `pprint` is imported as the function, so it is called directly.
        pprint(message)
        self.signal.emit(messageType(message))

    def setStation(self, value):
        """Store *value*, converted with ``int()``, as :attr:`stationID`."""
        self.stationID = int(value)
class WSMessenger(QThread, _Messenger):
    """Thread that receives messages over a websocket and pushes them on.

    The thread body runs a ``websocket.WebSocketApp`` connected to
    :attr:`uri`. Every text frame received is JSON-decoded and handed to
    ``push()``.
    """

    # NOTE(review): the access token is committed in source; consider loading
    # it from configuration or the environment instead.
    token = '61173523-5a94-4392-bc6b-2e1d375d17fe'
    uri = 'wss://ws.weatherflow.com/swd/data?token=' + token
    # Random id generated per instance and sent as "id" in every request.
    uuid: str

    def __init__(self, parent=None):
        """Create the websocket app; nothing connects until the thread runs.

        Args:
            parent: Passed through to the base-class initialiser.
        """
        # Initialise the Qt base classes before touching instance attributes.
        super(WSMessenger, self).__init__(parent)
        self.uuid = genUUID(8)
        self.WS = websocket.WebSocketApp(
            self.uri,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )

    def genMessage(self, messageType: str) -> dict[str, object]:
        """Build a request payload of the given type.

        Args:
            messageType: Value placed under the ``"type"`` key.

        Returns:
            A dict with ``"type"``, ``"device_id"`` (``self.stationID``) and
            ``"id"`` (``self.uuid``). ``stationID`` must already be set.
        """
        return {
            "type": messageType,
            "device_id": self.stationID,
            "id": self.uuid,
        }

    def run(self):
        """Thread body: run the websocket loop until the connection ends."""
        self.WS.run_forever()

    def begin(self):
        """Mark the messenger as running and start the thread."""
        self.running = True
        self.start()

    def end(self):
        """Mark the messenger as stopped and close the websocket."""
        self.running = False
        self.WS.close()

    def on_open(self, ws):
        """Send the two listen requests once the socket is open."""
        ws.send(dumps(self.genMessage('listen_start')))
        ws.send(dumps(self.genMessage('listen_rapid_start')))

    def on_message(self, ws, message):
        """JSON-decode an incoming frame and forward it to ``push()``."""
        self.push(loads(message))

    def on_error(self, ws, error):
        """Print the error reported by the websocket app."""
        print(error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """Report that the connection closed.

        websocket-client 1.x passes the close status code and message as
        extra arguments; they default to ``None`` so older releases that
        pass only ``ws`` keep working.
        """
        print("### closed ###")

    def terminate(self):
        """Close the websocket.

        This replaces the inherited ``terminate()``: the base implementation
        is not called, only the socket is closed.
        """
        self.WS.close()
class UDPMessenger(_Messenger):
    """Messenger that reads JSON datagrams from UDP port 50222.

    Each datagram is decoded as ASCII JSON and handed to ``push()``.
    """

    # Imported at class scope, so the socket class is reached as
    # ``self.QUdpSocket``.
    from PySide6.QtNetwork import QUdpSocket

    udpSocket: QUdpSocket

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base-class initialiser."""
        super(UDPMessenger, self).__init__(*args, **kwargs)

    def begin(self):
        """Create and bind the socket, then drain anything already queued.

        ``running`` reflects whether the bind succeeded.
        """
        self.udpSocket = self.QUdpSocket(self)
        self.running = self.connectSocket()
        self.listen()

    def end(self):
        """Mark the messenger as stopped and close the socket, if any."""
        self.running = False
        # begin() may never have been called, so the socket may not exist.
        activeSocket = getattr(self, 'udpSocket', None)
        if activeSocket is not None:
            activeSocket.close()

    def connectSocket(self) -> bool:
        """Bind to port 50222 and connect ``readyRead`` to :meth:`listen`.

        Returns:
            ``True`` when the bind succeeded, ``False`` otherwise. The
            original logged success unconditionally and returned ``None``.
        """
        bound = self.udpSocket.bind(50222)
        self.udpSocket.readyRead.connect(self.listen)
        if not bound:
            logging.error("UDP failed to bind port 50222: %s", self.udpSocket.errorString())
            return False
        logging.info("UDP Connected to port: 50222")
        return True

    def listen(self):
        """Read every pending datagram and push the decoded message.

        Datagrams that are not valid ASCII JSON are logged and skipped, so
        one bad packet does not raise out of the ``readyRead`` slot.
        """
        while self.udpSocket.hasPendingDatagrams():
            datagram, _host, _port = self.udpSocket.readDatagram(self.udpSocket.pendingDatagramSize())
            try:
                message = loads(str(datagram, encoding='ascii'))
            except ValueError:
                # UnicodeDecodeError and JSONDecodeError both derive from
                # ValueError.
                logging.warning("UDP dropped malformed datagram")
                continue
            self.push(message)
# Selects which manual test runs when the module is executed as a script:
# True -> websocket client, False -> raw UDP multicast listener.
web = True

if __name__ == '__main__' and web:
    # run() is called directly, so the websocket loop blocks this thread
    # instead of starting a separate one.
    ws = WSMessenger()
    ws.run()

elif __name__ == '__main__':
    from select import select
    from socket import AF_INET, INADDR_ANY, inet_aton, IP_ADD_MEMBERSHIP, IPPROTO_IP, IPPROTO_UDP, SO_REUSEADDR, SOCK_DGRAM, socket, SOL_SOCKET
    from struct import pack

    def create_broadcast_listener_socket(broadcast_ip, broadcast_port):
        """Return a UDP socket bound to *broadcast_port* that has joined *broadcast_ip*.

        Args:
            broadcast_ip: Dotted-quad group address to join.
            broadcast_port: Local port to bind on all interfaces.
        """
        b_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        b_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        b_sock.bind(('', broadcast_port))

        # "=" forces standard sizes with no padding, giving the 8-byte
        # ip_mreq layout on every platform. Native "4sl" packs to 16 bytes
        # where a C long is 8 bytes wide.
        mreq = pack("=4sl", inet_aton(broadcast_ip), INADDR_ANY)
        b_sock.setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, mreq)
        return b_sock

    BROADCAST_IP = '239.255.255.250'
    BROADCAST_PORT = 50222

    sock_list = [create_broadcast_listener_socket(BROADCAST_IP, BROADCAST_PORT)]

    try:
        while True:
            # A one-second timeout lets select() block instead of spinning
            # the CPU, as the original zero timeout did.
            readable, _writable, _exceptional = select(sock_list, [], sock_list, 1.0)
            for s in readable:
                data, addr = s.recvfrom(4096)
                data = loads(data)
                msg_type = data['type']

                if msg_type == 'evt_precip':
                    m = RainStartMessage(data)

                # NOTE(review): this chain is separate from the check above,
                # so 'evt_precip' payloads are parsed and still printed
                # below. Confirm that is intended.
                if msg_type == 'obs_st':
                    m = TempestMessage(data)
                elif msg_type == 'rapid_wind':  # and data['ob'][1] > 0:
                    m = WindMessage(data)
                else:
                    # Everything other than obs_st / rapid_wind is echoed.
                    print(data)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listener sockets on the way out.
        for open_sock in sock_list:
            open_sock.close()