-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextractor_help_function.py
489 lines (413 loc) · 16.8 KB
/
extractor_help_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
import os
import sys
import numpy as np
import pandas as pd
import time
import json
from datetime import datetime
from config import configs
import contextlib
import urllib.request
from io import BytesIO
from typing import Any, Tuple, List
from zipfile import ZipFile
import warnings
warnings.filterwarnings("ignore")
import urllib3,requests
from multiprocessing.pool import ThreadPool as Pool
urllib3.disable_warnings()
pd.set_option('display.max_rows', 10000)
### http://download.geonames.org/export/zip/
# Directory where downloaded GeoNames postal-code files are cached.
# Overridable through the PGEOCODE_DATA_DIR environment variable.
STORAGE_DIR = os.environ.get(
    "PGEOCODE_DATA_DIR", os.path.join(os.path.expanduser("~"), "pgeocode_data")
)
# A list of download locations. If the first URL fails, following ones will
# be used.
DOWNLOAD_URL = [
    "https://download.geonames.org/export/zip/{country}.zip",
    "https://symerio.github.io/postal-codes-data/data/geonames/{country}.txt",
]
# Column names assigned to the tab-separated GeoNames postal-code dump
# (the raw files have no header row).
DATA_FIELDS = [
    "country_code",
    "postal_code",
    "place_name",
    "state_name",
    "state_code",
    "county_name",
    "county_code",
    "community_name",
    "community_code",
    "latitude",
    "longitude",
    "accuracy",
]
# Two-letter country codes accepted by Nominatim.__init__ — presumably the
# set for which GeoNames publishes postal-code data (TODO confirm).
COUNTRIES_VALID = ["AD","AR","AS","AT","AU","AX","BD","BE","BG","BM","BR","BY","CA","CH","CO","CR","CZ",
"DE","DK","DO","DZ","ES","FI","FO","FR","GB","GF","GG","GL","GP","GT","GU","HR","HU","IE",
"IM","IN","IS","IT","JE","JP","LI","LK","LT","LU","LV","MC","MD","MH","MK","MP","MQ","MT",
"MX","MY","NC","NL","NO","NZ","PH","PK","PL","PM","PR","PT","RE","RO","RU","SE","SI","SJ",
"SK","SM","TH","TR","UA","US","UY","VA","VI","WF","YT","ZA","CL","KR"]
@contextlib.contextmanager
def _open_extract_url(url: str, country: str) -> Any:
"""Download contents for a URL
If the file has a .zip extension, open it and extract the country
Returns the opened file object.
"""
with urllib.request.urlopen(url) as res:
with BytesIO(res.read()) as reader:
if url.endswith(".zip"):
with ZipFile(reader) as fh_zip:
with fh_zip.open(country.upper() + ".txt") as fh:
yield fh
else:
yield reader
@contextlib.contextmanager
def _open_extract_cycle_url(urls: List[str], country: str) -> Any:
    """Same as _open_extract_url but cycle through URLs until one works
    We start by opening the first URL in the list, and if fails
    move to the next, until one works or the end of list is reached.
    """
    if not isinstance(urls, list) or not len(urls):
        raise ValueError(f"urls={urls} must be a list with at least one URL")
    # NOTE(review): "{{err}}" is an escaped brace in this f-string, so the
    # literal text "{err}" ends up in the message — it is never substituted
    # with an actual error.
    err_msg = f"Provided download URLs failed {{err}}: {urls}"
    for idx, val in enumerate(urls):
        try:
            with _open_extract_url(val, country) as fh:
                # Hand the open file to the caller; control returns here
                # when the caller's `with` block exits.
                yield fh
                # Found a working URL, exit the loop.
                break
        except urllib.error.HTTPError as err:  # type: ignore
            # Last URL in the list: propagate the failure to the caller.
            if idx == len(urls) - 1:
                raise
            warnings.warn(
                f"Download from {val} failed with: {err}. "
                "Trying next URL in DOWNLOAD_URL list.",
                UserWarning,
            )
    else:
        # for/else: runs only if the loop finished without `break`.
        # NOTE(review): since a successful open always breaks and the final
        # HTTPError re-raises, this branch looks unreachable in practice —
        # presumably kept as a safety net; confirm before relying on it.
        raise ValueError(err_msg)
