custom_batchrunner.py
"""
Copy of mesa's batchrunner, slightly modified for our purposes (e.g. save results to csv file as they come)
"""
from functools import partial
import itertools
from tqdm import tqdm
from copy import copy
from multiprocessing import Pool
from typing import (
Any,
Counter,
Dict,
Iterable,
List,
Mapping,
Tuple,
Type,
Union,
)
import pathlib
from mesa.model import Model
import logic.helper as hlp


def custom_batch_run(
    model_cls,
    parameters,
    batch_run_id,
    number_processes=1,
    iterations=1,
    data_collection_period=-1,
    max_steps=1000,
    display_progress=True,
    initial_seed=0,
):
"""Batch run a mesa model with a set of parameter values.
Parameters
----------
model_cls : Type[Model]
The model class to batch-run
parameters : Mapping[str, Union[Any, Iterable[Any]]],
Dictionary with model parameters over which to run the model. You can either pass single values or iterables.
number_processes : int, optional
Number of processes used, by default 1. Set this to None if you want to use all CPUs.
iterations : int, optional
Number of iterations for each parameter combination, by default 1
data_collection_period : int, optional
Number of steps after which data gets collected, by default -1 (end of episode)
max_steps : int, optional
Maximum number of model steps after which the model halts, by default 1000
display_progress : bool, optional
Display batch run process, by default True
Returns
-------
List[Dict[str, Any]]
[description]
"""
seq_id = hlp.read_seq_id() + 1
hlp.write_seq_id(seq_id)
batch_run_id = str(seq_id) + '-' + batch_run_id
path = pathlib.Path.cwd() / "output" / batch_run_id
    path.mkdir(parents=True)
parameters.update({'parent_dir': path})
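    # Expand the parameter dictionary into one kwargs dict per combination of
    # swept (iterable) values; fixed_params holds the non-swept parameter names.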
kwargs_list, fixed_params = _make_model_kwargs(parameters)
final_kwargs_list = [
        copy(kwarg_dict) | {'seed': initial_seed + i}  # different seed for each iteration of the same parameter combination
for kwarg_dict in kwargs_list
for i in range(iterations)
]
    for i, params in enumerate(final_kwargs_list):
        run_seq_id = i + 1  # per-run sequence number (distinct from the batch-level seq_id above)
        execution_id = '-'.join(str(key) + '-' + str(value) for key, value in params.items() if key not in fixed_params)
        params.update({'seq_id': run_seq_id, 'execution_id': execution_id})
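    # Bind the per-batch arguments up front, so that each (worker) invocation
    # only needs the per-run kwargs.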
process_func = partial(
_model_run_func,
model_cls,
max_steps=max_steps,
data_collection_period=data_collection_period,
batch_run_id=batch_run_id,
fixed_params=fixed_params
)
total_iterations = len(final_kwargs_list)
run_counter = itertools.count()
results: List[Dict[str, Any]] = []
    with tqdm(total=total_iterations, disable=not display_progress) as pbar:
iteration_counter: Counter[Tuple[Any, ...]] = Counter()
        def _fn(paramValues, rawdata):
            # Tag each collected row with a unique RunId and the (0-based)
            # iteration index of its parameter combination, then advance the
            # progress bar.
iteration_counter[paramValues] += 1
iteration = iteration_counter[paramValues]
run_id = next(run_counter)
data = []
for run_data in rawdata:
out = {"RunId": run_id, "iteration": iteration - 1}
out.update(run_data)
data.append(out)
results.extend(data)
pbar.update()
if number_processes == 1:
for kwargs in final_kwargs_list:
paramValues, rawdata = process_func(kwargs)
_fn(paramValues, rawdata)
else:
with Pool(number_processes) as p:
for paramValues, rawdata in p.imap_unordered(process_func, final_kwargs_list):
_fn(paramValues, rawdata)
return results, path
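

# A minimal usage sketch (illustration only, not part of the original module).
# `MyModel` and the parameter names below are hypothetical; any mesa Model
# subclass whose __init__ accepts the given kwargs plus 'seed' and 'parent_dir'
# would work:
#
#     results, output_dir = custom_batch_run(
#         MyModel,
#         parameters={'n_agents': [10, 100], 'alpha': 0.5},  # iterables are swept, scalars stay fixed
#         batch_run_id='demo',
#         number_processes=4,   # or None to use all CPUs
#         iterations=3,         # seeds initial_seed..initial_seed+2 per combination
#         max_steps=500,
#     )

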
def _make_model_kwargs(
parameters: Mapping[str, Union[Any, Iterable[Any]]]
) -> Tuple[List[Dict[str, Any]], List[str]]:
"""Create model kwargs from parameters dictionary.
Parameters
----------
parameters : Mapping[str, Union[Any, Iterable[Any]]]
Single or multiple values for each model parameter name
Returns
-------
Tuple[List[Dict[str, Any]], List[str]]
A list of all kwargs combinations and a list of all variable parameter names
"""
parameter_list = []
fixed_params = []
for param, values in parameters.items():
if isinstance(values, str):
            # The value is a single string, so we shouldn't iterate over it.
all_values = [(param, values)]
fixed_params.append(param)
else:
try:
all_values = [(param, value) for value in values]
except TypeError:
all_values = [(param, values)]
fixed_params.append(param)
parameter_list.append(all_values)
all_kwargs = itertools.product(*parameter_list)
kwargs_list = [dict(kwargs) for kwargs in all_kwargs]
return kwargs_list, fixed_params
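
# Example of the expansion performed by _make_model_kwargs (derived from the
# code above; shown for illustration):
#
#     >>> _make_model_kwargs({'a': [1, 2], 'b': 'x'})
#     ([{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'x'}], ['b'])
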
def _model_run_func(
model_cls: Type[Model],
kwargs: Dict[str, Any],
max_steps: int,
data_collection_period: int,
batch_run_id: str,
fixed_params: List[str]
) -> Tuple[Tuple[Any, ...], List[Dict[str, Any]]]:
"""Run a single model run and collect model and agent data.
Parameters
----------
model_cls : Type[Model]
The model class to batch-run
kwargs : Dict[str, Any]
model kwargs used for this run
max_steps : int
Maximum number of model steps after which the model halts, by default 1000
data_collection_period : int
Number of steps after which data gets collected
Returns
-------
Tuple[Tuple[Any, ...], List[Dict[str, Any]]]
Return model_data, agent_data from the reporters
"""
model = model_cls(**kwargs)
while model.running and model.schedule.steps <= max_steps:
model.step()
    data = []
    # Determine the steps at which to collect data; with the default period of
    # -1 the range below is empty, so only the final step gets collected.
    steps = list(range(0, model.schedule.steps, data_collection_period))
    if not steps or steps[-1] != model.schedule.steps - 1:
        steps.append(model.schedule.steps - 1)  # always include the last step
for step in steps:
model_data, all_agents_data = _collect_data(model, step)
# If there are agent_reporters, then create an entry for each agent
if all_agents_data:
stepdata = [
{**{"Step": step}, **kwargs, **model_data, **agent_data}
for agent_data in all_agents_data
]
# If there is only model data, then create a single entry for the step
else:
stepdata = [{**{"Step": step}, **kwargs, **model_data}]
        if step == model.schedule.steps - 1:
            # Write end-of-run results to the batch's aggregate CSV file as soon
            # as the run finishes. Note that only the first row of stepdata is
            # written: the model-level entry, or the first agent's entry when
            # agent reporters are present.
            header = [key for key in stepdata[0].keys() if key not in fixed_params]
            row = [value for key, value in stepdata[0].items() if key not in fixed_params]
            filename = 'aggregate-results-' + batch_run_id + '.csv'
            path = pathlib.Path.cwd() / "output" / batch_run_id
            filepath = path / filename
            hlp.write_to_csv(filepath, header, row)
data.extend(stepdata)
return tuple(kwargs.values()), data


def _collect_data(
    model: Model,
    step: int,
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Collect model and agent data from a model, using mesa's DataCollector."""
dc = model.datacollector
model_data = {param: values[step] for param, values in dc.model_vars.items()}
all_agents_data = []
    # Each agent record is a tuple of the form (step, agent_id, *reporter_values),
    # as produced by mesa's DataCollector.
    raw_agent_data = dc._agent_records.get(step, [])
for data in raw_agent_data:
agent_dict = {"AgentID": data[1]}
agent_dict.update(zip(dc.agent_reporters, data[2:]))
all_agents_data.append(agent_dict)
return model_data, all_agents_data
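
# Illustration of _collect_data's output shape (hypothetical reporter names,
# derived from the code above): with agent_reporters == {'wealth': ...} and
# dc._agent_records[step] == [(step, 42, 3.5)], it returns
#
#     ({'<model_reporter>': <value at step>, ...}, [{'AgentID': 42, 'wealth': 3.5}])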