-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupdatedClosedLoopAlgorithm.py
189 lines (152 loc) · 6.03 KB
/
updatedClosedLoopAlgorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from itertools import count
from tracemalloc import start
from gdx import gdx
from datetime import datetime
gdx = gdx.gdx()
import os
completion = 0
sleep = 0
def sensorSetUp():
global list_of_measurements
global time_between_measurements
global startingPattern
global pattern
global goalPattern
global actions
actions = []
startingPattern = [2000, 2000] # starting breathing pattern, time in (ms)
pattern = [2000, 2000] # time in (ms)
goalPattern = [6000, 6000] # maximum breathing pattern, time in (ms)
list_of_measurements = [0]
time_between_measurements = 300 # sampling time in (ms)
#gdx.open_usb() # opens sensor via USB
gdx.open_ble("GDX-RB 0K4005K3") # opens sensor via Bluetooth
gdx.select_sensors([1,2]) # selects sensors to be used
gdx.start(time_between_measurements) # starts data collection, takes sampling time as argument
def sensor():
now = datetime.now()
raw_measurements = gdx.read()
force = round(raw_measurements[0], 2)
respiration_rate = round(raw_measurements[1], 2)
list_of_measurements.append(force)
resetPattern()
decreasePattern()
slope = (list_of_measurements[-1] - list_of_measurements[-2])/(time_between_measurements/1000)
slope_string = str(slope)
# Each action is assigned a number which is later used to validate its completion
# the higher the reliability of the classification the higher the assigned action value
# since higher action values are more disruptive in the validation process
if slope > 0.5:
date = now.strftime("%m/%d/%Y,%H:%M:%S") # date and time for data storage
action = 1
breathAction = "INHALING"
elif slope < -0.5:
date = now.strftime("%m/%d/%Y,%H:%M:%S")
action = -1
breathAction = "EXHALING"
else:
date = now.strftime("%m/%d/%Y,%H:%M:%S")
action = 0
breathAction = "HOLDING"
actions.append(action)
# 1 if the pattern was completed, 0 if it was not completed
completionResult = completionCheck()
comma = ","
# results to be stored in the csv file
message = [date, str(completionResult), breathAction, str(force), str(respiration_rate)]
results = comma.join(message)
return completionResult, results
# checks if current breathing pattern was completed or not
def completionCheck():
global completion
global sleep
# calculates number of measurements in each time interval
times = []
for time in pattern:
# -100 adds a buffer so that patterns can be completed by following vibration
# before you would have to extend inhales/exhales to complete pattern
times.append((time-150)/time_between_measurements)
# loops through each time interval and the number of measurements recorded in the current interval
for i in range(len(times)):
validation = 0
for j in range(1, int(times[i])+1):
if i == 0: #INHALE
try:
#starts from end of list to read most recent measurements
# skips past other intervals,
# e.g., If the pattern is (Inhale -> Hold -> Exhale)<- Starts reading from here
# For Inhale, the Exhale and Hold measurements will be skipped
# so that only Inhale measurements are read
validation += actions[-j-(int(times[1]))]
except:
return None
elif i == 1: #EXHALE
try:
validation += actions[-j]
except:
return None
#print(str(validation) + "<=" + str(times[0]+3))
# checks if each section of breathing pattern was completed
# by comparing validation sum to expected sum (with a buffer)
if i == 0:
if validation>=times[0]-round(times[0]*0.2):
inhaleCompletion = 1
else:
inhaleCompletion = 0
if i == 1:
if validation<=times[1]*-1+round(times[1]*0.2):
exhaleCompletion = 1
else:
exhaleCompletion = 0
# sleeps algorithm for 5 seconds to prevent pattern from changing several times in a row
if completion == 1:
sleep = 5000/time_between_measurements
if sleep > 0:
sleep -= 1
completion = 0
return None
# completion = 1 if the pattern was completed
# completion = 0 if the pattern was failed
else:
totalCompletion = inhaleCompletion + exhaleCompletion
if totalCompletion == 2:
completion = 1
increasePattern()
else:
completion = 0
#print(inhaleCompletion, holdCompletion, exhaleCompletion, totalCompletion)
printing = str(inhaleCompletion) + str(exhaleCompletion) + str(totalCompletion)
return completion
# changes the breathing pattern duration once the current pattern is completed
def increasePattern():
global pattern, startingPattern
if (pattern[0] < goalPattern[0]):
for i in range(len(pattern)):
pattern[i] += 250
print(pattern)
# decreases breathing pattern
def decreasePattern():
global pattern, startingPattern
file_path = "decrease.txt"
# check if size of file is 0
if os.stat(file_path).st_size != 0:
startingPattern = [2000, 2000] # time in (ms)
f = open(file_path, 'r+')
f.truncate(0) # clear file
if (pattern[0] > startingPattern[0]):
for i in range(len(pattern)):
pattern[i] -= 250
print(f"PATTERN DECREASED TO: {pattern}")
# resets breathing pattern
def resetPattern():
global pattern, startingPattern
file_path = "reset.txt"
# check if size of file is 0
if os.stat(file_path).st_size != 0:
startingPattern = [2000, 2000] # time in (ms)
f = open(file_path, 'r+')
f.truncate(0) # clear file
pattern = startingPattern
print(f"PATTERN RESET TO: {pattern}")
#gdx.stop() #stops sensor
#gdx.close() #closes sensor