This repository has been archived by the owner on Nov 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcase_mgmt.py
511 lines (422 loc) · 21.8 KB
/
case_mgmt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
import os
import shutil as sh
from wetb.hawc2 import HTCFile
from wetb.hawc2.htc_contents import HTCSection
import helper_functions as hf
import source_model as sm
import propagation_model as pm
import reception_model as rm
import reconstruction_model as cm
"""
========================================================================================================================
=== ===
=== All the stuff to manage Auralisation cases ===
=== ===
========================================================================================================================
Copyright (c) 2023 Josephine Pockelé. Licensed under MIT license.
"""
__all__ = ['CaseLoader', 'Case']
class CaseLoader:
# Some predefined values for the HAWC2.aero.aero_noise module
octave_bandwidth = '1'
output_filename = 'aeroload'
def __init__(self, project_path: str, case_file: str):
"""
================================================================================================================
Class to load an auralisation case defined by a .aur file.
================================================================================================================
:param project_path: path of the overarcing auralisation project folder.
:param case_file: file name of the .aur file inside the project folder.
"""
''' Preprocessing of the input parameters '''
# Create paths for project and for the HAWC2 model.
self.project_path = project_path
self.h2model_path = os.path.join(project_path, 'H2model')
self.case_file = os.path.join(project_path, case_file)
self.file_name = case_file.replace('.aur', '')
# Check that the input file actually exists.
if not os.path.isfile(self.case_file):
raise FileNotFoundError('Given input file name does not exist.')
''' Input file parsing process '''
# Open the input file and read the lines
with open(self.case_file, 'r') as f:
lines = f.readlines()
# Remove all leading and trailing spaces from the lines for parsing
lines = [line.strip(' ') for line in lines]
# Dictionaries to store inputs for the models
self.conditions_dict = {}
self.source_dict = {}
self.propagation_dict = {}
self.receiver_dict = {}
self.reception_dict = {}
self.reconstruction_dict = {}
# File paths and names
self.case_name = ''
self.htc_base_name = ''
self.htc_base_path = ''
self.htc_path = ''
self.hawc2_path = ''
# Placeholder for the real .htc file
self.htc = HTCFile()
# Parameters from the hawc2 input block
self.n_obs = 0
self.run_hawc = False
# Parse the input file lines
self._parse_input_file(lines)
''' Preparations for HAWC2 '''
# Set the variables to store the properties of the HAWC2 sphere
self.h2result_sphere = None
self.h2result_path = os.path.join(self.h2model_path, 'res', self.case_name)
@staticmethod
def _get_blocks(lines: list):
"""
Obtain code blocks from the given list of lines
:param lines: list of lines containing auralisation input code
:return: a dictionary containing the code blocks. dict(str: list(str))
"""
# Create empty list for returning the blocks
blocks = {}
# Go over all the lines
while lines:
line = lines.pop(0)
# Figure out the blocks
if line.startswith('begin'):
# Get the name of the current block
_, block_name, *_ = line.split(' ')
# Create block collection list
block = [line, ]
# Obtain all lines in the current block
while not block[-1].strip(' ').startswith(f'end {block_name}') and lines:
block.append(lines.pop(0))
# Add this block to the list
blocks[block_name] = block
# Put non-block lines in the dictionary as well
elif not (line.startswith(';') or line.startswith('\n')):
key, value, *_ = line.split(' ')
blocks[key] = value
return blocks
def _parse_input_file(self, lines: list):
"""
Parse the input file into the blocks and then parse those.
:param lines: list of lines containing auralisation input code
"""
# Obtain the blocks from the input file lines
blocks = self._get_blocks(lines)
self.case_name = blocks['name']
# Check if input file contains required stuff
for block_name_required in ('conditions', 'HAWC2', 'source', 'propagation', 'reception', 'reconstruction'):
if block_name_required not in blocks.keys():
raise KeyError(f'No {block_name_required} block found in {self.case_name}')
# Go over all loaded blocks and send to respective parser functions
for block_name, block in blocks.items():
# Parse Conditions block
if block_name == 'conditions':
self._parse_conditions(block)
# Parse HAWC2 block
elif block_name == 'HAWC2':
self._parse_hawc2(block)
# Parse source model block
elif block_name == 'source':
self._parse_source(block)
# Parse propagation model block
elif block_name == 'propagation':
self._parse_propagation(block)
# Parse reception model block
elif block_name == 'reception':
self._parse_reception(block)
# Parse reconstruction model block
elif block_name == 'reconstruction':
self._parse_reconstruction(block)
def _parse_conditions(self, lines: list):
"""
Parse the conditions block into the conditions dictionary
:param lines: list of lines containing auralisation input code
"""
for line in lines[1:-1]:
if not (line.startswith(';') or line.startswith('\n')):
key, value, *_ = line.split(' ')
if key in ('rotor_radius', 'rotor_rpm', 'wsp', 'z_wsp', 'z0_wsp', 'groundtemp', 'groundpres',
'humidity', 'delta_t', ):
self.conditions_dict[key] = float(value)
elif key in ():
self.conditions_dict[key] = int(value)
elif key == 'hub_pos':
x, y, z = value.split(',')
self.conditions_dict[key] = hf.Cartesian(float(x), float(y), float(z))
else:
self.conditions_dict[key] = value
def _parse_hawc2(self, lines: list):
"""
Parse the HAWC2 block and create the case .htc file
:param lines: list of lines containing auralisation input code
"""
# Get the blocks from the HAWC2 block
blocks = self._get_blocks(lines[1:-1])
try:
self.run_hawc = bool(int(blocks['run_hawc2']))
except KeyError:
self.run_hawc = False
# Get the name and path of the base .htc file to work with
self.htc_base_name = blocks['htc_name']
self.htc_base_path = os.path.join(self.h2model_path, f'{self.htc_base_name}.htc')
# Load the base htc file
self.htc = HTCFile(filename=self.htc_base_path)
# Add the case "wind" and "aero_noise" sections
self.htc.add_section(HTCSection.from_lines(blocks['wind']))
self.htc.aero.add_section(HTCSection.from_lines(blocks['aero_noise']))
# Add necessary parameters to the "aero_noise" section
self.htc.aero.aero_noise.add_line(name='temperature', values=(self.conditions_dict['groundtemp'],),
comments='')
self.htc.aero.aero_noise.add_line(name='atmospheric_pressure', values=(self.conditions_dict['groundpres'],),
comments='')
self.htc.aero.aero_noise.add_line(name='output_filename', values=(self.output_filename, ),
comments='')
self.htc.aero.aero_noise.add_line(name='octave_bandwidth', values=(self.octave_bandwidth,),
comments='')
# Create the path for the case-specific htc file and save the htc file
self.htc_path = os.path.join(self.h2model_path, f'{self.htc_base_name}_{self.case_name}.htc')
# Extract the HAWC2 executable file location
self.hawc2_path = os.path.join(blocks['hawc2_path'])
# Extract the number of observer points
self.n_obs = int(blocks['n_obs'])
def _parse_source(self, lines: list):
"""
Parse the source block
:param lines: list of lines containing auralisation input code
"""
blocks = self._get_blocks(lines[1:-1])
for key, value in blocks.items():
if key in ('blade_percent', 'radius_factor'):
self.source_dict[key] = float(value)
elif key in ('n_rays', 'n_threads'):
self.source_dict[key] = int(value)
else:
self.source_dict[key] = value
def _parse_propagation(self, lines: list):
"""
Parse the propagation block
:param lines: list of lines containing auralisation input code
"""
blocks = self._get_blocks(lines[1:-1])
for key, value in blocks.items():
if key in ():
self.propagation_dict[key] = float(value)
elif key in ('n_threads', ):
self.propagation_dict[key] = int(value)
elif key == 'models':
self.propagation_dict[key] = tuple(value.split(','))
elif key in ('pickle', 'unpickle'):
self.propagation_dict[key] = bool(int(value))
else:
self.propagation_dict[key] = value
if 'models' not in self.propagation_dict.keys():
self.propagation_dict['models'] = ()
def _parse_receiver(self, lines: list):
"""
Parse a reception > receiver block
:param lines: list of lines containing auralisation input code
"""
blocks = self._get_blocks(lines[1:-1])
parse_dict = {}
for key, value in blocks.items():
if key in ('rotation', ):
parse_dict[key] = float(value)
elif key in ('index', ):
parse_dict[key] = int(value)
elif key in ('pos', ):
parse_dict['pos'] = (float(val) for val in value.split(','))
else:
parse_dict[key] = value
self.receiver_dict[parse_dict['index']] = rm.Receiver(parse_dict)
def _parse_reception(self, lines: list):
"""
Parse the reception block
:param lines: list of lines containing auralisation input code
"""
blocks = self._get_blocks(lines[1:-1])
for key, block in blocks.items():
if key in ():
self.reception_dict[key] = float(block)
elif key in ():
self.reception_dict[key] = int(block)
elif key.startswith('receiver'):
self._parse_receiver(block)
elif key in ('save_spectrogram', 'load_spectrogram'):
self.reception_dict[key] = bool(int(block))
else:
self.reception_dict[key] = block
def _parse_reconstruction(self, lines: list):
"""
Parse the reconstruction block
:param lines: list of lines containing auralisation input code
"""
blocks = self._get_blocks(lines[1:-1])
for key, value in blocks.items():
if key in ('wav_norm', 't_audio', ):
self.reconstruction_dict[key] = float(value)
elif key in ('f_s_desired', 'overlap', 'zeropad', ):
self.reconstruction_dict[key] = int(value)
else:
self.reconstruction_dict[key] = value
class Case(CaseLoader):
def __init__(self, project_path: str, case_file: str):
"""
================================================================================================================
Class to manage an auralisation case defined by a .aur file.
================================================================================================================
:param project_path: path of the overarcing auralisation project folder.
:param case_file: file name of the .aur file inside the project folder.
"""
# Call the CaseLoader
super().__init__(project_path, case_file)
''' Setup of the models '''
# Set the path for the atmosphere cache file
self.atmosphere_path = os.path.join(self.project_path, f'atm/atm_{self.case_name}.dat')
# Generate atmosphere if it does not exist yet
if not os.path.isfile(self.atmosphere_path):
self.atmosphere = hf.Atmosphere(self.conditions_dict['z_wsp'], self.conditions_dict['wsp'],
self.conditions_dict['humidity'], wind_z0=self.conditions_dict['z0_wsp'],
delta_h=1., t_0m=self.conditions_dict['groundtemp'],
p_0m=self.conditions_dict['groundpres'], atm_path=self.atmosphere_path)
# Otherwise, load the cache file
else:
self.atmosphere = hf.Atmosphere(self.conditions_dict['z_wsp'], self.conditions_dict['wsp'],
self.conditions_dict['humidity'], wind_z0=self.conditions_dict['z0_wsp'],
atm_path=self.atmosphere_path)
def generate_hawc2_sphere(self):
"""
Generate a sphere of evenly spaced observer points
"""
coordinates, fail, _ = hf.uniform_spherical_grid(self.n_obs)
if fail:
raise ValueError(f'Parameter n_obs = {self.n_obs} resulted in incomplete sphere. Try a different value.')
scale = self.source_dict['radius_factor'] * self.conditions_dict['rotor_radius']
self.h2result_sphere = [scale * coordinate + self.conditions_dict['hub_pos'] for coordinate in coordinates]
for pi, p in enumerate(self.h2result_sphere):
self.htc.aero.aero_noise.add_line(name='xyz_observer', values=p.vec, comments=f'Observer_{pi}')
def _simulate_hawc2(self):
"""
Run the HTCFile.simulate function with compensation for its stupidity
"""
try:
self.htc.simulate(self.hawc2_path)
# If an error is thrown, just smile and wave
except Exception as e:
return e
def run_hawc2(self):
"""
Run the HAWC2 simulations for this case.
"""
''' Preprocessing '''
# Make sure the HAWC2 path is valid
if not os.path.isfile(self.hawc2_path):
raise FileNotFoundError('Invalid file path given for HAWC2.')
print(' -- HAWC2')
# Make sure an observer sphere is generated
if self.h2result_sphere is None:
self.generate_hawc2_sphere()
print('Generating observer sphere: Done!')
# Set the aero_noise simulation mode to 2, meaning to run and store the needed parameters
self.htc.aero.aero_noise.add_line(name='noise_mode', values=('2', ))
self.htc.save(self.htc_path)
''' Running HAWC2 '''
# Create and start a progress thread to continuously print the progress and .....
p_thread = hf.ProgressThread(2, 'Running HAWC2 simulations')
p_thread.start()
# Run the base simulation
self._simulate_hawc2()
# Set 1 to 2 in the progress thread
p_thread.update()
# Prepare for noise simulation by setting the noise mode to calculate
self.htc.aero.aero_noise.noise_mode = 3
self.htc.save(self.htc_path)
# Run the noise simulation
self._simulate_hawc2()
# Stop the progress thread
p_thread.stop()
del p_thread
''' Postprocessing '''
# Remove the temp htc file
os.remove(self.htc_path)
# Create a directory to move noise results to if it does not exist yet
if not os.path.isdir(self.h2result_path):
os.mkdir(self.h2result_path)
# Loop over all generated result files
for fname in os.listdir(os.path.join(self.h2model_path, 'res')):
# Create the path string for the current result file
fpath = os.path.join(self.h2model_path, 'res', fname)
# Store the base HAWC2 output files
if fname.endswith('.sel') or fname.endswith('.dat'):
sh.move(fpath, self.h2result_path)
# Store the noise output files
elif fname.endswith('.out'):
# But only the relevant ones...
if fname.startswith('aeroload_noise_psd'):
sh.move(fpath, self.h2result_path)
# Remove the other ones
else:
os.remove(fpath)
def run(self):
"""
Run everything except HAWC2
"""
# ----------------------------------------------------------------------------------------------------------
# Model setup
# ----------------------------------------------------------------------------------------------------------
dummy = self.reception_dict['load_spectrogram'] or self.propagation_dict['unpickle']
source_model = sm.SourceModel(self.conditions_dict, self.source_dict, self.h2result_path, self.atmosphere,
dummy=dummy)
propagation_model = pm.PropagationModel(self.conditions_dict, self.propagation_dict, self.atmosphere)
reception_model = rm.ReceptionModel(self.conditions_dict, self.reception_dict)
reconstruction_model = cm.ReconstructionModel(self.conditions_dict, self.reconstruction_dict)
# ----------------------------------------------------------------------------------------------------------
# Folder setup
# ----------------------------------------------------------------------------------------------------------
pickle_base_path = os.path.join(self.project_path, 'pickles', self.case_name)
if (not os.path.isdir(pickle_base_path)) and self.propagation_dict['pickle']:
os.mkdir(pickle_base_path)
spectrogram_base_dir = os.path.join(self.project_path, 'spectrograms', self.file_name)
if (not os.path.isdir(spectrogram_base_dir)) and self.reception_dict['save_spectrogram']:
os.mkdir(spectrogram_base_dir)
wavfiles_base_dir = os.path.join(self.project_path, 'wavfiles', self.file_name)
if not os.path.isdir(wavfiles_base_dir):
os.mkdir(wavfiles_base_dir)
# ----------------------------------------------------------------------------------------------------------
# Receivers loop
# ----------------------------------------------------------------------------------------------------------
receiver: rm.Receiver
for rec_idx, receiver in self.receiver_dict.items():
pickle_path = os.path.join(pickle_base_path, f'rec{rec_idx}')
spectrogram_path_left = os.path.join(spectrogram_base_dir, f'rec{rec_idx}_left.csv')
spectrogram_path_right = os.path.join(spectrogram_base_dir, f'rec{rec_idx}_right.csv')
print()
if not self.reception_dict['load_spectrogram']:
# --------------------------------------------------------------------------------------------------
# Propagation
# --------------------------------------------------------------------------------------------------
if not self.propagation_dict['unpickle']:
print(f' -- Running Propagation Model for receiver {rec_idx}')
ray_queue: list[pm.SoundRay] = source_model.run(receiver, self.propagation_dict['models'])
propagation_model.run(receiver, ray_queue)
if self.propagation_dict['pickle']:
propagation_model.pickle_ray_queue(ray_queue, pickle_path)
receiver.pickle(os.path.join(pickle_path, 'receiver.pickle.gz'))
else:
ray_queue = propagation_model.unpickle_ray_queue(pickle_path)
# --------------------------------------------------------------------------------------------------
# Reception
# --------------------------------------------------------------------------------------------------
print(f' -- Running Reception Model for receiver {rec_idx}')
reception_model.run(receiver, ray_queue, self.propagation_dict['models'])
del ray_queue
if self.reception_dict['save_spectrogram']:
receiver.spectrogram_to_csv(spectrogram_path_left, spectrogram_path_right)
else:
print(f' -- Running Reception Model for receiver {rec_idx}')
receiver.spectrogram_from_csv(spectrogram_path_left, spectrogram_path_right)
# ----------------------------------------------------------------------------------------------------------
# Reconstruction
# ----------------------------------------------------------------------------------------------------------
print(f' -- Running Reconstruction Model for receiver {rec_idx}')
reconstruction_model.run(receiver, os.path.join(wavfiles_base_dir, f'rec{rec_idx}.wav'))
del receiver