"""driver.py: SmartSim driver script for the OpenFOAM ML experiment."""
from smartsim import Experiment
from smartsim.database import Orchestrator
from smartredis import Client
from os import environ, getcwd
from os.path import join
import math
# Name of the SmartSim experiment; the __main__ block also uses it to build
# the model file path and the generated case directories
exp_name = "openfoam_ml"
# Module-wide Experiment handle: assigned in the __main__ block and read by
# the helper functions in this file
exp = None
def create_of_model(nodes, ppn,
                    exe, exe_args, model_name,
                    exec_dir, env_vars):
    """Build a SmartSim Model that wraps an executable.

    The run settings are created through the module-level experiment
    ``exp`` with ``run_command="auto"``.

    :param nodes: The number of nodes to use for the model run settings
    :type nodes: int
    :param ppn: The processes per node for the model run settings
    :type ppn: int
    :param exe: The location of the executable (absolute path)
    :type exe: str
    :param exe_args: The executable arguments, forwarded unchanged to
                     the run settings (callers in this file pass a
                     str or None)
    :type exe_args: str or None
    :param model_name: The name of the model
    :type model_name: str
    :param exec_dir: The directory to associate with model execution.
                     Can be None.
    :type exec_dir: str
    :param env_vars: A dictionary of environment variables
    :type env_vars: dict
    :return: A SmartSim model
    :rtype: SmartSim Model
    """
    settings = exp.create_run_settings(
        exe,
        run_command="auto",
        exe_args=exe_args,
        env_vars=env_vars,
    )

    # Only Slurm is given an explicit node count; aprun does not support it
    if exp._launcher == "slurm":
        settings.set_nodes(nodes)

    total_tasks = nodes * ppn
    settings.set_tasks(total_tasks)
    settings.set_tasks_per_node(ppn)

    return exp.create_model(model_name,
                            run_settings=settings,
                            path=exec_dir)
def get_openfoam_env_vars():
    """Return the environment variables related to OpenFOAM.

    A variable of the current environment is selected when its name
    starts with ``WM_`` or ``FOAM_``, or when it is exactly
    ``MPI_BUFFER_SIZE`` or ``MPI_ARCH_INC``.

    :return: dictionary of environment variables
    :rtype: dict with key str and value str
    """
    prefixes = ("WM_", "FOAM_")
    exact_names = ("MPI_BUFFER_SIZE", "MPI_ARCH_INC")
    return {
        key: val
        for key, val in environ.items()
        if key.startswith(prefixes) or key in exact_names
    }
def start_database(port, nodes, cpus, tpq, interface):
    """Create and start the Redis database.

    The Orchestrator is generated and started through the module-level
    experiment ``exp``.

    :param port: port number of database
    :type port: int
    :param nodes: number of database nodes
    :type nodes: int
    :param cpus: number of cpus per node
    :type cpus: int
    :param tpq: number of threads per queue
    :type tpq: int
    :param interface: the network interface to bind to
    :type interface: str
    :return: orchestrator instance
    :rtype: Orchestrator
    """
    # NOTE(review): tpq is accepted but not applied anywhere in this
    # function; presumably it is meant for the Orchestrator's
    # threads-per-queue setting -- confirm before wiring it in.
    # The requested port is passed explicitly so the caller's value is
    # honoured and the Orchestrator default is not silently used.
    db = Orchestrator(port=port,
                      launcher='auto',
                      db_nodes=nodes,
                      batch=False,
                      interface=interface)
    db.set_cpus(cpus)
    exp.generate(db)
    exp.start(db)
    return db
def run_decomposition(foam_env_vars, exec_dir,
                      model_prefix="", block=False):
    """Launch the OpenFOAM ``decomposePar`` utility in a directory.

    :param foam_env_vars: Environment variables needed to run
                          OpenFOAM; must contain ``FOAM_APPBIN``
    :type foam_env_vars: dict of str keys and str values
    :param exec_dir: The directory where decomposition should be run
    :type exec_dir: str
    :param model_prefix: A prefix to add to the model name
    :type model_prefix: str
    :param block: Boolean indicating if the decomposition should
                  block on execution
    :type block: bool
    """
    # The utility is launched on one node with one task and no arguments
    decomp_exe = foam_env_vars['FOAM_APPBIN'] + "/decomposePar"
    decomp_model = create_of_model(1, 1, decomp_exe, None,
                                   model_prefix + "decomp",
                                   exec_dir, foam_env_vars)

    # Hand the model to the experiment for execution
    exp.start(decomp_model, block=block)
