forked from do-mpc/do-mpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata.py
457 lines (357 loc) · 17.8 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
#
# This file is part of do-mpc
#
# do-mpc: An environment for the easy, modular and efficient implementation of
# robust nonlinear model predictive control
#
# Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
# TU Dortmund. All rights reserved
#
# do-mpc is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# do-mpc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with do-mpc. If not, see <http://www.gnu.org/licenses/>.
"""
Storage and handling of data.
"""
import numpy as np
import casadi.tools as castools
import pdb
import pickle
import do_mpc
import os
from typing import Union,Tuple,Dict
class Data:
    """**do-mpc** data container. An instance of this class is created for the active **do-mpc** classes,
    e.g. :py:class:`do_mpc.simulator.Simulator`, :py:class:`do_mpc.estimator.MHE`.

    The class is initialized with an instance of the :py:class:`do_mpc.model.Model` which contains all
    information about variables (e.g. states, inputs etc.).

    The :py:class:`Data` class has a public API but is mostly used by other **do-mpc** classes,
    e.g. updated in the ``.make_step`` calls.

    Every data field is stored as an attribute of the same name holding a two-dimensional
    ``numpy.ndarray`` with one row per call to :py:meth:`update` and one column per element of the field.

    Args:
        model: model object from the :py:class:`do_mpc.model`
    """
    def __init__(self, model: Union[do_mpc.model.Model, do_mpc.model.LinearModel]):
        self.dtype = 'default'
        assert model.flags['setup'] == True, 'Model was not setup. After the complete model creation call model.setup().'

        # As discussed here: https://groups.google.com/forum/#!topic/casadi-users/dqAb4tnA2ik
        # struct_SX cannot be unpickled (seems like a bug)
        # TODO: Find better workaround.
        # Only a shallow copy of the model attributes is kept, with the symbolic
        # expressions removed. A default is passed to ``pop`` so that a model
        # lacking one of these attributes does not abort the construction.
        self.model = model.__dict__.copy()
        for symbolic_attr in ('_rhs', '_aux_expression', '_y_expression', '_alg', 'sv'):
            self.model.pop(symbolic_attr, None)

        # Dictionary with possible data_fields in the class and their respective dimension.
        # All data is numpy ndarray.
        self.data_fields = {
            '_time': 1,
            '_x': model.n_x,
            '_y': model.n_y,
            '_u': model.n_u,
            '_z': model.n_z,
            '_tvp': model.n_tvp,
            '_p': model.n_p,
            '_aux': model.n_aux,
        }

        self.init_storage()

        self.meta_data = {}

        # Accelerate __getitem__ calls (to retrieve results) by saving indices of previous queries.
        # Two parallel lists are used because a query may contain slices, which
        # cannot serve as dictionary keys on all Python versions.
        self.result_queries = {'ind': [], 'f_ind': []}

    def __getitem__(self, ind: Tuple) -> np.ndarray:
        """Query data fields. This method can be used to obtain the stored results in the :py:class:`Data` instance.

        The full list of available fields can be inspected with:

        ::

            print(data.data_fields)

        The dict also denotes the dimension of each field.

        The method allows for power indexing the results for the fields
        ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``, ``_aux``, ``_y``
        where further indices refer to the configured variables in the :py:class:`do_mpc.model.Model`
        and :py:class:`do_mpc.model.LinearModel` instance.

        **Example:**

        ::

            # Assume the following model was used (excerpt):
            model = do_mpc.model.Model('continuous')

            model.set_variable('_x', 'Temperature', shape=(5,1)) # Vector
            model.set_variable('_p', 'disturbance', shape=(3,3)) # Matrix
            model.set_variable('_u', 'heating')                  # scalar
            ...
            # the model was used (among others) for the MPC controller
            mpc = do_mpc.controller.MPC(model)
            ...
            # Query the mpc.data instance:
            mpc.data['_x']                      # Return all states
            mpc.data['_x', 'Temperature']       # Return the 5 temp states
            mpc.data['_x', 'Temperature', :2]   # Return the first 2 temp. states
            mpc.data['_p', 'disturbance', 0, 2] # Matrix allows for further indices

            # Other fields can also be queried, e.g.:
            mpc.data['_time']                   # current time
            mpc.data['t_wall_total']            # optimizer runtime
            # These do not allow further indices.

        Args:
            ind: Name of the data field, optionally followed by a power index into that field.

        Returns:
            Returns the queried data field (for all time instances)

        Raises:
            assertion: The first element of ``ind`` must be a key of ``data_fields``.
        """
        # A bare key (``data['_x']``) arrives as a non-tuple; normalise it.
        if not isinstance(ind, tuple):
            ind = (ind,)

        # First element is the data_field:
        data_field = ind[0]

        # Check validity:
        keys = self.data_fields.keys()
        assert data_field in keys, 'Your queried variable {} is not available. Please choose from {}'.format(data_field, keys)

        if len(ind) == 1:
            # No further indices: return the complete field.
            return getattr(self, data_field)

        # Further indices exist: resolve them to flat column indices, reusing
        # the result of an identical earlier query when available.
        cached_keys = self.result_queries['ind']
        if ind in cached_keys:
            f_ind = self.result_queries['f_ind'][cached_keys.index(ind)]
        else:
            powerind = ind[1:]
            f_ind = self.model[data_field].f[powerind]
            cached_keys.append(ind)
            self.result_queries['f_ind'].append(f_ind)

        return getattr(self, data_field)[:, f_ind]

    def init_storage(self) -> None:
        """Create new (empty) arrays for all variables.

        The variables of interest are listed in the ``data_fields`` dictionary,
        with their respective dimension. This dictionary may be updated.
        The :py:class:`do_mpc.controller.MPC` class adds for example optimizer information.

        Each field becomes an attribute holding an array of shape ``(0, dim)``;
        previously stored values of that field are discarded.
        """
        for field_i, dim_i in self.data_fields.items():
            setattr(self, field_i, np.empty((0, dim_i)))

    def set_meta(self, **kwargs) -> None:
        """Set meta data for the current instance of the data object.

        Args:
            kwargs: Arbitrary key word arguments; each one is stored under its name in
                ``meta_data``, replacing an existing entry of the same name.
        """
        self.meta_data.update(kwargs)

    def update(self, **kwargs: Union[np.ndarray, castools.DM, float]) -> None:
        """Update value(s) of the data structure with key word arguments.

        These key word arguments must exist in the data fields of the data object.
        See ``self.data_fields`` for a complete list of data fields.

        Each value is flattened to a single row and appended to the stored array of its field.
        CasADi ``DMStruct`` and ``DM`` values are converted to numpy arrays; any other
        value that is not already a ``numpy.ndarray`` (Python scalars, lists, ...) is
        converted with ``numpy.asarray``.

        Example:

        ::

            _x = np.ones((1, 3))
            _u = np.ones((1, 2))
            data.update(_x=_x, _u=_u)

            or:
            data.update(_x=_x)
            data.update(_u=_u)

            Alternatively:
            data_dict = {
                '_x':np.ones((1, 3)),
                '_u':np.ones((1, 2))
            }

            data.update(**data_dict)

        Args:
            kwargs : Arbitrary number of key word arguments for data fields that should be updated.

        Raises:
            assertion: Keyword must be in existing data_fields.
            ValueError: Raised by numpy if the number of elements of a value does not
                match the width of the stored array of its field.
        """
        for key, value in kwargs.items():
            assert key in self.data_fields.keys(), 'Cannot update non existing key {} in data object.'.format(key)

            # A DMStruct is reduced to its underlying DM first, so that the DM
            # branch below converts it as well.
            if isinstance(value, castools.structure3.DMStruct):
                value = value.cat
            if isinstance(value, castools.DM):
                # Convert to numpy
                value = value.full()
            elif not isinstance(value, np.ndarray):
                # Scalars, lists and other array-likes need ``reshape`` below.
                value = np.asarray(value)

            # Get current results array for the given key:
            arr = getattr(self, key)
            # Append current value (as one row) to results array:
            updated = np.append(arr, value.reshape(1, -1), axis=0)
            # Update results array:
            setattr(self, key, updated)

    def export(self) -> dict:
        """The export method returns a dictionary of the stored data.

        The dictionary has one entry per key of ``data_fields``. The arrays are the
        stored objects themselves, not copies.

        Returns:
            Dictionary of the currently stored data.
        """
        return {field_name: getattr(self, field_name) for field_name in self.data_fields}