class Nominatim:
    """Query geographical location from a city name or a postal code.

    Parameters
    ----------
    country: str, default='fr'
        Country code. See COUNTRIES_VALID for the list of supported countries.
    unique: bool, default=True
        Create a unique postcode index, merging all places with the same
        postcode into a single entry.
    """

    def __init__(self, country: str = "fr", unique: bool = True):
        country = country.upper()
        if country not in COUNTRIES_VALID:
            raise ValueError(
                (
                    "country={} is not a known country code. "
                    "See the README for a list of supported "
                    "countries"
                ).format(country)
            )
        if country == "AR":
            warnings.warn(
                "The Argentina data file contains 4-digit postal "
                "codes which were replaced with a new system "
                "in 1999."
            )
        self.country = country
        # Load (or download and cache) the raw GeoNames table.
        self._data_path, self._data = self._get_data(country)
        if unique:
            self._data_frame = self._index_postal_codes()
        else:
            self._data_frame = self._data
        self.unique = unique

    @staticmethod
    def _get_data(country: str) -> Tuple[str, pd.DataFrame]:
        """Load the data from disk; otherwise download and save it.

        Returns
        -------
        (data_path, data) : the cache file path and the loaded DataFrame.
        """
        data_path = os.path.join(STORAGE_DIR, country.upper() + ".txt")
        if os.path.exists(data_path):
            data = pd.read_csv(data_path, dtype={"postal_code": str})
        else:
            download_urls = [
                val.format(country=country) for val in DOWNLOAD_URL
            ]
            with _open_extract_cycle_url(download_urls, country) as fh:
                data = pd.read_csv(
                    fh,
                    sep="\t",
                    header=None,
                    names=DATA_FIELDS,
                    dtype={"postal_code": str},
                )
            # makedirs(exist_ok=True) avoids the exists()/mkdir race and
            # also creates any missing parent directories.
            os.makedirs(STORAGE_DIR, exist_ok=True)
            data.to_csv(data_path, index=None)
        return data_path, data

    def _index_postal_codes(self) -> pd.DataFrame:
        """Create a dataframe with one row per unique postal code."""
        data_path_unique = self._data_path.replace(".txt", "-index.txt")
        if os.path.exists(data_path_unique):
            data_unique = pd.read_csv(
                data_path_unique, dtype={"postal_code": str}
            )
        else:
            # Group places sharing a postal code; average their coordinates.
            df_unique_cp_group = self._data.groupby("postal_code")
            data_unique = df_unique_cp_group[["latitude", "longitude"]].mean()
            # Bug fix: the exclusion list previously misspelled "latitude" as
            # "lattitude", which left "latitude" in valid_keys and caused the
            # averaged latitude to be overwritten below by .first().
            valid_keys = set(DATA_FIELDS).difference(
                ["place_name", "latitude", "longitude", "postal_code"]
            )
            # Merge all place names for the postcode into one comma-joined
            # string.
            data_unique["place_name"] = df_unique_cp_group["place_name"].apply(
                lambda x: ", ".join([str(el) for el in x])
            )
            # For the remaining fields, keep the first group member's value.
            for key in valid_keys:
                data_unique[key] = df_unique_cp_group[key].first()
            data_unique = data_unique.reset_index()[DATA_FIELDS]
            data_unique.to_csv(data_path_unique, index=None)
        return data_unique

    def _normalize_postal_code(self, codes: pd.DataFrame) -> pd.DataFrame:
        """Normalize postal codes to the values contained in the database.

        Upper-cases all codes; for GB/IE/CA only the first whitespace-separated
        block (the outward code) is kept. Mutates and returns ``codes``.
        """
        codes["postal_code"] = codes.postal_code.str.upper()
        if self.country in ["GB", "IE", "CA"]:
            codes["postal_code"] = codes.postal_code.str.split().str.get(0)
        return codes

    def query_postal_code(self, codes):
        """Get location information from postal codes.

        Parameters
        ----------
        codes: array, list, str or int
            postal code(s) to look up

        Returns
        -------
        df : pandas.DataFrame or pandas.Series
            the relevant information; a single Series when one code is
            queried with ``unique=True``
        """
        if isinstance(codes, int):
            codes = str(codes)
        if isinstance(codes, str):
            codes = [codes]
            single_entry = True
        else:
            single_entry = False
        if not isinstance(codes, pd.DataFrame):
            codes = pd.DataFrame(codes, columns=["postal_code"])
        codes = self._normalize_postal_code(codes)
        # Left merge keeps unmatched codes as NaN rows.
        response = pd.merge(
            codes, self._data_frame, on="postal_code", how="left"
        )
        if self.unique and single_entry:
            response = response.iloc[0]
        return response

    def query_location(self, name):
        """Get location information from a community/municipality name.

        Not implemented.
        """
        pass
