This repository has been archived by the owner on Nov 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpropagation_model.py
713 lines (611 loc) · 30.9 KB
/
propagation_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
import os
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import compress_pickle as pickle
from typing import Self
from scipy.special import erfc
from matplotlib.widgets import Slider
from matplotlib.animation import FuncAnimation
import helper_functions as hf
from reception_model import Receiver
"""
========================================================================================================================
=== ===
=== The propagation model for this auralisation tool ===
=== ===
========================================================================================================================
Copyright (c) 2023 Josephine Pockelé. Licensed under MIT license.
"""
__all__ = ['Ray', 'SoundRay', 'PropagationModel', ]
np.seterr(invalid='ignore')
sigma_e_dict = {
'snow': 25.,
'forest': 50.,
'grass': 250.,
'dirt_roadside': 500.,
'dirt': 5e3,
'asphalt': 10e3,
'concrete': 50e3
}
def spherical_wave_correction(w: np.array) -> np.array:
"""
Determines the spherical wave correction factor of ground reflection (Eq. 3.18, Arntzen, 2014)
- Arntzen, M. (2014). Aircraft noise calculation and synthesis in a non-standard atmosphere
[Doctoral thesis, Delft University of Technology]. doi: 10.4233/uuid:c56e213c-82db-423d-a5bd-503554653413
:param w: numerical distance as defined by Eq. 3.19 (Arntzen, 2014)
"""
res = 1 + 1j * w * np.sqrt(np.pi) * np.exp(-w ** 2) * erfc(-1j * w)
res[np.logical_or(np.isnan(res), np.isinf(res))] = 0.
return res
def numerical_distance(f: np.array, r2: float, theta: float, z: np.array):
"""
Determines the numerical distance for the spherical wave correction of ground reflection (Eq. 3.19, Arntzen, 2014)
- Arntzen, M. (2014). Aircraft noise calculation and synthesis in a non-standard atmosphere
[Doctoral thesis, Delft University of Technology]. doi: 10.4233/uuid:c56e213c-82db-423d-a5bd-503554653413
:param f: array of frequencies (Hz)
:param r2: travel path length of reflected sound ray (m)
:param theta: ground grazing angle of reflected sound ray (rad)
:param z: normalised acoustic surface impedance (-)
"""
return np.sqrt(1j * (2 * np.pi * f / hf.c) * (r2 / 2) * ((np.sin(theta) + (1 / z)) ** 2) / (1 + (np.sin(theta) / z)))
def ground_impedance(f: np.array, sigma_e: float):
"""
Determines the normalised acoustic surface impedance of ground reflection (Eq. 3.20, Arntzen, 2014)
- Arntzen, M. (2014). Aircraft noise calculation and synthesis in a non-standard atmosphere
[Doctoral thesis, Delft University of Technology]. doi: 10.4233/uuid:c56e213c-82db-423d-a5bd-503554653413
:param f: array of frequencies (Hz)
:param sigma_e: effective flow resitivity of surface (Pa / m2 / s)
"""
return 1 + .0511 * (f / sigma_e) ** (-.75) + 0.0768j * (f / sigma_e) ** (-.73)
def oxygen_resonance_frequency(humidity_abs: float, pressure: float, p_0: float = 101325.) -> float:
"""
Equation for the sound resonance frequency of oxygen atoms (Equation 4, Bass et al., 1995)
- Bass, H. E., Sutherland, L. C., Zuckerwar, A. J., Blackstock, D. T., & Hester, D. M. (1995).
Atmospheric absorption of sound: Further developments. The Journal of the Acoustical Society of America, 97(1),
680–683. doi: 10.1121/1.412989
:param humidity_abs: Absolute humidity (%)
:param pressure: Air pressure (Pa)
:param p_0: Optional overwrite of the reference pressure defining 1 atmosphere (Pa)
"""
return (pressure / p_0) * (24. + 4.04e4 * humidity_abs * (.02 + humidity_abs) / (.391 + humidity_abs))
def nitrogen_resonance_frequency(humidity_abs: float, pressure: float, temperature: float,
p_0: float = 101325., t_0: float = 293.15) -> float:
"""
Equation for the sound resonance frequency of nitrogen atoms (Equation 5, Bass et al., 1995)
- Bass, H. E., Sutherland, L. C., Zuckerwar, A. J., Blackstock, D. T., & Hester, D. M. (1995).
Atmospheric absorption of sound: Further developments. The Journal of the Acoustical Society of America, 97(1),
680–683. doi: 10.1121/1.412989
:param humidity_abs: Absolute humidity (%)
:param pressure: Air pressure (Pa)
:param temperature: Air temperature (K)
:param p_0: Optional overwrite of the reference pressure defining 1 atmosphere (Pa)
:param t_0: Optional overwrite of the reference temperature (K)
"""
f1 = (pressure / p_0) * ((t_0 / temperature) ** .5)
f2 = (9. + 280. * humidity_abs * np.exp(-4.17 * ((t_0 / temperature) ** (1 / 3) - 1)))
return f1 * f2
def water_vapour_saturation_pressure(temperature: float, ) -> float:
"""
Equation for the water vapour saturation pressure (Equation 4, Bass et al., 1995)
- Bass, H. E., Sutherland, L. C., Zuckerwar, A. J., Blackstock, D. T., & Hester, D. M. (1995).
Atmospheric absorption of sound: Further developments. The Journal of the Acoustical Society of America, 97(1),
680–683. doi: 10.1121/1.412989
:param temperature: Air temperature (K)
"""
return 101325 * 10 ** (-6.8346 * (273.16 / temperature) ** 1.261 + 4.6151)
def atm_absorption_coefficient(f: np.array, humidity: float, pressure: float, temperature: float,
p_0: float = 101325., t_0: float = 293.15) -> np.array:
"""
Equation for the water vapour saturation pressure (Equation 3, Bass et al., 1995)
- Bass, H. E., Sutherland, L. C., Zuckerwar, A. J., Blackstock, D. T., & Hester, D. M. (1995).
Atmospheric absorption of sound: Further developments. The Journal of the Acoustical Society of America, 97(1),
680–683. doi: 10.1121/1.412989
:param f: array of frequencies (Hz)
:param humidity: Relative humidity (%)
:param pressure: Air pressure (Pa)
:param temperature: Air temperature (K)
:param p_0: Optional overwrite of the reference pressure defining 1 atmosphere (Pa)
:param t_0: Optional overwrite of the reference temperature (K)
"""
# Determine saturation pressure
p_sat = water_vapour_saturation_pressure(temperature)
# Determine absolute humidity from relative
humidity_abs = humidity * p_sat / pressure
# Determine gas resonance frequencies
f_rn = nitrogen_resonance_frequency(humidity_abs, pressure, temperature, p_0, t_0)
f_ro = oxygen_resonance_frequency(humidity_abs, pressure, p_0)
# Determine the individual terms of the absorption coefficient
term_1 = 1.84e-11 / ((pressure / p_0) * (t_0 / temperature) ** .5)
term_2 = .1068 * np.exp(-3352 / temperature) * f_rn / (f ** 2 + f_rn ** 2)
term_3 = .01278 * np.exp(-2239.1 / temperature) * f_ro / (f ** 2 + f_ro ** 2)
# Return the absorption coefficient
return (term_1 + (term_2 + term_3) * (t_0 / temperature) ** 2.5) * f ** 2
class Ray:
def __init__(self, pos_0: hf.Cartesian, vel_0: hf.Cartesian, s_0: float, source_pos: hf.Cartesian,
t_0: float = 0.) -> None:
"""
================================================================================================================
Class for the propagation sound ray model.
================================================================================================================
:param pos_0: initial position in cartesian coordinates (m, m, m)
:param vel_0: initial velocity in cartesian coordinates (m/s, m/s, m/s)
:param s_0: initial beam length (m)
:param t_0: the start time of the ray propagation (s)
"""
# Set initial conditions
self.pos = np.array([pos_0, ])
self.vel = np.array([vel_0, ])
self.dir = np.array([vel_0, ])
self.t = np.array([t_0, ])
self.s = np.array([s_0])
self.received = False
self.reflections = 0
self.source_pos = source_pos
def __repr__(self):
return f'<Ray at {self.pos[-1]}, received={self.received}>'
def update_ray_velocity(self, delta_t: float, atmosphere: hf.Atmosphere) -> (hf.Cartesian, hf.Cartesian):
"""
Update the ray velocity and direction forward by time step delta_t.
:param delta_t: time step (s)
:param atmosphere: the hf.Atmosphere to propagate the Ray through
:return: Updated Cartesian velocity (m/s, m/s, m/s) and direction (m/s, m/s, m/s) vectors of the sound ray
"""
# Get height from last position
height = -self.pos[-1][2]
# Determine direction change
speed_of_sound_gradient = atmosphere.get_speed_of_sound_gradient(height)
direction_change: hf.Cartesian = self.vel[-1].len() * hf.Cartesian(0, 0, speed_of_sound_gradient)
direction: hf.Cartesian = self.dir[-1] + direction_change * delta_t
# Get wind speed and speed of sound at height
wind_speed = atmosphere.get_wind_speed(height)
speed_of_sound = atmosphere.get_speed_of_sound(height)
# Determine new ray velocity v = u + c * direction
if direction.len() > 0:
vel: hf.Cartesian = wind_speed * hf.Cartesian(0, 1, 0) + speed_of_sound * direction / direction.len()
else:
vel: hf.Cartesian = wind_speed * hf.Cartesian(0, 1, 0)
return vel, direction
def update_ray_position(self, delta_t: float, vel: hf.Cartesian, direction: hf.Cartesian) -> (hf.Cartesian, float):
"""
Update the ray position forward by time step delta_t and determine path step delta_s.
:param delta_t: time step (s)
:param vel: Cartesian velocity vector (m/s, m/s, m/s) for current time step
:param direction: Cartesian direction vector (m/s, m/s, m/s) for current time step
:return: Updated position (m, m, m) for current time step and path step (m)
"""
# Determine new position with forward euler stepping
pos_new = self.pos[-1] + vel * delta_t
# Check for reflections and if so: invert z-coordinate and z-velocity and z-direction
# TODO: if the ground effect is not selected, no reflection should happen at all.
# Only direct sound should then be considered.
if pos_new[2] >= 0:
pos_new[2] = -pos_new[2]
vel[2] = -vel[2]
direction[2] = -direction[2]
self.reflections += 1
# Calculate ray path travel
delta_s = (vel * delta_t).len()
return pos_new, delta_s
def ray_step(self, delta_t: float, atmosphere: hf.Atmosphere) -> (hf.Cartesian, hf.Cartesian, hf.Cartesian, float):
"""
Logic for time stepping the sound rays forward one time step.
:param delta_t: time step (s)
:param atmosphere: the hf.Atmosphere to propagate the Ray through
:return: updated Cartesian velocity (m/s, m/s, m/s) and direction (m/s, m/s, m/s) vectors of the sound ray,
updated sound ray position (m, m, m), and the ray path step (m)
"""
# Run the above functions to update velocity and position
vel, direction = self.update_ray_velocity(delta_t, atmosphere)
pos, delta_s = self.update_ray_position(delta_t, vel, direction)
# Propagate time
self.t = np.append(self.t, self.t[-1] + delta_t)
# Store new velocity and direction
self.vel = np.append(self.vel, (vel,))
self.dir = np.append(self.dir, (direction,))
# Store new position
self.pos = np.append(self.pos, (pos, ))
# Propagate travelled distance
self.s = np.append(self.s, self.s[-1] + delta_s)
return vel, direction, pos, delta_s
def check_reception(self, receiver: Receiver, delta_s: float) -> bool:
"""
Check if the ray went past a receiver point.
:param receiver: instance of rm.Receiver
:param delta_s: the last ray path step (m)
:return: boolean indicating whether the receiver has been passed
"""
# Cannot check if less than two points
if self.pos.size < 2:
return False
# Determine perpendicular planes with last 2 points
plane1 = hf.PerpendicularPlane3D(self.pos[-1], self.pos[-2])
plane2 = hf.PerpendicularPlane3D(self.pos[-2], self.pos[-1])
# Determine distances between planes and receiver point
dist1 = plane1.distance_to_point(receiver)
dist2 = plane2.distance_to_point(receiver)
# Check condition for point being between planes
if dist1 <= delta_s and dist2 <= delta_s:
# If yes, the ray has passed the receiver: SUCCESS!!!
return True
# If all else fails: this ray has not yet passed, probably
return False
def propagation_effects(self, atmosphere: hf.Atmosphere) -> None:
"""
TO BE OVERWRITTEN: accommodation for sound propagation effects functionality in subclasses.
:param atmosphere: the hf.Atmosphere to propagate the Ray through
"""
pass
def propagate(self, delta_t: float, receiver: Receiver, atmosphere: hf.Atmosphere, t_lim: float) -> None:
"""
Propagate the Ray until received or kill condition is reached.
:param delta_t: time step (s)
:param receiver: instance of rm.Receiver
:param atmosphere: the hf.Atmosphere to propagate the Ray through
:param t_lim: propagation time limit (s)
"""
# Set initial loop conditions
self.received = False
kill = self.pos[0][2] >= 0
# Add initial propagation effects
self.propagation_effects(atmosphere)
# Loop while condition holds
while not (self.received or kill):
# Step the ray forward
vel, direction, pos, delta_s = self.ray_step(delta_t, atmosphere)
# Check if it is received at the given receiver
self.received = self.check_reception(receiver, delta_s)
# Add propagation effects
self.propagation_effects(atmosphere)
# Kill the ray when time limit is exceeded
kill = self.t[-1] - self.t[0] > t_lim
def pos_array(self) -> np.array:
"""
Convert position history to an easily readable array of shape (self.pos.size, 3).
:return: numpy array with full position history unpacked
"""
# Create empty initial array
arr = np.zeros((3, self.pos.size))
# Loop over the list of coordinates
for pi, pos in enumerate(self.pos):
# Fill the position in the array
arr[:, pi] = pos.vec
return arr
class SoundRay(Ray):
def __init__(self, pos_0: hf.Cartesian, vel_0: hf.Cartesian, s_0: float, source_pos: hf.Cartesian,
beam_width: float, amplitude_spectrum: pd.DataFrame, models: tuple, ground_type: str = 'grass',
t_0: float = 0., label: str = None) -> None:
"""
================================================================================================================
Class for the propagation sound ray model. With the sound spectral effects.
================================================================================================================
:param pos_0: initial position in cartesian coordinates (m, m, m)
:param vel_0: initial velocity in cartesian coordinates (m/s, m/s, m/s)
:param s_0: initial beam length (m)
:param beam_width: initial beam width angle (rad)
:param ground_type: type of ground surface
:param t_0: the start time of the ray propagation (s)
:param label: a string label for SoundRay
"""
super().__init__(pos_0, vel_0, s_0, source_pos, t_0)
self.bw = beam_width
self.label = label
self.models = models
self.ground_type = ground_type
# Initialise the sound spectrum
self.spectrum = pd.DataFrame(amplitude_spectrum)
# Set the column name of the given spectrum to 'a' for amplitude
self.spectrum.columns = ['a']
# Initialise phase and the attenuation effects
self.spectrum['p'] = np.random.uniform(-np.pi, np.pi, self.spectrum.index.size)
self.spectrum['gaussian'] = 1.
self.spectrum['spherical'] = 1.
self.spectrum['atmospheric'] = 1.
self.spectrum['ground'] = 1.
def copy(self) -> Self:
"""
Create a not-yet-propagated copy of this SoundRay.
"""
return SoundRay(self.pos[0], self.vel[0], self.s[0], self.source_pos, self.bw, self.spectrum['a'],
self.models, self.ground_type, self.t[0], self.label)
def propagation_effects(self, atmosphere: hf.Atmosphere) -> None:
"""
Add atmospheric absorption and spherical spreading to the sound spectrum.
:param atmosphere: the hf.Atmosphere to propagate the SoundRay through
"""
# Get current temperature, pressure and speed of sound from the atmosphere
t_current, p_current, _, c_current, _ = atmosphere.get_conditions(-self.pos[-1][2])
if self.t.size >= 2:
c_previous = atmosphere.get_speed_of_sound(-self.pos[-2][2])
# Spherical spreading factor
self.spectrum['spherical'] /= (self.s[-1] / self.s[-2]) * np.sqrt(c_previous / c_current)
# Extract frequencies from spectrum
f = self.spectrum.index
# Determine the absorption coefficient spectrum
alpha = atm_absorption_coefficient(f, atmosphere.humidity, p_current, t_current, )
# Determine the travel distance of the last segment
delta_s = self.s[-1] - self.s[-2] if self.t.size >= 2 else self.s[-1]
# Add absorption to the spectrum
self.spectrum['atmospheric'] *= np.exp(-alpha * delta_s / 2)
def gaussian_factor(self, receiver: Receiver) -> None:
"""
Calculate the Gaussian beam reception transfer function.
:param receiver: an instance of Receiver
"""
# Determine the perpendicular plane just before the receiver
plane = hf.PerpendicularPlane3D(self.pos[-1], self.pos[-2])
# Determine distance from that plane to the receiver
dist1 = plane.distance_to_point(receiver)
# Determine the distance between the ray-plane intersection and the receiver
dist2 = self.pos[-2].dist(receiver)
# Determine the distance between the ray and the receiver
n_sq = dist2**2 - dist1**2
# Determine the distance along the ray to the intersecting line
s = self.s[-2] + dist1
# Determine the filter and clip to between 0 and 1
self.spectrum['gaussian'] = np.clip(np.exp(-n_sq / ((self.bw * s)**2 + 1/(np.pi * self.spectrum.index))), 0, 1)
def ground_effect(self, receiver: Receiver):
"""
:param receiver:
"""
if self.reflections == 1:
rng = np.sqrt((self.source_pos[0] - receiver[0]) ** 2 + (self.source_pos[1] - receiver[1]) ** 2)
h_s = -self.source_pos[2]
h_m = -receiver[2]
th = np.arctan2(h_s + h_m, rng)
f = self.spectrum.index.to_numpy()
z = ground_impedance(f, sigma_e_dict[self.ground_type] * 1000)
rp = (z * np.sin(th) - 1) / (z * np.sin(th) + 1)
w = numerical_distance(f, self.s[-2], th, z)
fs = spherical_wave_correction(w)
q = rp + (1 - rp) * fs
self.spectrum['ground'] *= q
elif self.reflections == 0:
return
else:
raise NotImplementedError('Multiple reflections are not supported at this time. '
'Ground effect could not be determined.')
def receive(self, receiver: Receiver, models: tuple) -> (float, pd.DataFrame, hf.Cartesian):
"""
Function that adds the SoundRay to the receiver.
:param receiver: instance of Receiver
:param models:
"""
# Determine the Gaussian beam attenuation spectrum
self.gaussian_factor(receiver)
self.spectrum['p'] += self.s[-2] * 2 * np.pi * self.spectrum.index / hf.c
# Take the amplitude and phase spectra
spectrum = self.spectrum[['a', 'p']].copy()
# Add the Gaussian beam attenuation
spectrum['a'] *= self.spectrum['gaussian'] ** .5
# Calculate the ground effect if ground reflections are to be included
if 'ground' in models:
self.ground_effect(receiver)
# If ground reflections are not to be included, set spectrum to 0 if the ray has been reflected
elif self.reflections > 0:
spectrum['a'] *= 0.
# Add attenuation from selected models
for model in models:
spectrum['a'] *= np.real(self.spectrum[model])
spectrum['p'] += np.angle(self.spectrum[model])
# Return what is needed to create a ReceivedSound instance
return self.t[-1], spectrum, self.source_pos
class PropagationModel:
def __init__(self, aur_conditions_dict: dict, aur_propagation_dict: dict, atmosphere: hf.Atmosphere) -> None:
"""
================================================================================================================
Class that manages the whole propagation model
================================================================================================================
:param aur_conditions_dict: conditions_dict from the Case class
:param aur_propagation_dict: propagation_dict from the Case class
:param atmosphere: the hf.Atmosphere to propagate the SoundRays through
"""
self.conditions_dict = aur_conditions_dict
self.params = aur_propagation_dict
self.atmosphere = atmosphere
def run(self, receiver: Receiver, in_queue: list[SoundRay]) -> None:
"""
Run the propagation model for one receiver.
:param receiver: instance of rm.Receiver
:param in_queue: queue.Queue instance containing non-propagated SoundRays
:return: A queue.Queue instance containing all propagated SoundRays.
"""
# Set the time limit to limit compute time
t_limit = 3 * receiver.dist(self.conditions_dict['hub_pos']) / hf.c
# Start a ProgressThread to follow the propagation
p_thread = hf.ProgressThread(len(in_queue), f'Propagating rays')
p_thread.start()
for ray in in_queue:
ray.propagate(self.conditions_dict['delta_t'], receiver, self.atmosphere, t_limit)
p_thread.update()
# Stop the ProgressThread
p_thread.stop()
del p_thread
@staticmethod
def pickle_ray_queue(ray_queue: list[SoundRay], ray_cache_path: str) -> None:
"""
Cache a queue of Rays to compressed pickles in the given directory.
:param ray_queue: queue.Queue containing Rays (or SoundRays)
:param ray_cache_path: path to store the pickles to
"""
# Check the existence of the cache directory
if not os.path.isdir(ray_cache_path):
os.mkdir(ray_cache_path)
# Start a ProgressThread
p_thread = hf.ProgressThread(len(ray_queue), 'Pickle-ing sound rays')
p_thread.start()
# Loop over the queue without emptying it
ray: SoundRay
for ray in ray_queue:
# Open the pickle file
ray_file = open(os.path.join(ray_cache_path, f'SoundRay_{p_thread.step}.pickle.gz'), 'wb')
# Dump the pickle
pickle.dump(ray, ray_file)
# Close the pickle file
ray_file.close()
# Update the ProgressThread
p_thread.update()
# Stop the ProgressThread
p_thread.stop()
del p_thread
@staticmethod
def unpickle_ray_queue(ray_cache_path: str) -> list[SoundRay]:
"""
Read out a cache directory of compressed Ray pickles.
:param ray_cache_path: path to directory containing pickles
"""
# Check the existence of the cache directory
if not os.path.isdir(ray_cache_path):
raise NotADirectoryError(f'No {ray_cache_path} directory found. Cannot un-pickle ray queue.')
# Create empty queue.Queue
ray_queue = list[SoundRay]()
# Get paths of all files in the directory with ".pickle" in them
ray_paths = [ray_path for ray_path in os.listdir(ray_cache_path) if '.pickle' in ray_path]
# Check the existence of pickles
if len(ray_paths) <= 0:
raise FileNotFoundError(f'No pickles found in {ray_cache_path}. Cannot un-pickle ray queue.')
# Start a ProgressThread
p_thread = hf.ProgressThread(len(ray_paths), 'Un-pickle-ing sound rays')
p_thread.start()
# Loop over the pickles
for ray_path in ray_paths:
# Open the jar
ray_file = open(os.path.join(ray_cache_path, ray_path), 'rb')
# Take the pickle
ray_queue.append(pickle.load(ray_file))
# Close the jar
ray_file.close()
# Update the ProgressThread
p_thread.update()
# Stop the ProgressThread
p_thread.stop()
del p_thread
return ray_queue
@staticmethod
def interactive_ray_plot(ray_queue: list[SoundRay], receiver: Receiver, hub: hf.Cartesian = None, reference: str = None, gif_out: str = None) -> None:
"""
Makes an interactive plot of the SoundRays in a queue.Queue.
:param ray_queue: queue.Queue containing SoundRays
:param receiver: instance of rm.Receiver with which the SoundRays where propagated
:param hub:
:param reference: string indicating time reference frame, either 'source' or 'reception'
:param gif_out: A path to output a gif animation to
"""
if reference is None or reference == 'source':
idx = 0
elif reference == 'reception':
idx = -1
else:
raise ValueError(f'Invalid time reference frame given: reference={reference}')
# Create an empty ray dictionary
rays = dict[float: list]()
# Loop over the ray_queue without removing the SoundRays
for ray in ray_queue:
# Add any received rays to the dictionary
if ray.received:
# Just stupid sh!te to avoid floating point errors...
t = round(ray.t[idx], 10)
# Fill into the dictionary
if t in rays.keys():
rays[t].append(ray)
else:
rays[t] = list[SoundRay]([ray, ])
if hub is not None:
x_h, y_h, z_h = hub.vec
def update_plot(t_plt: float):
"""
Internal function to update the interactive plot with the Slider.
:param t_plt: time input (s)
"""
# Clear the plot
ax.clear()
# Re-set the axis limits and aspect ratio
ax.set_aspect('equal')
ax.set_xlim(-50, 50)
ax.set_ylim(-50, 50)
ax.set_zlim(0, 100)
# Re-set the labels
ax.set_xlabel('-x (m)')
ax.set_ylabel('y (m)')
ax.set_zlabel('-z (m)')
# Put the receiver point in there
ax.scatter(-receiver[0], receiver[1], -receiver[2], s=50, color='k')
# Get the rays at input time
ray_lst = list[SoundRay](rays[t_plt])
# Loop over all SoundRays at input time
for ry in ray_lst:
# Obtain the pos_array for nicer plotting
pos_array = ry.pos_array()
# Get the sound spectrum
_, spectrum, source_pos = ry.receive(receiver, ('spherical', 'atmospheric', 'ground'))
# Integrate to get the energy
energy = np.trapz(spectrum['a']**2, spectrum.index)
# Check that energy is not zero before continuing
if energy > 0:
# dB that energy
energy = 10 * np.log10(energy / hf.p_ref ** 2)
# Bin the energy
energy_bin = 5 * int(energy // 5) + 2.5
# Apply color to that energy
cmap_lvl = float(np.argwhere(levels == np.clip(energy_bin, 12.5, 62.5))) / (levels.size - 1)
color = cmap(cmap_lvl)
x, y, z = source_pos.vec
x_0, y_0, z_0 = ry.pos[0].vec
# Plot the ray
ax.plot(-pos_array[0], pos_array[1], -pos_array[2], color=color)
ax.plot((-x, -x_0), (y, y_0), (-z, -z_0), color=color, )
ax.scatter(-x, y, -z, color='k', marker='8')
if hub is not None:
ax.plot((-x, -x_h), (y, y_h), (-z, -z_h), color='k', )
def colorbar():
"""
Create the colorbar for the energy levels
"""
# Set the ticks and create an axis on the figure for the colorbar
ticks = np.arange(10, 60 + 5, 5)
ax_cbar = fig.add_axes([.85, 0.1, 0.05, 0.8])
norm = mpl.colors.BoundaryNorm(ticks, cmap.N)
plt.colorbar(mpl.cm.ScalarMappable(cmap=cmap, norm=norm),
cax=ax_cbar,
extend='both',
orientation='vertical',
label='Received OSPL of Sound Ray (dB) (Binned per 10 dB)'
)
# Create the main plot
fig = plt.figure()
ax = fig.add_subplot(projection='3d')
# Pre-set the colorbar levels and colormap
levels = np.arange(12.5, 62.5 + 5, 5)
cmap = mpl.colormaps['viridis'].resampled(10)
# Adjust the main plot to make room for the colorbar
fig.subplots_adjust(left=0., bottom=0.1, right=0.85, top=1.)
colorbar()
# Preset the valstep parameter for the Slider, and to determine the frames of the animation
valstep = list(sorted(rays.keys()))
# Create an animated GIF file if so desired
if gif_out is not None:
ani = FuncAnimation(fig=fig, func=update_plot, frames=valstep, interval=(valstep[1] - valstep[0]) * 1000)
ani.save(gif_out, writer='pillow')
# Create the figure again to avoid issues with the animation stuff
fig = plt.figure()
ax = fig.add_subplot(projection='3d')
colorbar()
# Adjust the main plot to make room for the colorbar
fig.subplots_adjust(left=0., bottom=0.2, right=0.85, top=1.)
# Make a horizontal slider to control the time.
ax_time = fig.add_axes([0.11, 0.1, 0.65, 0.05])
slider = Slider(
ax=ax_time,
label='Time (s)',
valmin=valstep[0],
valmax=valstep[-1],
valstep=valstep,
valinit=valstep[0],
)
# Set the initial plot at the first available time step
update_plot(valstep[0])
# Set the slider update function
slider.on_changed(update_plot)
# Show the plot
plt.show()