def run_reconstruction(foam_env_vars, exec_dir,
                       model_prefix="", block=False):
    """Launch the OpenFOAM ``reconstructPar`` utility in a directory.

    :param foam_env_vars: Environment variables needed to run
                          OpenFOAM; must contain ``FOAM_APPBIN``
    :type foam_env_vars: dict of str keys and str values
    :param exec_dir: The directory where reconstruction should be run
    :type exec_dir: str
    :param model_prefix: A prefix to add to the model name
    :type model_prefix: str
    :param block: Boolean indicating if the reconstruction should
                  block on execution
    :type block: bool
    """
    # The utility is launched on one node with one task and no arguments
    recon_exe = foam_env_vars['FOAM_APPBIN'] + "/reconstructPar"
    recon_model = create_of_model(1, 1, recon_exe, None,
                                  model_prefix + "recon",
                                  exec_dir, foam_env_vars)

    # Hand the model to the experiment for execution
    exp.start(recon_model, block=block)
def generate_data_gen_files(node_per_case, tasks_per_node,
                            input_dir, name):
    """Generate the OpenFOAM cases used for training data.

    The total process count is factored into two integers as close to
    a square as possible; the factors and the count are written into
    the configured input files through the ``@`` tag.

    :param node_per_case: The number of nodes to use per data
                          generation case
    :type node_per_case: int
    :param tasks_per_node: The number of tasks per compute node
    :type tasks_per_node: int
    :param input_dir: The directory where the cases
                      (i.e. Case1, Case2, Case3..) are located
    :type input_dir: str
    :param name: The name of the data generation model
    :type name: str
    :raises ValueError: if the total process count is not positive
    """
    n_proc = tasks_per_node * node_per_case
    if n_proc < 1:
        raise ValueError(
            f"total process count must be positive, got {n_proc}")

    # Walk down from ceil(sqrt(n_proc)) to the first exact divisor.
    # In the worst case, 1 x n_proc will be used for decomposition.
    # Integer arithmetic keeps the divisibility test exact.
    big = math.ceil(math.sqrt(n_proc))
    while n_proc % big != 0:
        big -= 1
    small = n_proc // big

    # Save the processor counts as a single string param for now
    # since multiple tags per line is currently not supported
    params = {
        "proc_x_y": str(big) + " " + str(small),
        "n_procs": str(n_proc)}

    # Create a SmartSim model that will generate and configure all files
    model = exp.create_model(name, None, params=params)

    # Attach the entire data generation directory
    model.attach_generator_files(to_configure=input_dir)

    # Generate the experiment file directory and replace config parameters
    exp.generate(model, tag="@", overwrite=True)
def run_data_gen_decomposition(foam_env_vars, dir):
    """Run the decomposition step for the six training data cases.

    Every case is started without blocking; the experiment is then
    polled once after all of them have been launched.

    :param foam_env_vars: Environment variables needed to run OpenFOAM
    :type foam_env_vars: dict of str keys and str values
    :param dir: The directory where the generated cases are located
                (i.e. Case1, Case2, etc..)
    :type dir: str
    """
    for case_no in range(1, 7):
        case_path = "/".join([dir, f"Case{case_no}"])
        run_decomposition(foam_env_vars, case_path,
                          model_prefix=f"case{case_no}_",
                          block=False)

    exp.poll()
def run_data_generation(foam_env_vars, node_per_case,
                        tasks_per_node, gen_dir):
    """Run the OpenFOAM data generation simulations.

    One ``simpleFoam`` model is started without blocking for each of
    Case1..Case6, and the experiment is polled once afterwards.

    :param foam_env_vars: Environment variables needed to run
                          OpenFOAM; must contain ``FOAM_APPBIN``
    :type foam_env_vars: dict of str keys and str values
    :param node_per_case: The number of nodes to use per data
                          generation case
    :type node_per_case: int
    :param tasks_per_node: The number of tasks per compute node
    :type tasks_per_node: int
    :param gen_dir: The directory where the generated cases are
                    located (i.e. Case1, Case2, etc..)
    :type gen_dir: str
    """
    # Store the executable as a variable
    executable = foam_env_vars['FOAM_APPBIN'] + "/simpleFoam"

    # "-parallel" is required whenever the case runs with more than one
    # task in total. Testing only tasks_per_node would miss multi-node
    # runs with a single task per node, which are still decomposed.
    exe_args = None
    if (node_per_case * tasks_per_node) > 1:
        exe_args = "-parallel"

    for i in range(1, 7):
        # Create the simulation model
        exec_path = "/".join([gen_dir, f"Case{i}"])
        model_name = f"data_gen{i}"
        model = create_of_model(node_per_case, tasks_per_node,
                                executable, exe_args, model_name,
                                exec_path, foam_env_vars)

        # Start the simulation model without waiting for it
        exp.start(model, block=False)

    # Monitor the launched cases
    exp.poll()