class GeoDistance(Nominatim):
    """Distance calculation from a city name or a postal code.

    Parameters
    ----------
    country: str, default='fr'
        country code used to resolve postal codes
    errors: str, default='ignore'
        how to handle not found elements. One of
        'ignore' (return NaNs), 'error' (raise an exception),
        'nearest' (find from nearest valid points).
        NOTE(review): currently stored but not acted upon anywhere in this
        class — lookups always behave like 'ignore'.
    """

    def __init__(self, country: str = "fr", errors: str = "ignore"):
        super().__init__(country)
        # Bug fix: the parameter was previously accepted and silently
        # discarded; keep it on the instance so callers can inspect it.
        self.errors = errors

    def query_postal_code(self, x, y):
        """Get distance (in km) between postal codes.

        Parameters
        ----------
        x: array, list, str or int
            a list of postal codes
        y: array, list, str or int
            a list of postal codes

        Returns
        -------
        d : array or float
            the calculated distances; a scalar when both inputs were
            single codes
        """
        if isinstance(x, int):
            x = str(x)
        if isinstance(y, int):
            y = str(y)
        if isinstance(x, str):
            x = [x]
            single_x_entry = True
        else:
            single_x_entry = False
        df_x = super().query_postal_code(x)
        if isinstance(y, str):
            y = [y]
            single_y_entry = True
        else:
            single_y_entry = False
        df_y = super().query_postal_code(y)
        x_coords = df_x[["latitude", "longitude"]].values
        y_coords = df_y[["latitude", "longitude"]].values
        # Broadcast a single point against the other side's list.
        if x_coords.shape[0] == y_coords.shape[0]:
            pass
        elif x_coords.shape[0] == 1:
            x_coords = np.repeat(x_coords, y_coords.shape[0], axis=0)
        elif y_coords.shape[0] == 1:
            y_coords = np.repeat(y_coords, x_coords.shape[0], axis=0)
        else:
            raise ValueError("x and y must have the same number of elements")
        dist = haversine_distance(x_coords, y_coords)
        if single_x_entry and single_y_entry:
            return dist[0]
        else:
            return dist
# Copied from geopy
# IUGG mean earth radius in kilometers, from
# https://en.wikipedia.org/wiki/Earth_radius#Mean_radius. Using a
# sphere with this radius results in an error of up to about 0.5%.
EARTH_RADIUS = 6371.009


def haversine_distance(x, y):
    """Haversine (great-circle) distance between paired coordinates.

    Calculate the great circle distance between two points on the earth
    (specified in decimal degrees).

    Parameters
    ----------
    x : array, shape=(n_samples, 2)
        the first list of (latitude, longitude) pairs, in degrees
    y : array, shape=(n_samples, 2)
        the second list of (latitude, longitude) pairs, in degrees

    Returns
    -------
    d : array, shape=(n_samples,)
        the distance between coordinate pairs, in km

    References
    ----------
    https://en.wikipedia.org/wiki/Great-circle_distance
    """
    lat1, lon1 = np.radians(x).T
    lat2, lon2 = np.radians(y).T
    half_dlat = (lat2 - lat1) / 2.0
    half_dlon = (lon2 - lon1) / 2.0
    # Haversine formula: hav = sin^2(dlat/2) + cos(lat1)cos(lat2)sin^2(dlon/2)
    hav = (
        np.sin(half_dlat) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(half_dlon) ** 2
    )
    return EARTH_RADIUS * 2 * np.arcsin(np.sqrt(hav))
'''
*Version: 1.0 Published: 2020/02/11* Source: [NASA POWER](https://power.larc.nasa.gov/)
POWER API Multipoint Download (CSV)
This is an overview of the process to request data from multiple data points from the POWER API.
'''
def downloading_nasa_data(Collection):
    """Download one point's daily weather series from the NASA POWER API.

    Parameters
    ----------
    Collection : tuple
        ``(query_url, country, postCode)`` — a fully-formatted POWER API
        URL plus the identifiers to tag the resulting rows with.

    Returns
    -------
    pandas.DataFrame
        one row per day with a ``Date`` column (ISO-format strings), one
        column per requested weather parameter, plus ``Country`` and
        ``PostCode`` columns.
    """
    query_url, country, postCode = Collection
    # NOTE(review): verify=False disables TLS certificate validation —
    # confirm this is intentional for power.larc.nasa.gov.
    main_response = requests.get(url=query_url, verify=False)
    # Response.json() replaces the previous json.loads(response.text).
    json_response = main_response.json()
    dataDf = pd.DataFrame.from_dict(json_response['properties']['parameter'])
    dataDf = dataDf.reset_index().rename(columns={"index": "Date"})
    dataDf.Date = pd.to_datetime(dataDf.Date, format='%Y%m%d').astype(str)
    dataDf['Country'] = country
    dataDf['PostCode'] = postCode
    return dataDf
