-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplanck_dust_correlation.py
153 lines (130 loc) · 6.35 KB
/
planck_dust_correlation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import astropy.units as u
import healpy as hp
import numpy as np
import numpy.random.mtrand
from astropy.coordinates import SkyCoord, Longitude, Latitude
from mpi4py import MPI
from mpi_helper import r_print
from python_compat import range
comm = MPI.COMM_WORLD
enable_intersection = True
ar_map_nside = 2048
# use a different random seed for each MPI rank, but still use predictable results for easier debugging.
_ = np.random.mtrand.RandomState(comm.rank)
def ra_dec2ang(ra, dec):
return (90. - dec) * np.pi / 180., ra / 180. * np.pi
def main_loop(ar_map, ar_map_unc, max_bin_angle, max_angular_separation, outer_disc_mean, outer_disc):
ar_product = np.zeros(shape=num_bins)
ar_weights = np.zeros(shape=num_bins)
ar_counts = np.zeros(shape=num_bins)
chosen_indices = np.random.choice(outer_disc, size=100, replace=False)
for index in chosen_indices:
vec_a = hp.pix2vec(ar_map_nside, index)
inner_disc = hp.query_disc(ar_map_nside, vec=vec_a, radius=max_angular_separation.to(u.rad).value)
if enable_intersection:
inner_disc = np.intersect1d(inner_disc, outer_disc, assume_unique=True)
vec_b = hp.pix2vec(ar_map_nside, inner_disc)
ar_ang_dist_with_zero = hp.rotator.angdist(vec_b, vec_a)
a = index
b = inner_disc[ar_ang_dist_with_zero > 0]
ar_ang_dist = ar_ang_dist_with_zero[ar_ang_dist_with_zero > 0]
ar_bins_float = ar_ang_dist / max_bin_angle * num_bins # type: np.ndarray
ar_bins = ar_bins_float.astype(int)
# TODO: filter NaNs earlier so that they don't decrease the correlation
ar_pair_product = np.nan_to_num((ar_map[a] - outer_disc_mean) * (ar_map[b] - outer_disc_mean))
ar_pair_weights = (ar_map_unc[a] * ar_map_unc[b]) ** (-2) # type: np.ndarray
ar_product += np.bincount(ar_bins, weights=ar_pair_product * ar_pair_weights, minlength=num_bins)
ar_weights += np.bincount(ar_bins, weights=ar_pair_weights, minlength=num_bins)
ar_counts += np.bincount(ar_bins, minlength=num_bins)
return ar_product, ar_weights, ar_counts
# load Planck map on the root node
ar_map_shape = None
ar_map_0 = None
ar_map_unc_0 = None
ar_map_0_log = None
if comm.rank == 0:
ar_map_0 = hp.fitsfunc.read_map("../../data/COM_CompMap_Dust-DL07-AvMaps_2048_R2.00.fits", field=0)
ar_map_unc_0 = hp.fitsfunc.read_map("../../data/COM_CompMap_Dust-DL07-AvMaps_2048_R2.00.fits", field=1)
# ar_map_0_log = np.log(ar_map_0)
np.clip(ar_map_unc_0, 1e-2, np.inf, ar_map_unc_0)
# optionally add a mock signal to the map
mock = False
if mock:
ar_mock = ar_map_0
nside_signal = 32
radius = hp.nside2resol(nside_signal) / 2 / np.sqrt(2)
for i in range(hp.nside2npix(nside_signal)):
vec1 = hp.pix2vec(nside_signal, i)
mask = hp.query_disc(ar_map_nside, vec=vec1, radius=radius)
ar_mock[mask] *= 100
ar_mock /= np.sqrt(100)
# send the map to all other nodes
ar_map_local = comm.bcast(ar_map_0)
ar_map_unc_local = comm.bcast(ar_map_unc_0)
# initialize correlation bins
num_bins = 100
ar_product_total = np.zeros(shape=(10, num_bins))
ar_weights_total = np.zeros(shape=(10, num_bins))
ar_counts_total = np.zeros(shape=(10, num_bins))
# sky scan parameters
num_directions = 18
stripe_step_deg = 10
for current_direction_index in range(num_directions):
# start with a location within the BOSS field
center_ra = 180.
center_dec = 30.
center_coord = SkyCoord(ra=center_ra * u.degree, dec=center_dec * u.degree)
center_galactic = center_coord.galactic
# scan the sky in galactic coordinates:
galactic_l = Longitude(center_galactic.l + 0 * u.degree)
# currently we just change the latitude
galactic_b = Latitude(
center_galactic.b - (current_direction_index - num_directions * 0.0) * stripe_step_deg * u.degree)
r_print("galactic l-value:", galactic_l.value)
r_print("galactic b-value:", galactic_b.value)
# convert galactic coordinates to healpy theta/phi and create a unit vector:
center_theta, center_phi = ra_dec2ang(ra=galactic_l.value, dec=galactic_b.value)
vec = hp.ang2vec(theta=center_theta, phi=center_phi)
r_print("unit vector:", vec)
# get all pixels in the current disc
current_disc_radius = 10 / 180. * np.pi
disc = hp.query_disc(ar_map_nside, vec=vec, radius=current_disc_radius)
r_print("disc has ", disc.shape[0], " pixels")
max_angle_fixed = 5. / 180. * np.pi
disc_mean = np.nanmean(ar_map_local[disc])
ar_dec, ar_ra = hp.pix2ang(ar_map_nside, disc)
global_max_angular_separation = 5. * u.degree # type: u.Quantity
# work in iterations so that we don't use too much memory
for i in range(2):
# initialize bins for reduce operation
ar_product_reduce = np.zeros(shape=num_bins)
ar_weights_reduce = np.zeros(shape=num_bins)
ar_counts_reduce = np.zeros(shape=num_bins)
ar_product_local, ar_weights_local, ar_counts_local = main_loop(
ar_map=ar_map_local, ar_map_unc=ar_map_unc_local,
max_bin_angle=max_angle_fixed, max_angular_separation=global_max_angular_separation,
outer_disc_mean=disc_mean, outer_disc=disc
)
# sum up bins from all nodes
comm.Reduce(
[ar_product_local, MPI.DOUBLE],
[ar_product_reduce, MPI.DOUBLE],
op=MPI.SUM, root=0)
comm.Reduce(
[ar_weights_local, MPI.DOUBLE],
[ar_weights_reduce, MPI.DOUBLE],
op=MPI.SUM, root=0)
comm.Reduce(
[ar_counts_local, MPI.DOUBLE],
[ar_counts_reduce, MPI.DOUBLE],
op=MPI.SUM, root=0)
if comm.rank == 0:
r_print("Finished direction ", current_direction_index, ", Iteration ", i)
ar_product_total[current_direction_index] += ar_product_reduce
ar_weights_total[current_direction_index] += ar_weights_reduce
r_print("total weight: ", ar_weights_total[current_direction_index].sum())
r_print("total count: ", ar_counts_total[current_direction_index].sum())
angular_separation_bins = np.arange(num_bins, dtype=float) / num_bins * max_angle_fixed * 180. / np.pi
np.savez(
'../../data/planck_dust_correlation.npz', angular_separation_bins=angular_separation_bins,
ar_product_total=ar_product_total, ar_weights_total=ar_weights_total)