def run_data_gen_reconstruction(foam_env_vars, gen_dir):
    """Run the reconstruction step for the six data generation cases.

    Every case is started without blocking; the experiment is then
    polled once after all of them have been launched.

    :param foam_env_vars: Environment variables needed to run OpenFOAM
    :type foam_env_vars: dict of str keys and str values
    :param gen_dir: The directory where data generation cases
                    reside (e.g. Case1, Case2, ...)
    :type gen_dir: str
    """
    for case_no in range(1, 7):
        case_path = "/".join([gen_dir, f"Case{case_no}"])
        run_reconstruction(foam_env_vars, case_path,
                           model_prefix=f"case{case_no}_",
                           block=False)

    exp.poll()
def run_data_gen_dataset_construction(gen_dir, foam_env_vars=None):
    """Run the script to aggregate training data.

    :param gen_dir: The directory where the generated cases are
                    located (i.e. Case1, Case2, etc..)
    :type gen_dir: str
    :param foam_env_vars: Environment variables passed to the model.
                          When omitted they are collected with
                          ``get_openfoam_env_vars()``, so the function
                          no longer depends on a global that only
                          exists when this file runs as a script.
    :type foam_env_vars: dict of str keys and str values, optional
    """
    if foam_env_vars is None:
        foam_env_vars = get_openfoam_env_vars()

    # Store the executable and exec args
    executable = "python"
    exe_args = "training_data_maker.py"

    # Create the data aggregation model (one node, one task)
    model_name = "dataset_construction"
    nodes = 1
    tasks_per_node = 1
    script_model = create_of_model(nodes, tasks_per_node,
                                   executable, exe_args, model_name,
                                   gen_dir, foam_env_vars)

    # Start the data aggregation script
    exp.start(script_model)
def run_training(training_dir, training_node_count,
                 training_tasks_per_node, gen_dir,
                 foam_env_vars=None):
    """Run the TensorFlow training script.

    :param training_dir: The directory where the training script and
                         training data are located
    :type training_dir: str
    :param training_node_count: The number of compute nodes to use
                                for training
    :type training_node_count: int
    :param training_tasks_per_node: The number of tasks per compute
                                    node
    :type training_tasks_per_node: int
    :param gen_dir: The directory where data generation cases reside
    :type gen_dir: str
    :param foam_env_vars: Environment variables passed to the model.
                          When omitted they are collected with
                          ``get_openfoam_env_vars()``, so the function
                          no longer depends on a global that only
                          exists when this file runs as a script.
    :type foam_env_vars: dict of str keys and str values, optional
    """
    if foam_env_vars is None:
        foam_env_vars = get_openfoam_env_vars()

    # Create a SmartSim model for the training model
    model_name = "training"
    executable = "python"
    exe_args = "ML_Model.py"
    training_model = create_of_model(training_node_count,
                                     training_tasks_per_node,
                                     executable, exe_args, model_name,
                                     None, foam_env_vars)

    # Set the model to copy input files
    files_to_copy = [
        training_dir,
        "/".join([gen_dir, "Total_dataset.npy"]),
        "/".join([gen_dir, "means"]),
    ]
    training_model.attach_generator_files(to_copy=files_to_copy)

    # Generate the experiment directory
    exp.generate(training_model, overwrite=True)

    # Run the training script
    exp.start(training_model)
