#!/usr/bin/env python3
"""Base of plot-level RGB transformer
"""
import argparse
import datetime
import logging
import math
import numbers
import os
import random
import time
from typing import Optional, Union
import osgeo
import numpy as np
from agpypeline import algorithm, entrypoint
from agpypeline.environment import Environment
from agpypeline.checkmd import CheckMD
from osgeo import gdal, ogr, osr
import algorithm_rgb
from configuration import ConfigurationRgbBase
# Known image file extensions
KNOWN_IMAGE_FILE_EXTS = ['.tif', '.tiff', '.jpg']
# Number of tries to open a CSV file before we give up
MAX_CSV_FILE_OPEN_TRIES = 10
# Maximum number of seconds a single wait for file open can take
MAX_FILE_OPEN_SLEEP_SEC = 30
# Array of trait names that should have array values associated with them
TRAIT_NAME_ARRAY_VALUE = ['canopy_cover', 'site']
# Mapping of default trait names to fixed values
TRAIT_NAME_MAP = {
'local_datetime': None,
'access_level': '2',
'species': '',
'site': '',
'citation_author': '',
'citation_year': '',
'citation_title': '',
'method': ''
}
# Trait name arrays used when writing the CSV files
CSV_TRAIT_NAMES = ['species', 'site', 'timestamp', 'citation_author', 'citation_year', 'citation_title']
GEO_TRAIT_NAMES = ['site', 'trait', 'lat', 'lon', 'dp_time', 'source', 'value', 'timestamp']
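# Geostreams output is long-format: one row per trait/value pair (see perform_process below)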
BETYDB_TRAIT_NAMES = ['local_datetime', 'access_level', 'species', 'site', 'citation_author', 'citation_year', 'citation_title',
'method']
# Used to generate random numbers
RANDOM_GENERATOR = None
# The LAT-LON EPSG code to use
LAT_LON_EPSG_CODE = 4326
# Names of files generated
FILE_NAME_CSV = "rgb_plot.csv"
FILE_NAME_GEO_CSV = "rgb_plot_geo.csv"
FILE_NAME_BETYDB_CSV = "rgb_plot_betydb.csv"
# The number of significant digits to keep
SIGNIFICANT_DIGITS = 3
class __internal__:
"""Class containing functions for this file only
"""
# pylint: disable=too-many-public-methods
def __init__(self):
"""Perform class level initialization
"""
@staticmethod
def get_algorithm_definition_bool(variable_name: str, default_value: bool = False) -> bool:
"""Returns the value of the algorithm definition as a boolean value
Arguments:
variable_name: the name of the variable to look up
default_value: the default value to return if the variable is not defined or is None
"""
        if hasattr(algorithm_rgb, variable_name):
            temp_value = getattr(algorithm_rgb, variable_name)
            if temp_value is not None:
                return bool(temp_value)
        return default_value
@staticmethod
def get_algorithm_definition_str(variable_name: str, default_value: str = '') -> str:
"""Returns the value of the string variable found in algorithm_rgb
Arguments:
variable_name: the name of the definition to find
default_value: the default value to return if the variable isn't defined, is not a string, or has an empty value
Notes:
If the variable can't be determined, the default value is returned
"""
value = None
if hasattr(algorithm_rgb, variable_name):
temp_name = getattr(algorithm_rgb, variable_name)
if isinstance(temp_name, str):
value = temp_name.strip()
return value if value else default_value
@staticmethod
def get_algorithm_name() -> str:
"""Convenience function for returning the name of the algorithm
"""
return __internal__.get_algorithm_definition_str('ALGORITHM_NAME', 'unknown algorithm')
@staticmethod
def get_algorithm_variable_list(definition_name: str) -> list:
"""Returns a list containing the variable information defined by the algorithm
Arguments:
definition_name: name of the variable definition to look up
Return:
A list of variable strings
Note:
Assumes that multiple variable-related strings are comma separated
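            For example, a definition of VARIABLE_NAMES = 'canopy_cover,percent_green' (hypothetical
            values) yields ['canopy_cover', 'percent_green']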
"""
if not hasattr(algorithm_rgb, definition_name):
raise RuntimeError("Unable to find %s defined in algorithm_rgb code" % definition_name)
names = getattr(algorithm_rgb, definition_name).strip()
if not names:
raise RuntimeError("Empty %s definition specified in algorithm_rgb code" % definition_name)
return names.split(',')
@staticmethod
def get_algorithm_variable_labels() -> list:
"""Returns a list containing all the variable names defined by the algorithm
Return:
A list of variable names
"""
return_labels = []
if hasattr(algorithm_rgb, 'VARIABLE_LABELS'):
labels = getattr(algorithm_rgb, 'VARIABLE_LABELS').strip()
if labels:
return_labels = labels.split(',')
return return_labels
@staticmethod
def recursive_metadata_search(metadata_list: list, search_key: str, special_key: str = None) -> str:
"""Performs a depth-first search for the key in the metadata and returns the found value
Arguments:
metadata_list: the metadata in which to look
search_key: the key to look for in the metadata
special_key: optional special key to look up the key under. If specified and found, the found value takes precedence
Return:
Returns the found key value, or an empty string
Notes:
The metadata is searched recursively for the key. If a key is found under the special key, it will be
returned regardless of whether there's a key found elsewhere in the metadata
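            For example, searching for 'name' with special_key 'plot_info' (hypothetical keys) returns the
            'name' found inside a 'plot_info' dictionary even if 'name' also appears elsewhere in the metadata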
"""
top_found_name = ''
return_found_name = None
for metadata in metadata_list:
for key in metadata:
if key == search_key:
top_found_name = metadata[key]
if special_key and key == special_key:
if isinstance(metadata[key], dict):
temp_found_name = __internal__.recursive_metadata_search([metadata[key]], search_key, special_key)
if temp_found_name:
return_found_name = str(temp_found_name)
break
elif isinstance(metadata[key], dict):
temp_found_name = __internal__.recursive_metadata_search([metadata[key]], search_key, special_key)
if temp_found_name:
top_found_name = str(temp_found_name)
return return_found_name if return_found_name is not None else top_found_name
@staticmethod
def find_metadata_value(metadata_list: list, key_terms: list) -> str:
"""Returns the first found value associated with a key
Arguments:
metadata_list: the metadata to search
key_terms: the keys to look for
Returns:
Returns the found value or an empty string
"""
for one_key in key_terms:
value = __internal__.recursive_metadata_search(metadata_list, one_key)
if value:
return value
return ''
@staticmethod
def prepare_algorithm_metadata() -> tuple:
"""Prepares metadata with algorithm information
Return:
Returns a tuple with the name of the algorithm and a dictionary with information on the algorithm
"""
return (__internal__.get_algorithm_definition_str('ALGORITHM_NAME', 'unknown'),
{
'version': __internal__.get_algorithm_definition_str('VERSION', 'x.y'),
'traits': __internal__.get_algorithm_definition_str('VARIABLE_NAMES', ''),
'units': __internal__.get_algorithm_definition_str('VARIABLE_UNITS', ''),
'labels': __internal__.get_algorithm_definition_str('VARIABLE_LABELS', '')
})
@staticmethod
def image_get_geobounds(filename: str) -> list:
"""Uses gdal functionality to retrieve rectilinear boundaries from the file
Args:
filename: path of the file to get the boundaries from
Returns:
The upper-left and calculated lower-right boundaries of the image in a list upon success.
            The values are returned in the following order: min_y, max_y, min_x, max_x. A list of numpy.nan
is returned if the boundaries can't be determined
"""
try:
src = gdal.Open(filename)
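            # GetGeoTransform returns (ulx, xres, xskew, uly, yskew, yres); yres is
            # typically negative for north-up images, hence the min/max normalization below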
ulx, xres, _, uly, _, yres = src.GetGeoTransform()
lrx = ulx + (src.RasterXSize * xres)
lry = uly + (src.RasterYSize * yres)
min_y = min(uly, lry)
max_y = max(uly, lry)
min_x = min(ulx, lrx)
max_x = max(ulx, lrx)
return [min_y, max_y, min_x, max_x]
except Exception as ex:
logging.warning("[image_get_geobounds] Exception caught processing file: %s", filename)
logging.warning("[image_get_geobounds] Exception: %s", str(ex))
return [np.nan, np.nan, np.nan, np.nan]
@staticmethod
def get_epsg(filename: str) -> Optional[str]:
"""Returns the EPSG of the geo-referenced image file
Args:
filename(str): path of the file to retrieve the EPSG code from
Return:
            Returns the found EPSG code, or None if it's not found or an error occurred
"""
try:
src = gdal.Open(filename)
proj = osr.SpatialReference(wkt=src.GetProjection())
return proj.GetAttrValue('AUTHORITY', 1)
except Exception as ex:
logging.warning("[get_epsg] Exception caught processing file: %s", filename)
logging.warning("[get_epsg] Exception: %s", str(ex))
return None
@staticmethod
def get_centroid_latlon(filename: str) -> Optional[ogr.Geometry]:
"""Returns the centroid of the geo-referenced image file as an OGR point
Arguments:
filename: the path to the file to get the centroid from
Returns:
Returns the centroid of the geometry loaded from the file in lat-lon coordinates
Exceptions:
RuntimeError is raised if the image is not a geo referenced image with an EPSG code,
            the EPSG code is not supported, or another problem occurs
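        Notes:
            The centroid is returned in EPSG:4326, so GetX() is the longitude and GetY() is the latitude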
"""
bounds = __internal__.image_get_geobounds(filename)
        if np.isnan(bounds[0]):
msg = "File is not a geo-referenced image file: %s" % filename
logging.info(msg)
return None
epsg = __internal__.get_epsg(filename)
if epsg is None:
msg = "EPSG is not found in image file: '%s'" % filename
logging.info(msg)
return None
ring = ogr.Geometry(ogr.wkbLinearRing)
ring.AddPoint(bounds[2], bounds[1]) # Upper left
ring.AddPoint(bounds[3], bounds[1]) # Upper right
ring.AddPoint(bounds[3], bounds[0]) # lower right
ring.AddPoint(bounds[2], bounds[0]) # lower left
ring.AddPoint(bounds[2], bounds[1]) # Closing the polygon
poly = ogr.Geometry(ogr.wkbPolygon)
poly.AddGeometry(ring)
ref_sys = osr.SpatialReference()
if ref_sys.ImportFromEPSG(int(epsg)) == ogr.OGRERR_NONE:
poly.AssignSpatialReference(ref_sys)
else:
msg = "Failed to import EPSG %s for image file %s" % (str(epsg), filename)
logging.error(msg)
raise RuntimeError(msg)
if int(osgeo.__version__[0]) >= 3:
# GDAL 3 changes axis order: https://github.com/OSGeo/gdal/issues/1546
# pylint: disable=no-member
ref_sys.SetAxisMappingStrategy(osgeo.osr.OAMS_TRADITIONAL_GIS_ORDER)
# Convert the polygon to lat-lon
dest_spatial = osr.SpatialReference()
if dest_spatial.ImportFromEPSG(int(LAT_LON_EPSG_CODE)) != ogr.OGRERR_NONE:
msg = "Failed to import EPSG %s for conversion to lat-lon" % str(LAT_LON_EPSG_CODE)
logging.error(msg)
raise RuntimeError(msg)
if int(osgeo.__version__[0]) >= 3:
# GDAL 3 changes axis order: https://github.com/OSGeo/gdal/issues/1546
# pylint: disable=no-member
dest_spatial.SetAxisMappingStrategy(osgeo.osr.OAMS_TRADITIONAL_GIS_ORDER)
transform = osr.CoordinateTransformation(ref_sys, dest_spatial)
new_src = poly.Clone()
if new_src:
new_src.Transform(transform)
else:
msg = "Failed to transform file polygon to lat-lon %s" % filename
logging.error(msg)
raise RuntimeError(msg)
return new_src.Centroid()
@staticmethod
def get_time_stamps(iso_timestamp: str, args: argparse.Namespace) -> list:
"""Returns the date and the local time (offset is stripped) derived from the passed in timestamp
Args:
iso_timestamp: the timestamp string
args: the command line parameters
Return:
A list consisting of the date (YYYY-MM-DD) and a local timestamp (YYYY-MM-DDTHH:MM:SS)
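            For example, an input of '2020-05-01T12:30:00-07:00' (illustrative) yields
            ['2020-05-01', '2020-05-01T12:30:00']; the UTC offset is stripped by the formatting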
"""
if 'timestamp' in args and args.timestamp:
timestamp = datetime.datetime.fromisoformat(args.timestamp)
elif iso_timestamp:
timestamp = datetime.datetime.fromisoformat(iso_timestamp)
else:
return ['', '']
return [timestamp.strftime('%Y-%m-%d'), timestamp.strftime('%Y-%m-%dT%H:%M:%S')]
@staticmethod
    def get_open_backoff(prev: Optional[float] = None) -> float:
        """Returns the number of seconds to back off from opening a file
        Args:
            prev: the previous return value from this function, or None on the first call
Return:
Returns the number of seconds (including fractional seconds) to wait
Note that the return value is deterministic, and always the same, when None is
passed in
"""
# pylint: disable=global-statement
global RANDOM_GENERATOR
# Simple case
if prev is None:
return 1
# Get a random number generator
        if RANDOM_GENERATOR is None:
            try:
                RANDOM_GENERATOR = random.SystemRandom()
            except NotImplementedError:
                # Set this so we don't try again
                RANDOM_GENERATOR = 0
# Get a random number
if RANDOM_GENERATOR:
multiplier = RANDOM_GENERATOR.random() # pylint: disable=no-member
else:
multiplier = random.random()
# Calculate how long to sleep
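        # The wait grows roughly five-fold per retry on average: multiplier is uniform in
        # [0, 1), so the expected value of trunc(prev * multiplier * 100) / 10.0 is about
        # 5 * prev; waits that exceed the cap are re-randomized to under ten seconds below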
sleep = math.trunc(float(prev) * multiplier * 100) / 10.0
if sleep > MAX_FILE_OPEN_SLEEP_SEC:
sleep = max(0.1, math.trunc(multiplier * 100) / 10)
return sleep
@staticmethod
def write_csv_file(filename: str, header: str, data: str) -> bool:
"""Attempts to write out the data to the specified file. Will write the
header information if it's the first call to write to the file.
If the file is not available, it waits as configured until it becomes
available, or returns an error.
Args:
filename: path to the file to write to
header: Optional CSV formatted header to write to the file; can be set to None
data: CSV formatted data to write to the file
Return:
Returns True if the file was written to and False otherwise
"""
if not filename or not data:
logging.error("Empty parameter passed to write_geo_csv")
return False
csv_file = None
backoff_secs = None
for tries in range(0, MAX_CSV_FILE_OPEN_TRIES):
try:
# pylint: disable=consider-using-with
csv_file = open(filename, 'a+', encoding='utf-8')
except Exception as ex:
# Ignore an exception here since we handle it below
logging.debug("Exception caught while trying to open CSV file: %s", filename)
logging.debug("Exception: %s", str(ex))
if csv_file:
break
# If we can't open the file, back off and try again (unless it's our last try)
if tries < MAX_CSV_FILE_OPEN_TRIES - 1:
backoff_secs = __internal__.get_open_backoff(backoff_secs)
logging.info("Sleeping for %s seconds before trying to open CSV file again", str(backoff_secs))
time.sleep(backoff_secs)
if not csv_file:
logging.error("Unable to open CSV file for writing: '%s'", filename)
return False
wrote_file = False
try:
# Check if we need to write a header
            if header and os.fstat(csv_file.fileno()).st_size <= 0:
                csv_file.write(header + "\n")
# Write out data
csv_file.write(data + "\n")
wrote_file = True
except Exception as ex:
logging.error("Exception while writing CSV file: '%s'", filename)
logging.error("Exception: %s", str(ex))
# Re-raise the exception
raise ex from None
finally:
csv_file.close()
# Return whether or not we wrote to the file
return wrote_file
@staticmethod
def get_csv_fields(variable_names: list) -> list:
"""Returns the list of CSV field names as a list
Arguments:
variable_names: a list of trait variable names to add to the returned list
"""
return CSV_TRAIT_NAMES + list(variable_names)
@staticmethod
def get_geo_fields() -> list:
"""Returns the supported field names as a list
"""
return GEO_TRAIT_NAMES
@staticmethod
def get_bety_fields(variable_names: list) -> list:
"""Returns the supported field names as a list
Arguments:
variable_names: a list of trait variable names to add to the returned list
"""
return BETYDB_TRAIT_NAMES + list(variable_names)
@staticmethod
    def get_default_trait(trait_name: str) -> Union[list, str, None]:
"""Returns the default value for the trait name
Args:
trait_name(str): the name of the trait to return the default value for
Return:
If the default value for a trait is configured, that value is returned. Otherwise
an empty string is returned.
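            For example, 'site' returns an empty list (it appears in TRAIT_NAME_ARRAY_VALUE, which is
            checked first), 'access_level' returns '2' from TRAIT_NAME_MAP, and an unknown name returns ''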
"""
if trait_name in TRAIT_NAME_ARRAY_VALUE:
return [] # Return an empty list when the name matches
if trait_name in TRAIT_NAME_MAP:
return TRAIT_NAME_MAP[trait_name]
return ""
@staticmethod
def get_csv_header_fields() -> list:
"""Returns the list of header fields incorporating variable names, units, and labels
Return:
A list of strings that can be used as the header to a CSV file
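            For example, a variable named 'canopy_cover' with label 'Canopy Cover' and unit '%'
            (hypothetical values) produces the header field 'canopy_cover Canopy Cover (%)'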
"""
header_fields = []
variable_names = __internal__.get_algorithm_variable_list('VARIABLE_NAMES')
variable_units = __internal__.get_algorithm_variable_list('VARIABLE_UNITS')
variable_units_len = len(variable_units)
variable_labels = __internal__.get_algorithm_variable_labels()
variable_labels_len = len(variable_labels)
if variable_units_len != len(variable_names):
logging.warning("The number of variable units doesn't match the number of variable names")
logging.warning("Continuing with defined variable units")
if variable_labels_len and variable_labels_len != len(variable_names):
logging.warning("The number of variable labels doesn't match the number of variable names")
logging.warning("Continuing with defined variable labels")
logging.debug("Variable names: %s", str(variable_names))
logging.debug("Variable labels: %s", str(variable_labels))
logging.debug("Variable units: %s", str(variable_units))
for idx, field_name in enumerate(variable_names):
field_header = field_name
if idx < variable_labels_len:
field_header += ' %s' % variable_labels[idx]
if idx < variable_units_len:
field_header += ' (%s)' % variable_units[idx]
header_fields.append(field_header)
logging.debug("Header fields: %s", str(CSV_TRAIT_NAMES + header_fields))
return CSV_TRAIT_NAMES + header_fields
@staticmethod
def get_csv_traits_table(variable_names: list) -> tuple:
"""Returns the field names and default trait values
Arguments:
variable_names: a list of additional trait variable names
Returns:
A tuple containing the list of field names and a dictionary of default field values
"""
# Compiled traits table
fields = __internal__.get_csv_fields(variable_names)
traits = {}
for field_name in fields:
traits[field_name] = __internal__.get_default_trait(field_name)
if hasattr(algorithm_rgb, 'CITATION_AUTHOR') and getattr(algorithm_rgb, 'CITATION_AUTHOR'):
traits['citation_author'] = '"' + getattr(algorithm_rgb, 'CITATION_AUTHOR') + '"'
if hasattr(algorithm_rgb, 'CITATION_TITLE') and getattr(algorithm_rgb, 'CITATION_TITLE'):
traits['citation_title'] = '"' + getattr(algorithm_rgb, 'CITATION_TITLE') + '"'
if hasattr(algorithm_rgb, 'CITATION_YEAR') and getattr(algorithm_rgb, 'CITATION_YEAR'):
traits['citation_year'] = '"' + getattr(algorithm_rgb, 'CITATION_YEAR') + '"'
return fields, traits
@staticmethod
def get_geo_traits_table() -> tuple:
"""Returns the field names and default trait values
Returns:
A tuple containing the list of field names and a dictionary of default field values
"""
fields = __internal__.get_geo_fields()
traits = {}
for field_name in fields:
traits[field_name] = ""
return fields, traits
@staticmethod
def get_bety_traits_table(variable_names: list) -> tuple:
"""Returns the field names and default trait values
Arguments:
variable_names: a list of additional trait variable names
Returns:
A tuple containing the list of field names and a dictionary of default field values
"""
# Compiled traits table
fields = __internal__.get_bety_fields(variable_names)
traits = {}
for field_name in fields:
traits[field_name] = __internal__.get_default_trait(field_name)
if hasattr(algorithm_rgb, 'CITATION_AUTHOR') and getattr(algorithm_rgb, 'CITATION_AUTHOR'):
traits['citation_author'] = '"' + getattr(algorithm_rgb, 'CITATION_AUTHOR') + '"'
if hasattr(algorithm_rgb, 'CITATION_TITLE') and getattr(algorithm_rgb, 'CITATION_TITLE'):
traits['citation_title'] = '"' + getattr(algorithm_rgb, 'CITATION_TITLE') + '"'
if hasattr(algorithm_rgb, 'CITATION_YEAR') and getattr(algorithm_rgb, 'CITATION_YEAR'):
traits['citation_year'] = '"' + getattr(algorithm_rgb, 'CITATION_YEAR') + '"'
if hasattr(algorithm_rgb, 'ALGORITHM_METHOD') and getattr(algorithm_rgb, 'ALGORITHM_METHOD'):
traits['method'] = '"' + getattr(algorithm_rgb, 'ALGORITHM_METHOD') + '"'
return fields, traits
@staticmethod
def generate_traits_list(fields: list, traits: dict) -> list:
"""Returns an array of trait values
Args:
fields: the list of fields to look up and return
traits: contains the set of trait values to return
Return:
Returns an array of trait values taken from the traits parameter
Notes:
If a trait isn't found, it's assigned an empty string
"""
# compose the summary traits
trait_list = []
for field_name in fields:
if field_name in traits:
trait_list.append(traits[field_name])
else:
trait_list.append(__internal__.get_default_trait(field_name))
return trait_list
@staticmethod
def filter_file_list_by_ext(source_files: list, known_exts: list) -> list:
"""Returns the list of known files by extension
Arguments:
source_files: the list of source files to look through
known_exts: the list of known extensions
Return:
Returns the list of files identified as image files
"""
return_list = []
# Skip files we don't know about
for one_file in source_files:
ext = os.path.splitext(one_file)[1]
if ext in known_exts:
return_list.append(one_file)
return return_list
@staticmethod
def determine_csv_path(path_list: list) -> Optional[str]:
"""Iterates over the list of paths and returns the first valid one
Arguments:
path_list: the list of paths to iterate over
Return:
The first found path that exists, or None if no paths are found
"""
if not path_list:
return None
for one_path in path_list:
logging.debug("Checking csv path: %s", str(one_path))
if not one_path:
continue
logging.debug("Checking csv path exists: %s", str(one_path))
if os.path.exists(one_path) and os.path.isdir(one_path):
logging.debug("Returning CSV path: %s", str(one_path))
return one_path
logging.debug("Unable to find a CSV path")
return None
@staticmethod
def get_csv_file_names(csv_path: str) -> list:
"""Returns the list of csv file paths
Arguments:
csv_path: the base path for the csv files
Return:
Returns the list of file paths: default CSV file, Geostreams CSV file, BETYdb CSV file
"""
return [os.path.join(csv_path, FILE_NAME_CSV),
os.path.join(csv_path, FILE_NAME_GEO_CSV),
os.path.join(csv_path, FILE_NAME_BETYDB_CSV)]
@staticmethod
def validate_calc_value(calc_value, variable_names: list) -> list:
"""Returns a list of the validated value(s) as compared against type and length of variable names
Arguments:
calc_value: the calculated value(s) to validate (int, float, str, dict, list, etc.)
variable_names: the list of the names of expected variables
Return:
Returns the validated values as a list
Exceptions:
RuntimeError is raised if the calc_value is not a supported type or the number of values doesn't match
the expected number (as determined by variable_names)
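        Notes:
            With variable_names of ['a', 'b'] (illustrative), any of [1, 2], (1, 2), {'a': 1, 'b': 2}, or
            {'values': [1, 2]} validates to [1, 2]; a bare scalar is accepted only for a single variable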
"""
if isinstance(calc_value, set):
raise RuntimeError("A 'set' type of data was returned and isn't supported. Please use a list or a tuple instead")
        # Special-case handling for a return dict that wraps the values alongside other data
if isinstance(calc_value, dict) and 'values' in calc_value:
values_result = calc_value['values']
else:
values_result = calc_value
# Get the values into list form
values = []
len_variable_names = len(variable_names)
if isinstance(values_result, dict):
# Assume the dictionary is going to have field names with their values
# We check whether we have the correct number of fields later. This also
# filters out extra fields
values = []
for key in variable_names:
if key in values_result:
values.append(values_result[key])
elif not isinstance(values_result, (list, tuple)):
values = [values_result]
else:
values = values_result
# Sanity check our values
len_calc_value = len(values)
if not len_calc_value == len_variable_names:
raise RuntimeError("Incorrect number of values returned. Expected " + str(len_variable_names) +
" and received " + str(len_calc_value))
return values
@staticmethod
def write_trait_csv(filename: str, header: str, fields: list, traits: dict) -> None:
"""Writes the trait data to the specified CSV file
Arguments:
filename: the name of the file to write to
header: the file header to be written as needed
fields: the list of field names to save to the file
traits: the trait values to write
"""
trait_list = __internal__.generate_traits_list(fields, traits)
csv_data = ','.join(map(str, trait_list))
__internal__.write_csv_file(filename, header, csv_data)
@staticmethod
def get_plot_species(plot_name: str, full_md: list) -> str:
"""Attempts to find the plot name and return its associated species
Arguments:
plot_name: the name of the plot to find the species of
full_md: the full list of metadata
Returns:
Returns the found species or "" (empty string) if the plot was not found
Notes:
            Returns the first exact match found. If there is no exact match, the return value falls back to (in
            priority order): a case-insensitive plot name match, the top-level species entry in the metadata, ""
"""
possible = None
optional = None
        # Disable the pylint nested-block-depth check: a single pass that records a
        # case-insensitive match avoids looping over the metadata twice
# pylint: disable=too-many-nested-blocks
for one_md in full_md:
if 'species' in one_md:
optional = one_md['species']
if 'plots' in one_md:
for one_plot in one_md['plots']:
# Try to find the plot name in 'plots' in a case sensitive way, followed by case insensitive
if 'name' in one_plot:
if str(one_plot['name']) == plot_name:
if 'species' in one_plot:
return one_plot['species']
elif str(one_plot['name']).lower() == plot_name.lower():
if 'species' in one_plot:
possible = one_plot['species']
# Check if we found a possibility, but not an exact match
if possible is not None:
return possible
return optional if optional is not None else ''
class RgbPlotBase(algorithm.Algorithm):
"""Used as base for simplified RGB transformers"""
def add_parameters(self, parser: argparse.ArgumentParser) -> None:
"""Adds parameters
Arguments:
parser: instance of argparse
"""
supported_files = [FILE_NAME_CSV + ': basic CSV file with calculated values']
if __internal__.get_algorithm_definition_bool('WRITE_GEOSTREAMS_CSV', False):
            supported_files.append(FILE_NAME_GEO_CSV + ': TERRA REF Geostreams compatible CSV file')
if __internal__.get_algorithm_definition_bool('WRITE_BETYDB_CSV', False):
supported_files.append(FILE_NAME_BETYDB_CSV + ': BETYdb compatible CSV file')
parser.description = 'Plot level RGB algorithm: ' + __internal__.get_algorithm_name() + \
' version ' + __internal__.get_algorithm_definition_str('VERSION', 'x.y')
parser.add_argument('--csv_path', help='the path to use when generating the CSV files')
        parser.add_argument('--timestamp', help='the timestamp to use in ISO 8601 format (eg: YYYY-MM-DDTHH:MM:SS)')
parser.add_argument('--geostreams_csv', action='store_true',
help='override to always create the TERRA REF Geostreams-compatible CSV file')
parser.add_argument('--betydb_csv', action='store_true', help='override to always create the BETYdb-compatible CSV file')
parser.epilog = 'The following files are created in the specified csv path by default: ' + \
'\n ' + '\n '.join(supported_files) + '\n' + \
' author ' + __internal__.get_algorithm_definition_str('ALGORITHM_AUTHOR', 'mystery author') + \
' ' + __internal__.get_algorithm_definition_str('ALGORITHM_AUTHOR_EMAIL', '(no email)')
def check_continue(self, environment: Environment, check_md: CheckMD, transformer_md: dict, full_md: list) -> tuple:
"""Checks if conditions are right for continuing processing
Arguments:
environment: instance of environment class
check_md: request specific metadata
transformer_md: metadata associated with previous runs of the transformer
full_md: the full set of metadata available to the transformer
Return:
            Returns a tuple containing the return code for continuing or not, and
an error message if there's an error
"""
# pylint: disable=unused-argument
# Look for at least one image file in the list provided
found_image = False
for one_file in check_md.get_list_files():
ext = os.path.splitext(one_file)[1]
if ext in KNOWN_IMAGE_FILE_EXTS:
found_image = True
break
if not found_image:
logging.debug("Image not found in list of files. Supported types are: %s", ", ".join(KNOWN_IMAGE_FILE_EXTS))
return (0,) if found_image else (-1000, "Unable to find an image in the list of files")
def perform_process(self, environment: Environment, check_md: CheckMD, transformer_md: dict, full_md: list) -> dict:
"""Performs the processing of the data
Arguments:
environment: instance of environment class
check_md: request specific metadata
transformer_md: metadata associated with previous runs of the transformer
full_md: the full set of metadata available to the transformer
Return:
Returns a dictionary with the results of processing
"""
# pylint: disable=unused-argument
        # The following pylint disables are here because satisfying them would make the code unreadable
# pylint: disable=too-many-statements, too-many-locals, too-many-branches
# Environment checking
if not hasattr(algorithm_rgb, 'calculate'):
msg = "The 'calculate()' function was not found in algorithm_rgb.py"
logging.error(msg)
return {'code': -1001, 'error': msg}
logging.debug("Working with check_md: %s", str(check_md))
# Setup local variables
variable_names = __internal__.get_algorithm_variable_list('VARIABLE_NAMES')
csv_file, geostreams_csv_file, betydb_csv_file = __internal__.get_csv_file_names(
__internal__.determine_csv_path([environment.args.csv_path, check_md.working_folder]))
logging.debug("Calculated default CSV path: %s", csv_file)
logging.debug("Calculated geostreams CSV path: %s", geostreams_csv_file)
logging.debug("Calculated BETYdb CSV path: %s", betydb_csv_file)
datestamp, localtime = __internal__.get_time_stamps(check_md.timestamp, environment.args)
write_geostreams_csv = environment.args.geostreams_csv or __internal__.get_algorithm_definition_bool('WRITE_GEOSTREAMS_CSV', False)
write_betydb_csv = environment.args.betydb_csv or __internal__.get_algorithm_definition_bool('WRITE_BETYDB_CSV', False)
logging.info("Writing geostreams csv file: %s", "True" if write_geostreams_csv else "False")
logging.info("Writing BETYdb csv file: %s", "True" if write_betydb_csv else "False")
# Get default values and adjust as needed
(csv_fields, csv_traits) = __internal__.get_csv_traits_table(variable_names)
(geo_fields, geo_traits) = __internal__.get_geo_traits_table()
(bety_fields, bety_traits) = __internal__.get_bety_traits_table(variable_names)
csv_header = ','.join(map(str, __internal__.get_csv_header_fields()))
geo_csv_header = ','.join(map(str, geo_fields))
bety_csv_header = ','.join(map(str, bety_fields))
# Process the image files
num_image_files = 0
entries_written = 0
additional_files_list = []
significant_digits_format = '.' + str(SIGNIFICANT_DIGITS) + 'g'
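        # With SIGNIFICANT_DIGITS = 3 this is '.3g': format(0.123456, '.3g') yields '0.123'
        # and format(123456, '.3g') yields '1.23e+05'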
for one_file in __internal__.filter_file_list_by_ext(check_md.get_list_files(), KNOWN_IMAGE_FILE_EXTS):
plot_name = None
try:
num_image_files += 1
# Setup
plot_name = os.path.basename(os.path.dirname(one_file))
centroid = __internal__.get_centroid_latlon(one_file)
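                # ReadAsArray returns the raster as (bands, rows, cols); rollaxis moves the
                # band axis last so calculate() receives a (rows, cols, bands) array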
image_pix = np.rollaxis(np.array(gdal.Open(one_file).ReadAsArray()), 0, 3)
# Make the call and check the results
calc_value = algorithm_rgb.calculate(image_pix)
logging.debug("Calculated value is %s for file: %s", str(calc_value), one_file)
if calc_value is None:
continue
                if isinstance(calc_value, dict) and calc_value.get('file'):
                    # Wrap a bare path string so extend() doesn't add it character by character
                    found_files = calc_value['file']
                    additional_files_list.extend([found_files] if isinstance(found_files, str) else found_files)
values = __internal__.validate_calc_value(calc_value, variable_names)
logging.debug("Verified values are %s", str(values))
geo_traits['site'] = plot_name
geo_traits['dp_time'] = localtime
geo_traits['source'] = one_file
geo_traits['timestamp'] = datestamp
if centroid is not None:
geo_traits['lat'] = str(centroid.GetY())
geo_traits['lon'] = str(centroid.GetX())
else:
geo_traits['lat'] = ''
geo_traits['lon'] = ''
# Write the data points geographically and otherwise
for idx, trait_name in enumerate(variable_names):
# Get numbers truncated to significant digits
if isinstance(values[idx], numbers.Number):
value_str = format(values[idx], significant_digits_format)
else:
value_str = str(values[idx])
# Geostreams can only handle one field at a time so we write out one row per field/value pair
geo_traits['trait'] = trait_name
geo_traits['value'] = value_str
if write_geostreams_csv:
__internal__.write_trait_csv(geostreams_csv_file, geo_csv_header, geo_fields, geo_traits)
# csv and BETYdb can handle wide rows with multiple values so we just set the field
# values here and write the single row after the loop
csv_traits[variable_names[idx]] = value_str
bety_traits[variable_names[idx]] = value_str
csv_traits['site'] = plot_name
csv_traits['timestamp'] = datestamp
csv_traits['species'] = __internal__.get_plot_species(plot_name, full_md)
if centroid is not None:
csv_traits['lat'] = str(centroid.GetY())
csv_traits['lon'] = str(centroid.GetX())
else:
csv_traits['lat'] = ''
csv_traits['lon'] = ''
__internal__.write_trait_csv(csv_file, csv_header, csv_fields, csv_traits)
bety_traits['site'] = plot_name
bety_traits['local_datetime'] = localtime
bety_traits['species'] = __internal__.get_plot_species(plot_name, full_md)
if write_betydb_csv:
__internal__.write_trait_csv(betydb_csv_file, bety_csv_header, bety_fields, bety_traits)
entries_written += 1
except Exception as ex:
logging.error("Error generating %s for %s", __internal__.get_algorithm_name(), str(plot_name))
logging.error("Exception: %s", str(ex))
continue
if num_image_files == 0:
logging.warning("No images were detected for processing")
if entries_written == 0:
logging.warning("No entries were written to CSV files")
# Prepare the return information
algorithm_name, algorithm_md = __internal__.prepare_algorithm_metadata()
algorithm_md['files_processed'] = str(num_image_files)
algorithm_md['lines_written'] = str(entries_written)
if write_geostreams_csv:
algorithm_md['wrote_geostreams'] = "Yes"
if write_betydb_csv:
algorithm_md['wrote_betydb'] = "Yes"
file_md = []
if entries_written:
file_md.append({
'path': csv_file,
'key': 'csv'
})
if write_geostreams_csv:
file_md.append({
'path': geostreams_csv_file,
'key': 'csv'
})
if write_betydb_csv:
file_md.append({
'path': betydb_csv_file,
'key': 'csv'
})
for one_file in additional_files_list:
file_path = str(one_file)
if not os.path.exists(file_path):
logging.warning("Additional return file not found to return: '%s'", file_path)
continue
logging.info("Adding additional file to results: '%s'", file_path)
file_md.append({
'path': file_path,
'key': os.path.splitext(file_path)[1].lstrip('.')
})
return {'code': 0,
'file': file_md,
algorithm_name: algorithm_md
}
if __name__ == "__main__":
CONFIGURATION = ConfigurationRgbBase()
entrypoint.entrypoint(CONFIGURATION, RgbPlotBase())
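# A minimal sketch of the algorithm_rgb module this transformer wraps (illustrative
# names and values; only calculate() is required, the remaining definitions are the
# optional ones read by __internal__ above):
#
#   ALGORITHM_NAME = 'greenness'
#   VERSION = '1.0'
#   VARIABLE_NAMES = 'percent_green'
#   VARIABLE_UNITS = '%'
#
#   def calculate(pxarray):
#       """Returns one value per VARIABLE_NAMES entry for a (rows, cols, bands) array"""
#       return float((pxarray[:, :, 1] > 128).mean() * 100.0)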