-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuzzerclient.py
135 lines (115 loc) · 4.96 KB
/
buzzerclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import _thread
import asyncio
import sys
import uasyncio
import utime as utime
import urequests as requests
import machine
from logger import Logger
class BuzzerHttpClient:
    """Buzzer client that polls a remote server over HTTP.

    A background thread watches the hardware button while an asyncio task
    periodically POSTs the client's state to ``http://<servername>/client``
    and switches the light according to the text the server answers with.

    Collaborators (only the members used in this class are listed):
      * ``hwWrapper``     -- ``button.value()``, ``on()``, ``off()``,
                             ``blink_red()``, ``blink_green()``
      * ``config``        -- ``name``, ``servername``
      * ``configManager`` -- ``remove_config()``
    """

    # Consecutive "pressed" polls after which the config is removed and the
    # board is reset (80 polls * 0.10 s sleep is roughly an 8 second hold).
    RESET_HOLD_POLLS = 80
    # Seconds slept after each poll that found the button pressed.
    BUTTON_POLL_SECONDS = 0.10

    def __init__(self, delay_to_start, interval, hwWrapper, config, configManager):
        """Store the collaborators and initialise the shared state.

        :param delay_to_start: value passed to ``asyncio.sleep`` before the
            polling loop is started.
        :param interval: value passed to ``asyncio.sleep`` between two polls.
        :param hwWrapper: hardware access object (button and light).
        :param config: client configuration (``name`` and ``servername``).
        :param configManager: used to remove the stored configuration.
        """
        self.delay_to_start = delay_to_start
        self.hwWrapper = hwWrapper
        self.interval = interval
        self.config = config
        # Last text returned by the server; "light_on" means the light is lit.
        self.command = ""
        # Set by the button thread, consumed (reported + cleared) by __loop__.
        self.button_pressed = False
        self.logger = Logger()
        self.caller = "BuzzerHttpClient"
        # True until the first polling iteration has finished.
        self.initial_call = True
        self.configManager = configManager

    async def start(self):
        """Start the button watcher thread, wait, then run the polling loop.

        The polling loop never returns normally, so neither does this
        coroutine.
        """
        self.logger.debug(self.caller, "Start")
        _thread.start_new_thread(self.__check_button__, ())
        await asyncio.sleep(self.delay_to_start)
        return await uasyncio.create_task(self.__loop__())

    def __check_button__(self):
        """Watch the button forever; intended to run in its own thread.

        A button reading of 0 is treated as "pressed".  While pressed:
          * after ``RESET_HOLD_POLLS`` consecutive pressed polls the config
            is removed, the red LED blinks and the board is reset;
          * if the last server command was "light_on" the press is recorded
            in ``self.button_pressed`` and the light is switched off.
        Releasing the button restarts the hold counter.
        """
        reset_pico_count = 0
        while True:
            if self.hwWrapper.button.value() == 0:
                reset_pico_count = reset_pico_count + 1
                if reset_pico_count >= self.RESET_HOLD_POLLS:
                    self.logger.warning(self.caller, "Remove config file")
                    self.configManager.remove_config()
                    self.hwWrapper.blink_red()
                    machine.reset()
                if self.command == "light_on":
                    self.button_pressed = True
                    self.hwWrapper.off()
                    self.logger.info(self.caller, "The right button was pressed")
                utime.sleep(self.BUTTON_POLL_SECONDS)
            else:
                # NOTE(review): no sleep on this path, so the thread polls
                # continuously while the button is released.
                reset_pico_count = 0

    async def __loop__(self):
        """Poll the server forever and apply the command it returns.

        Each iteration POSTs ``client_name``, ``button_pressed`` and
        ``light_on`` (always "no") as a URL-encoded string, stores the
        response text in ``self.command`` and switches the light on when
        that text is "light_on", off otherwise.  Any exception raised while
        talking to the server is logged and the loop carries on.
        """
        while True:
            # Snapshot the flag once.  The button thread may set it while the
            # blocking HTTP request is in flight; clearing the live flag
            # afterwards would silently drop a press the server never saw.
            reported_press = self.button_pressed
            button_state = "yes" if reported_press else "no"
            url_encoded_data = "client_name={}&button_pressed={}&light_on={}".format(
                self.config.name, button_state, "no"
            )
            self.logger.debug(
                self.caller,
                "Try to reach server [{}] with data [{}]".format(self.config.servername, url_encoded_data),
            )
            response = None
            try:
                response = requests.post("http://{}/client".format(self.config.servername), data=url_encoded_data)
                if self.initial_call:
                    self.hwWrapper.blink_green()
                self.logger.debug(self.caller, "Server response: {}".format(response.text))
                self.command = response.text
                # Only consume the press that was actually sent to the server.
                if reported_press:
                    self.button_pressed = False
                if self.command == "light_on":
                    self.hwWrapper.on()
                else:
                    self.hwWrapper.off()
            except Exception as e:
                self.logger.error(self.caller, "Exception: {}".format(e))
                sys.print_exception(e)
            finally:
                # Always release the response (and its socket) if one exists.
                if response is not None:
                    response.close()
                self.initial_call = False
            await asyncio.sleep(self.interval)