def set_model(model_file, device, batch_size, address, cluster):
    """Set the TensorFlow OpenFOAM ML model in the orchestrator.

    The model is stored under the key ``ml_sa_cg_model``.

    :param model_file: A full path to the model file
    :type model_file: str
    :param device: The device to use for model evaluation
                   (e.g. CPU or GPU)
    :type device: str
    :param batch_size: The batch size to use for model evaluation
    :type batch_size: int
    :param address: The address to use for client connection
    :type address: str
    :param cluster: Boolean for cluster or non-cluster connection
    :type cluster: bool
    """
    client = Client(address=address, cluster=cluster)
    # Optional arguments are passed by keyword so that each value stays
    # bound to its intended parameter even if the client inserts new
    # optional parameters between them.
    client.set_model_from_file(
        "ml_sa_cg_model",
        model_file,
        "TF",
        device=device,
        batch_size=batch_size,
        min_batch_size=0,
        tag="v0.0",
        inputs=["x"],
        outputs=["Identity"],
    )
def generate_simulation_files(node_count, tasks_per_node,
                              sim_input_dir, sim_name, gen_dir):
    """Generate the OpenFOAM simulation directory.

    The total process count is factored into two integers as close to
    a square as possible; the factors and the count are written into
    the configured input files through the ``@`` tag.

    :param node_count: The number of compute nodes to use for the
                       simulation
    :type node_count: int
    :param tasks_per_node: The number of tasks per compute node
    :type tasks_per_node: int
    :param sim_input_dir: The directory where sim input files are
                          located
    :type sim_input_dir: str
    :param sim_name: The name of the simulation (model)
    :type sim_name: str
    :param gen_dir: The directory where data generation cases reside
    :type gen_dir: str
    :raises ValueError: if the total process count is not positive
    """
    n_proc = node_count * tasks_per_node
    if n_proc < 1:
        raise ValueError(
            f"total process count must be positive, got {n_proc}")

    # Walk down from ceil(sqrt(n_proc)) to the first exact divisor.
    # In the worst case, 1 x n_proc will be used for decomposition.
    # Integer arithmetic keeps the divisibility test exact.
    big = math.ceil(math.sqrt(n_proc))
    while n_proc % big != 0:
        big -= 1
    small = n_proc // big

    # Save the processor counts as a single string param for now
    # since multiple tags per line is currently not supported
    params = {
        "proc_x_y": str(big) + " " + str(small),
        "n_procs": str(n_proc)}

    # Create a SmartSim model that will generate and configure all files
    model = exp.create_model(sim_name, None, params=params)

    # Copy "means" files from data generation directory
    files_to_copy = ["/".join([gen_dir, "means"])]

    # Attach the simulation input directory and the files to copy
    model.attach_generator_files(to_copy=files_to_copy,
                                 to_configure=sim_input_dir)

    # Generate the experiment file directory and replace config parameters
    exp.generate(model, tag="@", overwrite=True)
def run_simulation(foam_env_vars, nodes, ppn, sim_dir):
    """Run the OpenFOAM ``simpleFoam_ML`` simulation.

    :param foam_env_vars: Environment variables needed to run
                          OpenFOAM; must contain ``FOAM_APPBIN``
    :type foam_env_vars: dict of str keys and str values
    :param nodes: The number of compute nodes to use for the
                  simulation
    :type nodes: int
    :param ppn: The number of processors per node
    :type ppn: int
    :param sim_dir: The directory where the generated input files
                    are located
    :type sim_dir: str
    """
    solver = foam_env_vars['FOAM_APPBIN'] + "/simpleFoam_ML"

    # "-parallel" is passed only when more than one task is used in total
    solver_args = "-parallel" if (nodes * ppn) > 1 else None

    sim_model = create_of_model(nodes, ppn, solver,
                                solver_args, "sim", sim_dir,
                                foam_env_vars)

    # Launch the simulation model
    exp.start(sim_model)
def run_foamtovtk(foam_env_vars, sim_dir):
    """Run the ``foamToVTK`` utility in the simulation directory.

    :param foam_env_vars: Environment variables needed to run
                          OpenFOAM; must contain ``FOAM_APPBIN``
    :type foam_env_vars: dict of str keys and str values
    :param sim_dir: The directory where the generated simulation
                    files are located
    :type sim_dir: str
    """
    converter = foam_env_vars['FOAM_APPBIN'] + "/foamToVTK"

    # The utility is launched on one node with one task and no arguments
    vtk_model = create_of_model(1, 1, converter,
                                None, "foamtovtk", sim_dir,
                                foam_env_vars)

    # Launch the foamToVTK utility
    exp.start(vtk_model)