class MPCData(Data):
    """**do-mpc** data container for the :py:class:`do_mpc.controller.MPC` instance.
    This class inherits from :py:class:`Data` and extends it to query the MPC predictions.

    Warning:
        For robust multi-stage MPC, the :py:class:`MPCData` class stores by default only the nominal values of the uncertain parameters.

    Args:
        model: model from :py:class:`do_mpc.model`
    """
    def __init__(self, model: Union[do_mpc.model.Model, do_mpc.model.LinearModel]):
        super().__init__(model)

        # Accelerate prediction calls by saving indices of previous queries.
        # Parallel lists: ``ind`` holds the query tuples, ``f_ind`` the index arrays.
        self.prediction_queries = {'ind': [], 'f_ind': []}

    def _cached_prediction_index(self, ind: tuple):
        """Return the index array stored for the query ``ind``, or ``None`` if it was not queried before."""
        try:
            position = self.prediction_queries['ind'].index(ind)
        except ValueError:
            return None
        return self.prediction_queries['f_ind'][position]

    def _store_prediction_index(self, ind: tuple, f_ind: np.ndarray) -> None:
        """Remember the index array ``f_ind`` computed for the query ``ind``."""
        self.prediction_queries['ind'].append(ind)
        self.prediction_queries['f_ind'].append(f_ind)

    def prediction(self, ind: tuple, t_ind: int = -1) -> np.ndarray:
        """Query the MPC trajectories.
        Use this method to obtain specific MPC trajectories from the data object.

        Warnings:
            This method requires that the optimal solution is stored in the :py:class:`do_mpc.data.MPCData` instance.
            Storing the optimal solution must be activated with :py:func:`do_mpc.controller.MPC.set_param`.

        Querying predicted trajectories requires the use of power indices, which is passed as tuple e.g.:

        ::

            data.prediction((var_type, var_name, i), t_ind)

        where

        * ``var_type`` refers to ``_x``, ``_u``, ``_z``, ``_tvp`` or ``_aux``
        * ``var_name`` refers to the user-defined names in the :py:class:`do_mpc.model.Model`
        * Use ``i`` to index vector valued variables.

        The method returns a multidimensional numpy.ndarray. The dimensions refer to:

        ::

            arr = data.prediction(('_x', 'x_1'))
            arr.shape
            >> (n_size, n_horizon, n_scenario)

        with:

        * ``n_size`` denoting the number of elements in ``x_1``, where ``n_size = 1`` is a scalar variable.
        * ``n_horizon`` is the MPC horizon defined with :py:func:`do_mpc.controller.MPC.set_param`
        * ``n_scenario`` refers to the number of uncertain scenarios (for robust MPC).

        Additional to the power index tuple, a time index (``t_ind``) can be passed to access the prediction for a certain
        time.

        If no solution has been stored yet (``_opt_x_num`` has no rows), the query is
        answered from zero-filled placeholder arrays.

        Args:
            ind: Power index to query the prediction of a specific variable.
            t_ind: Row of the stored solutions to read from; the default ``-1`` selects the most recent one.

        Returns:
            Predicted trajectories for the queried variable.

        Raises:
            assertion: ``meta_data['store_full_solution']`` must be set and ``ind`` must be a tuple.
            ValueError: The first element of ``ind`` is not one of the supported variable types.
        """
        assert self.meta_data['store_full_solution'], 'Optimal trajectory is not stored. Please update your MPC settings.'
        assert isinstance(ind, tuple), 'Query index must be of type tuple.'

        structure_scenario = self.meta_data['structure_scenario']

        # NOTE(review): ``opt_x``/``opt_p``/``opt_aux`` and the ``_opt_*_num``
        # arrays are not created in this class; presumably the MPC controller
        # attaches them when storing the full solution - confirm against caller.
        if self._opt_x_num.shape[0] == 0:
            # Nothing stored yet: use a single row of zeros per structure.
            _opt_x_num = np.zeros((1, self.opt_x.shape[0]))
            _opt_p_num = np.zeros((1, self.opt_p.shape[0]))
            _opt_aux_num = np.zeros((1, self.opt_aux.shape[0]))
        else:
            _opt_x_num = self._opt_x_num
            # Underscore-prefixed like its two siblings; the un-prefixed names
            # are used for the structures (``opt_p``), not the stored values.
            _opt_p_num = self._opt_p_num
            _opt_aux_num = self._opt_aux_num

        var_type = ind[0]

        if var_type in ['_x', '_z']:
            f_ind = self._cached_prediction_index(ind)
            if f_ind is None:
                f_ind = self.opt_x.f[(var_type, slice(None), lambda v: castools.horzcat(*v), slice(None), -1) + ind[1:]]
                f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
                # sort pred such that each column belongs to one scenario
                # - By indexing structure_scenario until f_ind.shape[0] we cover the case of _x and _z at the same time
                f_ind = f_ind[range(f_ind.shape[0]), :, structure_scenario[:f_ind.shape[0], :].T].T
                self._store_prediction_index(ind, f_ind)
            out = _opt_x_num[t_ind, f_ind]

        elif var_type == '_u':
            f_ind = self._cached_prediction_index(ind)
            if f_ind is None:
                f_ind = self.opt_x.f[(var_type, slice(None), lambda v: castools.horzcat(*v), slice(None)) + ind[1:]]
                f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
                # sort pred such that each column belongs to one scenario
                if self.meta_data['open_loop']:
                    # Only the first column of the scenario structure is used.
                    f_ind = f_ind[range(f_ind.shape[0]), :, structure_scenario[:-1, [0]].T].T
                else:
                    f_ind = f_ind[range(f_ind.shape[0]), :, structure_scenario[:-1, :].T].T
                self._store_prediction_index(ind, f_ind)
            out = _opt_x_num[t_ind, f_ind]

        elif var_type == '_tvp':
            f_ind = self._cached_prediction_index(ind)
            if f_ind is None:
                f_ind = self.opt_p.f[(var_type, slice(None)) + ind[1:]]
                # Reshaped to three axes with singleton first and last axis.
                f_ind = np.array(f_ind).reshape(1, -1, 1)
                self._store_prediction_index(ind, f_ind)
            out = _opt_p_num[t_ind, f_ind]

        elif var_type == '_aux':
            f_ind = self._cached_prediction_index(ind)
            if f_ind is None:
                f_ind = self.opt_aux.f[(var_type, slice(None), lambda v: castools.horzcat(*v), slice(None)) + ind[1:]]
                f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
                # sort pred such that each column belongs to one scenario
                f_ind = f_ind[range(f_ind.shape[0]), :, structure_scenario[:-1, :].T].T
                self._store_prediction_index(ind, f_ind)
            out = _opt_aux_num[t_ind, f_ind]

        else:
            raise ValueError('Index {} not recognized.'.format(ind[0]))

        return out