class BuzzerServerClient:
    """Buzzer client for the device that also hosts the server.

    Instead of going through HTTP it talks to the in-process
    ``clientHandler`` directly.  A background thread watches the hardware
    button while an asyncio task periodically reports this client's state to
    the handler and switches the light according to the returned command.

    Collaborators (only the members used in this class are listed):
      * ``hwWrapper``     -- ``button.value()``, ``on()``, ``off()``,
                             ``blink_red()``
      * ``config``        -- ``name``
      * ``clientHandler`` -- ``any_client_with_a_command()``, ``clients``
                             (indexed by client name, entries expose
                             ``command``), ``handle_client(name, pressed,
                             light_on)``
      * ``configManager`` -- ``remove_config()``
    """

    # Consecutive "pressed" polls after which the config is removed and the
    # board is reset (80 polls * 0.10 s sleep is roughly an 8 second hold).
    RESET_HOLD_POLLS = 80
    # Seconds slept after each poll that found the button pressed.
    BUTTON_POLL_SECONDS = 0.10

    def __init__(self, hwWrapper, config, clientHandler, configManager):
        """Store the collaborators and initialise the shared state.

        :param hwWrapper: hardware access object (button and light).
        :param config: client configuration (``name``).
        :param clientHandler: in-process handler that tracks all clients.
        :param configManager: used to remove the stored configuration.
        """
        self.config = config
        self.hwWrapper = hwWrapper
        # Passed to the handler on every poll; never changed in this class.
        self.light_on = False
        # Set by the button thread, consumed (reported + cleared) by __loop__.
        self.button_pressed = False
        self.clientHandler = clientHandler
        self.logger = Logger()
        self.caller = "BuzzerServerClient"
        self.configManager = configManager

    async def start(self, delay_to_start=10, interval=1):
        """Start the button watcher thread, wait, then run the polling loop.

        :param delay_to_start: value passed to ``asyncio.sleep`` before the
            loop is started.
        :param interval: value passed to ``asyncio.sleep`` between two polls.

        The polling loop never returns normally, so neither does this
        coroutine.
        """
        self.logger.debug(self.caller, "Start")
        _thread.start_new_thread(self.__check_button__, ())
        await asyncio.sleep(delay_to_start)
        return await uasyncio.create_task(self.__loop__(interval))

    def __check_button__(self):
        """Watch the button forever; intended to run in its own thread.

        A button reading of 0 is treated as "pressed".  While pressed:
          * after ``RESET_HOLD_POLLS`` consecutive pressed polls the config
            is removed, the red LED blinks and the board is reset;
          * if the handler reports a pending command, knows this client, and
            this client's command is "light_on", the press is recorded in
            ``self.button_pressed`` and the light is switched off.
        Releasing the button restarts the hold counter.
        """
        reset_pico_count = 0
        while True:
            if self.hwWrapper.button.value() == 0:
                reset_pico_count = reset_pico_count + 1
                if reset_pico_count >= self.RESET_HOLD_POLLS:
                    self.logger.warning(self.caller, "Remove config file")
                    self.configManager.remove_config()
                    self.hwWrapper.blink_red()
                    machine.reset()
                if self.clientHandler.any_client_with_a_command() and self.config.name in self.clientHandler.clients:
                    clientConfig = self.clientHandler.clients[self.config.name]
                    if clientConfig.command == "light_on":
                        self.button_pressed = True
                        self.hwWrapper.off()
                        self.logger.info(self.caller, "The right button was pressed")
                utime.sleep(self.BUTTON_POLL_SECONDS)
            else:
                # NOTE(review): no sleep on this path, so the thread polls
                # continuously while the button is released.
                reset_pico_count = 0

    async def __loop__(self, interval):
        """Report this client's state to the handler forever.

        Each iteration hands the client name, the pressed flag and
        ``self.light_on`` to ``clientHandler.handle_client`` and switches the
        light on when the returned command is "light_on", off otherwise.

        :param interval: value passed to ``asyncio.sleep`` between two polls.
        """
        while True:
            # Snapshot the flag once.  The button thread may set it while the
            # handler call is running; clearing the live flag afterwards
            # would drop a press that was never reported.
            reported_press = self.button_pressed
            command = self.clientHandler.handle_client(self.config.name, reported_press, self.light_on)
            # Only consume the press that was actually handed to the handler.
            if reported_press:
                self.button_pressed = False
            if command == "light_on":
                self.hwWrapper.on()
            else:
                self.hwWrapper.off()
            await asyncio.sleep(interval)