-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathcd_metric.py
267 lines (228 loc) · 11.1 KB
/
cd_metric.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
"""Pseudo-likelihood inspired by collective detection / direct positioning."""
class CDMetric:
"""Calculate approximate pseudo-likelihood and its upper bound.
Simplified pseudo-likelihood (CD metric) to speed up calculations.
CD = collective detection.
Based on a 4D hypothesis,i.e., 3D position and coarse time.
Automatic omptimization of the 5th variable, the common bias.
Doppler shifts taken into account.
Speed-up relies on proper intialization when the constructor is called.
Methods for the pseudo-likelihood of a hypothesis and for an estimated
upper bound of the pseudo-likelihoods in a cube with a given diameter.
Author: Jonas Beuchert
"""
def __init__(self, signal, sampling_freq, IF, vis_sat_GPS=None,
doppler_GPS=None, vis_sat_Galileo=None, doppler_Galileo=None,
exponent=2, ms_to_process=1):
"""Initialise likelihood calculation.
Convert signal from intermediate frequency (IF) to baseband considering
Doppler shifts. Create local replicas of C/A codes of visible
satellites. Correlate baseband signals with C/A codes. Store results to
speed-up likelihood calculations.
Consider GPS only, Galileo only, or GPS and Galileo.
Inputs:
signal - GNSS snapshot
sampling_freq - Sampling frequency [Hz]
IF - Intermediate frequency [Hz]
vis_sat_GPS - Indices of GPS satellites (PRNs), which are expected to
be visible, or None if Galileo only
doppler_GPS - Doppler shifts of the visible GPS satellites [Hz], or
None if Galileo only
vis_sat_Galileo - Indices of Galileo satellites (PRNs), which are
expected to be visible, or None if GPS only
doppler_Galileo - Doppler shifts of the visible Galileo satellites
[Hz] or None if Galileo only
exponent - Exponent of pseudo-likelihodd function, default=2
ms_to_process - Number of milliseconds to use (non-coherent
integration), default=1
"""
import eph_util as ep
import numpy as np
# Number of visible satellites
if vis_sat_GPS is not None and doppler_GPS is not None:
nSatGPS = vis_sat_GPS.shape[0]
else:
nSatGPS = 0
if vis_sat_Galileo is not None and doppler_Galileo is not None:
nSatGalileo = vis_sat_Galileo.shape[0]
else:
nSatGalileo = 0
if nSatGPS == 0 and nSatGalileo == 0:
raise Exception("Missing input data.")
nSat = nSatGPS + nSatGalileo
# Store sampling frequency
self.samplingFreq = sampling_freq
# C/A code frequency [Hz]
codeFreqBasis = 1023000.0
# C/A code length [samples]
if nSatGalileo > 0:
codeLength = 4092.0
else:
codeLength = 1023.0
# Find number of samples per spreading code
samplesPerCode = np.int(np.round((sampling_freq / (codeFreqBasis /
codeLength))))
# Find number of samples per millisecond
samplesPerMs = np.int((np.round(sampling_freq * 1e-3)))
# Find number of samples in signal snapshot
samplesInSignal = samplesPerMs * ms_to_process
# Create vector of data with correct length (1 ms)
signal = signal[0: samplesInSignal]
signal = np.reshape(signal, (ms_to_process, samplesPerMs))
# Find time constants
ts = 1.0 / self.samplingFreq # Sampling period [s]
tc = 1.0 / codeFreqBasis # C/A chip period [s]
# Generate all C/A codes and sample them according to sampling freq.
# For Galileo, seperately generate data and pilot channel
# Prepare output matrix to speed up function
caCodesTable = np.empty((nSat+nSatGalileo, samplesPerCode))
for k in range(nSatGPS):
PRN = vis_sat_GPS[k] # Index (PRN) of GPS satellite
# Generate CA code for given PRN
caCode = ep.generate_ca_code(PRN)
if nSatGalileo > 0:
# Adjust length to Galileo code (4 ms)
caCode = np.tile(caCode, 4)
# Digitizing
# Make index array to read C/A code values
codeValueIndex = np.ceil(ts * np.arange(1, samplesPerCode + 1) / tc
).astype(int) - 1
# Correct the last index (due to number rounding issues)
codeValueIndex[-1] = codeLength - 1
# Make the digitized version of the C/A code
caCodesTable[k] = caCode[codeValueIndex]
for k in range(nSatGalileo):
PRN = vis_sat_Galileo[k] # Index (PRN) of Galileo sat
# Make the digitized version of the E1B code (data)
caCodesTable[nSatGPS + k] = ep.generate_e1_code(PRN,
sampling_freq)
# Make the digitized version of the E1C code (pilot)
caCodesTable[nSat + k] = ep.generate_e1_code(PRN, sampling_freq,
pilot=True)
# Repeat C/A code table to match number of milliseconds in snapshot
caCodesTable = np.tile(caCodesTable, (ms_to_process, 1))
# Find phase points of the local carrier wave
phasePoints = np.arange(samplesPerMs) * 2.0 * np.pi * ts
# Shift raw signal to baseband
# Prepare the output matrix to speed up function
# (Zero padding if samplesPerCode > 1 ms)
baseBandSignal = np.zeros(((nSat+nSatGalileo)*ms_to_process,
samplesPerCode), dtype=np.complex64)
for ms_idx in range(ms_to_process):
for k in range(nSat):
# Generate carrier wave frequency
if k < nSatGPS:
frq = IF + doppler_GPS[k] # Do not ignore Doppler shift
else:
frq = IF + doppler_Galileo[k - nSatGPS]
# Generate local sine and cosine
carrier = np.exp(1j * frq * phasePoints)
# "Remove carrier" from the signal
baseband_signal_curr = carrier * signal[ms_idx]
if k < nSatGPS:
# GPS L1
baseBandSignal[ms_idx*(nSat+nSatGalileo)+k,
:samplesPerMs] = baseband_signal_curr
else:
chunk_idx = np.mod(ms_idx, 4)
# Galileo E1B (data)
baseBandSignal[ms_idx*(nSat+nSatGalileo)+k,
samplesPerMs*chunk_idx:
samplesPerMs*(chunk_idx+1)] \
= baseband_signal_curr
# Galileo E1C (pilot)
baseBandSignal[ms_idx*(nSat+nSatGalileo)+k+nSatGalileo,
samplesPerMs*chunk_idx:
samplesPerMs*(chunk_idx+1)] \
= baseband_signal_curr
# Correlate signals (to square or not to square?)
corrTable = np.abs(np.fft.ifft(
np.fft.fft(caCodesTable) * np.conj(np.fft.fft(baseBandSignal)))
)**exponent
# Sum correlograms for each satellite
self.corrTable = np.zeros((nSat+nSatGalileo, samplesPerCode))
for ms_idx in range(ms_to_process):
self.corrTable = (self.corrTable
+ corrTable[(ms_idx*(nSat+nSatGalileo)):
((ms_idx+1)*(nSat+nSatGalileo))])
# Sum over Galileo channels
self.corrTable = np.vstack((self.corrTable[:nSatGPS],
self.corrTable[nSatGPS:nSat]
+ self.corrTable[nSat:nSat+nSatGalileo]))
self.corrTableUncert = []
self.uncertList = []
def likelihood(self, code_phase):
"""Pseudo-likelihood of given set of C/A code phases.
Input:
code_phase - Expected C/A code phases of visible sats [s]
Output:
cd - Likelihood for observing signal given C/A code phases
"""
import numpy as np
# Pseudo-likelihood initialization
cd = 0.0
# Pseudo-likelihood calculation
# For all listed PRN numbers ...
for k in range(code_phase.shape[0]):
# Shift correlation of C/A code replica with signal by code
# phase and sum correlograms
cd = cd + np.roll(self.corrTable[k], shift=-np.int(np.round(
code_phase[k] * self.samplingFreq)))
# Account for (unknown) common bias
return np.max(cd)
def max_likelihood(self, code_phase, diagonal):
"""Pseudo-likelihood of given set of C/A code phases.
Input:
code_phase - Expected C/A code phases of visible sats [s]
diagonal - Diameter of 3D spatial cube that covers uncertainty [m]
Output:
cd - Approximated upper bound of pseudo-likelihood in uncertainty
cube
"""
import numpy as np
# Max. pseudo-likelihood initialization
cd = 0.0
# Speed of light [m/s]
c = 299792458.0
# Maximum absolute deviation of C/A code phase in search space
# with respect to center [samples]
n = np.int(np.round(diagonal / c * self.samplingFreq / 2.0))
# If uncertainty level is evaluated for the 1st time, create
# table for later use
if not self.uncertList or n < self.uncertList[-1]:
# Remember that this uncertainty level has been evaluated
self.uncertList.append(n)
uncertInd = len(self.uncertList) - 1
# Max-filter correlograms
nSats = code_phase.shape[0]
temp_table = np.empty_like(self.corrTable)
for k in range(nSats):
temp_table[k] = self.max_filter(self.corrTable[k], n)
self.corrTableUncert.append(temp_table)
else:
# If uncertainty level has been evaluated already, find
# index in table with max-filtered correlograms
uncertInd = self.uncertList.index(n)
for k in range(code_phase.shape[0]):
# Shift max-filtered correlogram by code phase and sum
cd = cd + np.roll(self.corrTableUncert[uncertInd][k], - np.int(
np.round(code_phase[k] * self.samplingFreq)))
# cd = np.array([np.roll(table_row, shift)
# for table_row, shift in
# zip(self.corrTableUncert[uncertInd],
# - np.round(code_phase * self.samplingFreq).astype(
# int))
# ])
# cd = np.sum(cd, axis=0)
# Account for (unknown) common bias
return np.max(cd)
def max_filter(self, x, n):
"""1D max-filter with wrapping.
Inputs:
x - Raw signal (row vector)
n - Number of strides in each direction
Output:
filteredX - Maximum-filtered signal
"""
import scipy.ndimage as si
return si.maximum_filter(x, 2 * n + 1, mode="wrap")