def save_results(save_list: list,
                 result_name: str = 'results',
                 result_path: str = './results/',
                 overwrite: bool = False
                 ) -> None:
    """Exports the data objects from the **do-mpc** modules in ``save_list`` as a pickled file. Supply any, all or a selection of (as a list):

    * :py:class:`do_mpc.controller.MPC`

    * :py:class:`do_mpc.simulator.Simulator`

    * :py:class:`do_mpc.estimator.Estimator`

    These objects can be used in post-processing to create graphics with the :py:class:`do_mpc.graphics_backend`.

    The pickled object is a dictionary with the keys ``'mpc'``, ``'simulator'`` and ``'estimator'``
    (only those supplied). If ``save_list`` contains several objects of the same kind,
    the last one replaces the earlier ones.

    The file is written to ``<result_path>/<result_name>.pkl``; ``result_path`` may be given
    with or without a trailing separator and is created if it does not exist.
    An empty ``result_path`` refers to the current working directory.

    Args:
        save_list: List of the objects to be stored.
        result_name: Name of the result file (without extension), defaults to 'results'.
        result_path: Result path, defaults to './results/'.
        overwrite: Option to overwrite existing results, defaults to False.
            If False and the file already exists, a running index is prepended to the
            name (``001_<result_name>``, ``002_<result_name>``, ...).

    Raises:
        assertion: save_list must be a list.
        assertion: result_name must be a string.
        assertion: results_path must be a string.
        assertion: overwrite must be boolean.
        Exception: save_list contains an object which is neither a do-mpc simulator, MPC controller nor estimator.
    """
    assert isinstance(save_list, list), 'save_list must be a list.'
    assert isinstance(result_name, str), 'result_name must be a string.'
    assert isinstance(result_path, str), 'results_path must be a string.'
    assert isinstance(overwrite, bool), 'overwrite must be boolean.'

    # ``exist_ok`` avoids the race between checking for and creating the
    # directory. An empty path means "current directory": nothing to create.
    if result_path:
        os.makedirs(result_path, exist_ok=True)

    results = {}
    for obj_i in save_list:
        if isinstance(obj_i, do_mpc.controller.MPC):
            results.update({'mpc': obj_i.data})
        elif isinstance(obj_i, do_mpc.simulator.Simulator):
            results.update({'simulator': obj_i.data})
        elif isinstance(obj_i, (do_mpc.estimator.StateFeedback, do_mpc.estimator.EKF, do_mpc.estimator.MHE)):
            results.update({'estimator': obj_i.data})
        else:
            raise Exception('save_list contains object which is neither do_mpc simulator, optimizizer nor estimator.')

    def _pkl_path(name):
        # os.path.join inserts the separator only when ``result_path`` lacks one,
        # so the file always ends up inside the directory created above.
        return os.path.join(result_path, name + '.pkl')

    # Dynamically generate new result name if name is already taken in result_path.
    if not overwrite:
        ind = 1
        ext_result_name = result_name
        while os.path.isfile(_pkl_path(ext_result_name)):
            ext_result_name = '{ind:03d}_{name}'.format(ind=ind, name=result_name)
            ind += 1
        result_name = ext_result_name

    with open(_pkl_path(result_name), 'wb') as f:
        pickle.dump(results, f)
def load_results(file_name: str) -> Dict:
    """Simple wrapper to open and unpickle a file.
    If used for **do-mpc** results, this will return a dictionary with the stored **do-mpc** modules:

    * :py:class:`do_mpc.controller.MPC`

    * :py:class:`do_mpc.simulator.Simulator`

    * :py:class:`do_mpc.estimator.Estimator`

    Warning:
        The file is read with :py:mod:`pickle`. Unpickling can execute arbitrary code,
        so only load files that come from a trusted source.

    Args:
        file_name : File name (including path) for the file to be opened and unpickled.

    Returns:
        Returns the results stored in .pkl file.
    """
    # NOTE(security): pickle.load runs code embedded in the file; never call
    # this on files from an untrusted origin.
    with open(file_name, 'rb') as f:
        return pickle.load(f)