if __name__ == "__main__":

    import argparse

    # ---- Command line options -------------------------------------------
    parser = argparse.ArgumentParser(description="Run OpenFOAM ML Experiment")
    parser.add_argument(
        "--db_nodes", type=int, default=1,
        help="Number of nodes for the database")
    parser.add_argument(
        "--db_port", type=int, default=6780,
        help="Port for the database")
    parser.add_argument(
        "--db_interface", type=str, default="ipogif0",
        help="Network interface for the database")
    parser.add_argument(
        "--gen_nodes", type=int, default=2,
        help="Number of nodes to use for each data generation case")
    parser.add_argument(
        "--gen_ppn", type=int, default=24,
        help="Number of processors per node for each generation case")
    parser.add_argument(
        "--sim_nodes", type=int, default=1,
        help="Number of nodes for the OpenFOAM inference case")
    parser.add_argument(
        "--sim_ppn", type=int, default=24,
        help="Number of processors per node for OpenFOAM inference case")
    args = parser.parse_args()

    # ---- Orchestrator settings ------------------------------------------
    db_node_count = args.db_nodes
    db_port = args.db_port
    db_interface = args.db_interface
    db_cpus = 16
    db_tpq = 4

    # ---- Data generation settings ---------------------------------------
    gen_nodes_per_case = args.gen_nodes
    gen_tasks_per_node = args.gen_ppn
    gen_input_dir = "./data_generation/"
    gen_name = "data_generation"

    # ---- Simulation settings --------------------------------------------
    sim_node_count = args.sim_nodes
    sim_tasks_per_node = args.sim_ppn
    sim_input_dir = "./simulation_inputs/"

    # ---- Training settings (do not change) ------------------------------
    training_node_count = 1
    training_tasks_per_node = 1
    training_dir = "./training"

    # ---- Model settings -------------------------------------------------
    model_file = "./" + exp_name + "/training/ML_SA_CG.pb"
    device = "CPU"
    batch_size = 1

    # ---- Script variables -----------------------------------------------
    sim_name = "openfoam"
    sim_dir = "/".join([getcwd(), exp_name, sim_name])
    gen_dir = "/".join([getcwd(), exp_name, gen_name])

    # Decomposition and reconstruction only apply to multi-task runs
    gen_is_parallel = (gen_tasks_per_node * gen_nodes_per_case) > 1
    sim_is_parallel = (sim_tasks_per_node * sim_node_count) > 1

    # Create the experiment and publish it through the module global `exp`,
    # which every helper function in this file reads
    exp = Experiment(name=exp_name, launcher="auto")

    # Bring up the orchestrator and keep one of its addresses for the
    # client that uploads the ML model
    db = start_database(db_port, db_node_count, db_cpus,
                        db_tpq, db_interface)
    address = db.get_address()[0]

    # OpenFOAM environment variables used by every launched model
    foam_env_vars = get_openfoam_env_vars()

    # ---- Data generation ------------------------------------------------
    generate_data_gen_files(gen_nodes_per_case, gen_tasks_per_node,
                            gen_input_dir, gen_name)

    if gen_is_parallel:
        run_data_gen_decomposition(foam_env_vars, gen_dir)

    run_data_generation(foam_env_vars, gen_nodes_per_case,
                        gen_tasks_per_node, gen_dir)

    if gen_is_parallel:
        run_data_gen_reconstruction(foam_env_vars, gen_dir)

    # Aggregate the generated cases into the training dataset
    run_data_gen_dataset_construction(gen_dir)

    # ---- Training and model upload --------------------------------------
    run_training(training_dir, training_node_count,
                 training_tasks_per_node, gen_dir)

    # A cluster connection is used only with more than one database node
    set_model(model_file, device, batch_size, address,
              db_node_count > 1)

    # ---- Inference simulation -------------------------------------------
    generate_simulation_files(sim_node_count, sim_tasks_per_node,
                              sim_input_dir, sim_name, gen_dir)

    if sim_is_parallel:
        run_decomposition(foam_env_vars, sim_dir,
                          model_prefix="sim_", block=True)

    run_simulation(foam_env_vars, sim_node_count,
                   sim_tasks_per_node, sim_dir)

    if sim_is_parallel:
        run_reconstruction(foam_env_vars, sim_dir,
                           model_prefix="sim_", block=True)

    # Convert the simulation output into VTK files
    run_foamtovtk(foam_env_vars, sim_dir)