class Process():
    """Fan out NASA POWER point requests over a small thread pool.

    Parameters
    ----------
    latitude_longitude : iterable of (latitude, longitude, country, postCode)
        points to request
    startDate, endDate : str
        dates in ``YYYY-MM-DD`` form (dashes are stripped for the API)
    parameters : str
        comma-separated POWER parameter names (e.g. ``"T2M,T2M_MAX"``)
    """

    def __init__(self, latitude_longitude, startDate, endDate, parameters):
        # Please do not go more than 10 concurrent requests (API courtesy).
        self.processes = 5
        self.query_url = r"https://power.larc.nasa.gov/api/temporal/daily/point?parameters={parameters}&community=RE&longitude={longitude}&latitude={latitude}&start={startDate}&end={endDate}&format=JSON"
        self.latitude_longitude = latitude_longitude
        self.startDate = startDate
        self.endDate = endDate
        self.parameters = parameters
        self.messages = []

    def execute(self):
        """Download every point concurrently; return a list of DataFrames.

        Progress is written to stderr as results arrive (unordered).
        """
        points = []
        for latitude, longitude, country, postCode in self.latitude_longitude:
            each_query_url = self.query_url.format(
                startDate=self.startDate.replace('-', ''),
                endDate=self.endDate.replace('-', ''),
                longitude=longitude,
                latitude=latitude,
                parameters=self.parameters,
            )
            points.append((each_query_url, country, postCode))
        dfs = []
        # Bug fix: the pool was previously created and never closed, leaking
        # worker threads; the context manager terminates it on exit.
        with Pool(self.processes) as pool:
            for i, df in enumerate(pool.imap_unordered(downloading_nasa_data, points), 1):
                dfs.append(df)
                sys.stderr.write('\rExporting {0:0.2%}'.format(i / len(points)))
        return dfs
def getCoordinates(citiesDf):
    """Resolve latitude/longitude for every (Country, PostCode) pair.

    Parameters
    ----------
    citiesDf : pandas.DataFrame
        must contain ``Country`` and ``PostCode`` columns

    Returns
    -------
    pandas.DataFrame
        columns ``PostCode``, ``Country``, ``latitude``, ``longitude``;
        rows without a geocoding match are dropped. Countries not in
        COUNTRIES_VALID are skipped. Summary counts are printed.
    """
    frames = []
    for country in citiesDf.Country.unique():
        if country not in COUNTRIES_VALID:
            continue
        nomi = Nominatim(country)
        postCodes = citiesDf[citiesDf.Country == country].PostCode.tolist()
        geo = nomi.query_postal_code(postCodes)
        geo['postal_code'] = postCodes
        # Keep a single row per (country, place, postal code).
        geo = geo.groupby(["country_code", "place_name", "postal_code"],
                          as_index=False).head(1).reset_index(drop=True)
        frames.append(geo[["postal_code", "country_code", "latitude", "longitude"]])
    # Bug fix: DataFrame.append was removed in pandas 2.0 — accumulate the
    # per-country frames and concatenate once.
    if frames:
        allGeos = pd.concat(frames)
    else:
        allGeos = pd.DataFrame(columns=["postal_code", "country_code", "latitude", "longitude"])
    allGeos = allGeos.dropna()
    allGeos = allGeos.rename(columns={'postal_code': 'PostCode', 'country_code': 'Country'})
    print("Total countries:", citiesDf.Country.nunique())
    print("Total postal_codes:", citiesDf.PostCode.nunique())
    print()
    print("Total countries with geo locations:", allGeos.Country.nunique())
    print("Total postal_codes with geo locations:", allGeos.PostCode.nunique())
    return allGeos
