forked from MQTTDevice/cbpi_PIDAutoTunePowerOutput
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
308 lines (258 loc) · 9.75 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
from time import localtime, strftime
import time
import math
import logging
import io
from collections import deque
from collections import namedtuple
from modules import cbpi
from modules.core.controller import KettleController
from modules.core.props import Property
@cbpi.controller
class PIDAutoTunePowerOutput(KettleController):
    """Kettle controller that runs a PID autotune by stepping the heater power.

    While running, it feeds the kettle temperature into an ``AutoTunerPower``
    instance every few seconds and applies the power level (percent) the tuner
    asks for. When the tuner finishes, the resulting PID parameters are written
    to the autotune log and the "brewing" rule values are shown to the user.
    """

    # User-editable settings. The attribute names carry a/b/c prefixes,
    # presumably to control display order in the UI -- confirm against
    # modules.core.props.
    a_outstep = Property.Number("output step %", True, 100, description="Default: 100. Sets the output when stepping up/down.")
    b_maxout = Property.Number("max. output %", True, 100, description="Default: 100. Sets the max power output.")
    c_lookback = Property.Number("lookback seconds", True, 30, description="Default: 30. How far back to look for min/max temps.")

    def autoOff(self):
        """Flag the kettle as off, stop the controller and publish the change."""
        cbpi.cache.get("kettle")[self.kettle_id].state = False
        # Deliberately start the lookup *after* KettleController so that this
        # class's own stop() (which raises the "interrupted" notification) is
        # not invoked for a regular shutdown.
        super(KettleController, self).stop()
        cbpi.emit("UPDATE_KETTLE", cbpi.cache.get("kettle").get(self.kettle_id))

    def stop(self):
        """Stop the controller, warning the user if a tune was still running."""
        if self.is_running():
            self.notify("AutoTune Interrupted", "AutoTune has been interrupted and was not able to finish", type="danger", timeout=None)
        super(KettleController, self).stop()

    def run(self):
        """Execute one complete autotune cycle.

        Builds the tuner from the configured properties, drives the heater
        until the tuner reports completion (or the controller is stopped),
        then switches everything off and reports the outcome.
        """
        self.notify("AutoTune In Progress", "Do not turn off Auto mode until AutoTuning is complete", type="success", timeout=None)
        sampleTime = 5
        setpoint = self.get_target_temp()
        try:
            outstep = float(self.a_outstep)
            outmax = float(self.b_maxout)
            lookbackSec = float(self.c_lookback)
            atune = AutoTunerPower(setpoint, outstep, sampleTime, lookbackSec, 0, outmax)
        except Exception as e:
            # No tuner exists at this point, so the error cannot go to the
            # tuner's own log; report it and abort instead of continuing
            # with an undefined tuner.
            self.notify("AutoTune Error", str(e), type="danger", timeout=None)
            logging.getLogger(__name__).error("AutoTune could not be started: %s", e)
            self.autoOff()
            return

        atune.log("AutoTune will now begin")
        try:
            atune.log('heater on')
            self.heater_on()
            self.sleep(2)
            while self.is_running() and not atune.run(self.get_temp()):
                heat_percent = int(atune.output)
                atune.log('PID setting Power to {}'.format(heat_percent))
                self.actor_power(heat_percent)
                self.sleep(sampleTime)
        finally:
            # Always leave the heater off and the kettle in manual mode, even
            # if reading the sensor, logging or the tuner itself raised.
            self.heater_off()
            self.autoOff()
        atune.log('heater off')

        if atune.state == atune.STATE_SUCCEEDED:
            atune.log("AutoTune has succeeded")
            self.notify("AutoTune Complete", "PID AutoTune was successful", type="success", timeout=None)
            for rule in atune.tuningRules:
                params = atune.getPIDParameters(rule)
                atune.log('rule: {0}'.format(rule))
                atune.log('P: {0}'.format(params.Kp))
                atune.log('I: {0}'.format(params.Ki))
                atune.log('D: {0}'.format(params.Kd))
                # Only the "brewing" rule is surfaced in the UI; every rule
                # is still written to the log above.
                if rule == "brewing":
                    self.notify("AutoTune P Value", str(params.Kp), type="info", timeout=None)
                    self.notify("AutoTune I Value", str(params.Ki), type="info", timeout=None)
                    self.notify("AutoTune D Value", str(params.Kd), type="info", timeout=None)
        elif atune.state == atune.STATE_FAILED:
            atune.log("AutoTune has failed")
            self.notify("AutoTune Failed", "PID AutoTune has failed", type="danger", timeout=None)
