-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsource.py
150 lines (126 loc) · 3.46 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import pvrhino
import pvporcupine
from pvrhino import Rhino, Inference
from pvrecorder import PvRecorder
from dotenv import load_dotenv
import os
import time
import RPi.GPIO as GPIO
# Load variables from a .env file into the process environment before
# reading them below.
load_dotenv()

# Each value is None when the corresponding variable is not set.
ACCESS_KEY = os.environ.get("ACCESS_KEY")
CONTEXT_FILE_PATH_RHINO = os.environ.get("CONTEXT_FILE_PATH_RHINO")
CONTEXT_FILE_PATH_PORCUPINE = os.environ.get("CONTEXT_FILE_PATH_PORCUPINE")
# Location name -> LED description. 'label' is used in log output and
# 'pin' is the GPIO channel passed to GPIO.setup()/GPIO.output().
availableLocations = {
    name: {'label': label, 'pin': pin}
    for name, label, pin in (
        ('bathroom', 'Red', 11),
        ('closet', 'Blue', 10),
        ('pantry', 'Green', 9),
    )
}
def ledSetUp():
    """Configure every LED pin listed in ``availableLocations`` as an output.

    Selects BCM pin numbering first, then logs and sets up each pin in turn.
    """
    # Pin numbers in availableLocations are interpreted using BCM numbering.
    GPIO.setmode(GPIO.BCM)

    for entry in availableLocations.values():
        label, channel = entry['label'], entry['pin']
        print(f"[!] led: {label}, pin: {channel}")
        # Each LED pin is driven by this script, so it must be an output.
        GPIO.setup(channel, GPIO.OUT)
def switchAllLights(state):
    """Drive every LED in ``availableLocations`` to the same level.

    Args:
        state: ``"on"`` drives the pins HIGH; any other value drives them LOW.
    """
    print(f"[!] turning all lights {state}")

    if state == "on":
        level = GPIO.HIGH
    else:
        level = GPIO.LOW

    for channel in (entry['pin'] for entry in availableLocations.values()):
        GPIO.output(channel, level)
def switchLightsInLocation(location, state):
    """Switch the LED of a single location on or off.

    Args:
        location: Key into ``availableLocations``.
        state: ``"on"`` drives the pin HIGH; any other value drives it LOW.

    A location that is not in ``availableLocations`` is reported and
    otherwise ignored; no pin is touched in that case.
    """
    entry = availableLocations.get(location)
    if entry is None:
        # Do not announce a switch that is never performed.
        print(f"[!] unknown location: {location}")
        return

    print(f"[!] turning {state} the lights in the {location}")
    GPIO.output(entry['pin'], GPIO.HIGH if state == "on" else GPIO.LOW)
class Watchdog:
    """Polling-based inactivity timer.

    Nothing runs in the background: the owner must call ``check()``
    periodically. ``check()`` raises ``TimeoutError`` once ``timeout``
    seconds have passed since the last ``start()`` or ``refresh()``.
    """

    def __init__(self, timeout=10):
        """Create a stopped watchdog.

        Args:
            timeout: Seconds allowed between start/refresh and expiry.
        """
        self.timeout = timeout
        # Timestamp of the last start/refresh; None while the timer is stopped.
        self._t = None

    def _expire(self):
        """Report the expiry and raise ``TimeoutError``."""
        print("[!] Watchdog expired")
        raise TimeoutError

    def check(self):
        """Raise ``TimeoutError`` if the running timer has expired.

        Does nothing while the watchdog is stopped (or was never started),
        instead of failing on the missing start timestamp.
        """
        if self._t is None:
            return
        # Monotonic clock: unaffected by wall-clock adjustments.
        if time.monotonic() - self._t >= self.timeout:
            self._expire()

    def start(self):
        """Start the countdown; a no-op if it is already running."""
        if self._t is None:
            self._t = time.monotonic()

    def stop(self):
        """Stop the countdown; a no-op if it is not running."""
        self._t = None

    def refresh(self):
        """Restart the countdown from now; a no-op if it is not running."""
        if self._t is not None:
            self._t = time.monotonic()
def wakeUp(porcupine, audio_frame):
    """Return True when the wake word is detected in ``audio_frame``.

    Args:
        porcupine: Engine whose ``process(audio_frame)`` returns a keyword index.
        audio_frame: One frame of audio to hand to the engine.

    Returns:
        True if the engine reports keyword index 0 ('Jarvis'), else False.
    """
    # Index 0 corresponds to the detected keyword 'Jarvis'.
    return porcupine.process(audio_frame) == 0
def executeCommands(rhino, recorder):
    """Listen for spoken commands and apply them until the watchdog expires.

    Frames read from ``recorder`` are fed to ``rhino``. Each understood
    inference restarts the watchdog and switches either one location's light
    (when both a 'location' and a 'state' slot are present) or all lights
    (when only 'state' is present). The function returns once the watchdog
    raises ``TimeoutError``.

    Args:
        rhino: Engine providing ``process(frame)`` and ``get_inference()``.
        recorder: Audio source providing ``read()``.
    """
    timer = Watchdog()
    timer.start()
    try:
        while True:
            finalized = rhino.process(recorder.read())
            if finalized:
                result = rhino.get_inference()
                if not result.is_understood:
                    print("Sorry, didn't understand the command")
                    # Skips the expiry check for this frame only.
                    continue

                # An understood command counts as activity.
                timer.refresh()
                slots = result.slots
                if "state" in slots:
                    if "location" in slots:
                        switchLightsInLocation(slots['location'], slots['state'])
                    else:
                        switchAllLights(slots['state'])
            timer.check()
    except TimeoutError:
        print("[!] stopped listening ...")
        print("[!] Say wakeup word to start listening again")
        return
def main():
    """Run the voice-controlled light loop until interrupted.

    Sets up the LED pins, creates the Porcupine and Rhino engines and the
    recorder, then feeds every recorded frame to ``wakeUp()``; each detected
    wake word hands control to ``executeCommands()``. A KeyboardInterrupt
    ends the loop. Whatever was created is released on the way out, even if
    initialization fails part-way through.
    """
    # Initialize LEDs
    ledSetUp()

    # Declared up front so the cleanup below can tell which resources exist.
    porcupine = None
    rhino = None
    recorder = None
    try:
        # Initialize AI models
        porcupine = pvporcupine.create(
            access_key=ACCESS_KEY,
            keyword_paths=[CONTEXT_FILE_PATH_PORCUPINE],
        )
        rhino = pvrhino.create(
            access_key=ACCESS_KEY,
            context_path=CONTEXT_FILE_PATH_RHINO,
        )

        # Initialize and start recorder
        # NOTE(review): frames sized for Rhino are also fed to Porcupine;
        # this assumes both engines use the same frame length -- confirm.
        recorder = PvRecorder(frame_length=rhino.frame_length)
        # NOTE(review): the return value is discarded; presumably meant for
        # listing devices -- confirm whether this call is still needed.
        recorder.get_available_devices()
        print(f"[!] selected device to record: {recorder.selected_device}")
        recorder.start()

        while True:
            audio_frame = recorder.read()
            if wakeUp(porcupine, audio_frame):
                print("[!] Listening for commands ... ")
                executeCommands(rhino, recorder)
    except KeyboardInterrupt:
        print('[!] Ending process ...')
    finally:
        # cleaning process: release only what was actually created
        for resource in (rhino, porcupine, recorder):
            if resource is not None:
                resource.delete()
        GPIO.cleanup()
# Start the assistant only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()