### Get Weather
def getNasaPower(startdate, enddate, lat, lon, parameters):
    """Fetch daily NASA POWER weather parameters for a single point.

    Weather temperature indicators:
      TS      - Earth skin temperature
      T2M_MIN - minimum temperature at 2 meters
      T2M_MAX - maximum temperature at 2 meters
      T2M     - temperature at 2 meters

    Parameters
    ----------
    startdate, enddate : str
        dates in ``YYYY-MM-DD`` form (dashes are stripped for the API)
    lat, lon : float
        point coordinates in decimal degrees
    parameters : str
        comma-separated POWER parameter names

    Returns
    -------
    dict or list
        ``{parameter: {YYYYMMDD: value}}`` on success, the empty list on
        any failure (best-effort behavior preserved from the original).
    """
    data = []
    url = r"https://power.larc.nasa.gov/api/temporal/daily/point?parameters={4}&community=RE&longitude={0}&latitude={1}&start={2}&end={3}&format=JSON"
    # Bug fix: the template takes longitude first ({0}) and latitude second
    # ({1}), but the original called url.format(lat, lon, ...), swapping the
    # coordinates in every request.
    url2 = url.format(lon, lat, startdate.replace("-", ""), enddate.replace("-", ""), parameters)
    try:
        response = requests.get(url2)
        if len(response.text) > 1:
            data = json.loads(response.text)['properties']['parameter']
        time.sleep(0.01)  # courtesy pause between API calls
    except Exception:
        # Best-effort: any network/JSON/schema failure yields the empty
        # default. (The original bare ``except:`` also swallowed
        # KeyboardInterrupt/SystemExit.)
        pass
    return data
### Convert to dataframe
def convertToDF(data):
    """Concatenate per-point POWER responses into one tidy DataFrame.

    Parameters
    ----------
    data : list of dict
        each item maps parameter name -> {YYYYMMDD: value} (plus optional
        scalar tag entries such as 'Country'/'PostCode')

    Returns
    -------
    pandas.DataFrame
        a ``Date`` column of ISO-format date strings plus one column per
        key of the input dicts.
    """
    # Bug fix: DataFrame.append was removed in pandas 2.0 — build the
    # frames first and concatenate once.
    frames = [pd.DataFrame(item) for item in data]
    dataDf = pd.concat(frames) if frames else pd.DataFrame()
    dataDf = dataDf.reset_index()
    dataDf = dataDf.rename(columns={"index": "Date"})
    dataDf.Date = pd.to_datetime(dataDf.Date, format='%Y%m%d').astype(str)
    return dataDf
def getWeather_multiporcess(geoCountry, country):
    """Download weather for every coordinate row using the thread pool.

    Builds (latitude, longitude, Country, PostCode) tuples from *geoCountry*,
    runs them through :class:`Process`, and concatenates the per-point
    DataFrames. Prints a timing/count summary for *country*.

    Note: mutates *geoCountry* in place (adds a 'PostCode' column) when
    configs.usePostCodes is off.
    """
    started = datetime.now()
    if not configs.usePostCodes:
        # No postcode granularity requested: tag all rows with a placeholder.
        geoCountry['PostCode'] = 'unknown'
    coords = list(
        geoCountry[['latitude', 'longitude', 'Country', 'PostCode']].itertuples(index=False, name=None))
    runner = Process(latitude_longitude=coords, startDate=configs.startdate,
                     endDate=configs.enddate, parameters=configs.parameters)
    frames = runner.execute()
    combined = pd.concat(frames)
    print(" ", country, "Time: ", datetime.now() - started, ' Requested: ', geoCountry.shape[0], " Received: ",
          len(frames))
    return combined
def getWeather_in_loop(geoCountry, country):
    """Download weather sequentially, one API request per coordinate row.

    NOTE(review): ``head(1)`` limits the loop to the FIRST row only — this
    looks like a debugging leftover; confirm whether all rows should be
    fetched (cf. getWeather_multiporcess). Behavior kept as-is.

    Returns
    -------
    pandas.DataFrame
        the per-point results concatenated via convertToDF.
    """
    res = []
    for i, row in geoCountry.head(1).iterrows():
        st2 = datetime.now()
        temp = getNasaPower(startdate=configs.startdate, enddate=configs.enddate,
                            lat=row.latitude, lon=row.longitude, parameters=configs.parameters)
        # Bug fix: getNasaPower returns an empty list on failure; the
        # original then crashed on ``temp['Country'] = ...`` (a list cannot
        # be indexed by string). Only tag and keep non-empty results.
        if len(temp) > 0:
            temp['Country'] = country
            if configs.usePostCodes:
                temp['PostCode'] = str(row.PostCode)
            res.append(temp)
        print(row.Country, row.PostCode, "Time: ", datetime.now() - st2, ' Requested: ', len(temp))
    temperature_data_df = convertToDF(data=res)
    return temperature_data_df