# Based on a fork of Arduino PID AutoTune Library
# See https://github.com/t0mpr1c3/Arduino-PID-AutoTune-Library
class AutoTunerPower(object):
    """Relay-style PID autotuner that produces a power output level.

    Feed temperature readings to ``run()``; it ramps ``output`` up while the
    reading is below the setpoint band and down while it is above, records the
    resulting temperature peaks, and once the oscillation amplitude has
    settled derives the ultimate gain (Ku) and period (Pu). PID parameters for
    several tuning rules are then available from ``getPIDParameters()``.
    """

    PIDParams = namedtuple('PIDParams', ['Kp', 'Ki', 'Kd'])
    # Maximum relative deviation of the peak-to-peak amplitude that still
    # counts as a converged oscillation.
    PEAK_AMPLITUDE_TOLERANCE = 0.05
    STATE_OFF = 'off'
    STATE_RELAY_STEP_UP = 'relay step up'
    STATE_RELAY_STEP_DOWN = 'relay step down'
    STATE_SUCCEEDED = 'succeeded'
    STATE_FAILED = 'failed'
    _tuning_rules = {
        # rule: [Kp_divisor, Ki_divisor, Kd_divisor]
        "ziegler-nichols": [34, 40, 160],
        "tyreus-luyben": [44, 9, 126],
        "ciancone-marlin": [66, 88, 162],
        "pessen-integral": [28, 50, 133],
        "some-overshoot": [60, 40, 60],
        "no-overshoot": [100, 40, 60],
        "brewing": [2.5, 3, 3600]
    }

    def __init__(self, setpoint, outputstep=10, sampleTimeSec=5, lookbackSec=60,
                 outputMin=float('-inf'), outputMax=float('inf'), noiseband=0.5, getTimeMs=None):
        """Validate the configuration and reset all tuner state.

        setpoint      -- target temperature the oscillation is induced around
        outputstep    -- amount added to / subtracted from the output per sample
        sampleTimeSec -- minimum time between two processed samples (seconds)
        lookbackSec   -- length of the window used to detect peaks (seconds)
        outputMin/outputMax -- limits the output is clamped to
        noiseband     -- half-width of the dead band around the setpoint
        getTimeMs     -- optional callable returning the current time in
                         milliseconds; defaults to the wall clock

        Raises ValueError when any argument is out of range.
        """
        if setpoint is None:
            raise ValueError('Kettle setpoint must be specified')
        if outputstep < 1:
            raise ValueError('Output step % must be greater or equal to 1')
        if sampleTimeSec < 1:
            raise ValueError('Sample Time Seconds must be greater or equal to 1')
        if lookbackSec < sampleTimeSec:
            raise ValueError('Lookback Seconds must be greater or equal to Sample Time Seconds (5)')
        if outputMin >= outputMax:
            raise ValueError('Min Output % must be less than Max Output %')
        # Number of samples in the look-back window. float() + int() keep the
        # result an integer sample count on both Python 2 and Python 3.
        self._inputs = deque(maxlen=int(round(float(lookbackSec) / sampleTimeSec)))
        self._sampleTime = sampleTimeSec * 1000
        self._setpoint = setpoint
        self._outputstep = outputstep
        self._noiseband = noiseband
        self._outputMin = outputMin
        self._outputMax = outputMax
        self._state = AutoTunerPower.STATE_OFF
        self._peakTimestamps = deque(maxlen=5)
        self._peaks = deque(maxlen=5)
        self._output = 0
        self._lastRunTimestamp = 0
        self._peakType = 0
        self._peakCount = 0
        self._initialOutput = 100
        self._inducedAmplitude = 0
        self._Ku = 0
        self._Pu = 0
        if getTimeMs is None:
            self._getTimeMs = self._currentTimeMs
        else:
            self._getTimeMs = getTimeMs

    @property
    def state(self):
        """Current tuner state (one of the STATE_* constants)."""
        return self._state

    @property
    def output(self):
        """Output level the caller should currently apply."""
        return self._output

    @property
    def tuningRules(self):
        """Names of the tuning rules accepted by getPIDParameters()."""
        return self._tuning_rules.keys()

    def getPIDParameters(self, tuningRule='ziegler-nichols'):
        """Return PIDParams(Kp, Ki, Kd) for the given tuning rule.

        Raises KeyError for an unknown rule. Ku and Pu are only set by a
        successful run(); while Pu is still 0 this raises ZeroDivisionError.
        """
        divisors = self._tuning_rules[tuningRule]
        kp = self._Ku / divisors[0]
        ki = kp / (self._Pu / divisors[1])
        kd = kp * (self._Pu / divisors[2])
        return AutoTunerPower.PIDParams(kp, ki, kd)

    def log(self, text):
        """Append a timestamped line to ./logs/autotune.log."""
        filename = "./logs/autotune.log"
        formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
        with open(filename, "a") as file:
            file.write("%s,%s\n" % (formatted_time, text))

    def run(self, inputValue):
        """Process one temperature reading.

        Returns True once tuning has finished (check ``state`` for success or
        failure) and False while it is still in progress. Calls that arrive
        sooner than the sample time after the previous processed sample are
        ignored. Calling run() again after it finished starts a new tune.
        """
        now = self._getTimeMs()
        if self._state in (AutoTunerPower.STATE_OFF,
                           AutoTunerPower.STATE_SUCCEEDED,
                           AutoTunerPower.STATE_FAILED):
            self._initTuner(inputValue, now)
        elif (now - self._lastRunTimestamp) < self._sampleTime:
            return False
        self._lastRunTimestamp = now

        # check input and change relay state if necessary
        if (self._state == AutoTunerPower.STATE_RELAY_STEP_UP
                and inputValue > self._setpoint + self._noiseband):
            self._state = AutoTunerPower.STATE_RELAY_STEP_DOWN
            self.log('switched state: {0}'.format(self._state))
            self.log('input: {0}'.format(inputValue))
        elif (self._state == AutoTunerPower.STATE_RELAY_STEP_DOWN
                and inputValue < self._setpoint - self._noiseband):
            self._state = AutoTunerPower.STATE_RELAY_STEP_UP
            self.log('switched state: {0}'.format(self._state))
            self.log('input: {0}'.format(inputValue))

        # set output: ramp by one step per processed sample
        if self._state == AutoTunerPower.STATE_RELAY_STEP_UP:
            self._output = self._output + self._outputstep
        elif self._state == AutoTunerPower.STATE_RELAY_STEP_DOWN:
            self._output = self._output - self._outputstep

        # respect output limits
        self._output = min(self._output, self._outputMax)
        self._output = max(self._output, self._outputMin)

        # identify peaks: the reading is a max/min candidate when it is
        # strictly above/below every sample currently in the window
        isMax = all(inputValue > val for val in self._inputs)
        isMin = all(inputValue < val for val in self._inputs)
        self._inputs.append(inputValue)

        # we don't want to trust the maxes or mins until the input array is full
        if len(self._inputs) < self._inputs.maxlen:
            return False

        # increment peak count and record peak time for maxima and minima
        # peak types:
        # -1: minimum
        # +1: maximum
        inflection = False
        if isMax:
            if self._peakType == -1:
                inflection = True
            self._peakType = 1
        elif isMin:
            if self._peakType == 1:
                inflection = True
            self._peakType = -1

        # update peak times and values
        if inflection:
            self._peakCount += 1
            self._peaks.append(inputValue)
            self._peakTimestamps.append(now)
            self.log('found peak: {0}'.format(inputValue))
            self.log('peak count: {0}'.format(self._peakCount))

        # check for convergence of induced oscillation
        # (amplitude is computed from the first four of the five stored peaks)
        self._inducedAmplitude = 0
        if inflection and (self._peakCount > 4):
            absMax = self._peaks[-2]
            absMin = self._peaks[-2]
            for i in range(0, len(self._peaks) - 2):
                self._inducedAmplitude += abs(self._peaks[i] - self._peaks[i+1])
                absMax = max(self._peaks[i], absMax)
                absMin = min(self._peaks[i], absMin)
            self._inducedAmplitude /= 6.0
            self.log('amplitude: {0}'.format(self._inducedAmplitude))

            # A zero amplitude (all recorded peaks equal) carries no usable
            # oscillation; skip the relative check instead of dividing by zero.
            if self._inducedAmplitude > 0:
                # check convergence criterion for amplitude of induced oscillation
                amplitudeDev = ((0.5 * (absMax - absMin) - self._inducedAmplitude)
                                / self._inducedAmplitude)
                self.log('amplitude deviation: {0}'.format(amplitudeDev))
                if amplitudeDev < AutoTunerPower.PEAK_AMPLITUDE_TOLERANCE:
                    self._state = AutoTunerPower.STATE_SUCCEEDED

        # Success is evaluated before the give-up limit so that a tune which
        # converges exactly on the last permitted peak is not thrown away.
        if self._state == AutoTunerPower.STATE_SUCCEEDED:
            self._output = 0
            # calculate ultimate gain
            self._Ku = 4.0 * self._outputstep / (self._inducedAmplitude * math.pi)
            # calculate ultimate period in seconds
            period1 = self._peakTimestamps[3] - self._peakTimestamps[1]
            period2 = self._peakTimestamps[4] - self._peakTimestamps[2]
            self._Pu = 0.5 * (period1 + period2) / 1000.0
            return True

        # if the autotune has not already converged
        # terminate after 10 cycles
        if self._peakCount >= 20:
            self._output = 0
            self._state = AutoTunerPower.STATE_FAILED
            return True

        return False

    def _currentTimeMs(self):
        """Wall-clock time in milliseconds."""
        return time.time() * 1000

    def _initTuner(self, inputValue, timestamp):
        """Reset all run state and enter the step-up phase.

        ``inputValue`` is accepted for interface compatibility but not used.
        """
        self._peakType = 0
        self._peakCount = 0
        self._output = 0
        self._initialOutput = 100
        self._Ku = 0
        self._Pu = 0
        self._inputs.clear()
        self._peaks.clear()
        self._peakTimestamps.clear()
        self._peakTimestamps.append(timestamp)
        self._state = AutoTunerPower.STATE_RELAY_STEP_UP