-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSDS_earthcache_api.py
139 lines (116 loc) · 4.96 KB
/
SDS_earthcache_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
CREDIT ACKNOWLEDGEMENT: some of the code used in this file has been borrowed from
the following public github repository: https://github.com/chris010970/earthcache
"""
"""
This is the main class that someone would want to use for things like:
- directly creating a pipeline
- running a search
- creating a pipeline from a ran search
- downloading the images from a pipeline
- checking the status of a pipeline
- viewing the first image from the downloaded images
- calculating the price of a pipeline
"""
import os
import pandas as pd
import sys
from osgeo import gdal
import matplotlib.pyplot as plt
import numpy as np
import SDS_earthcache_client
# always call this function first before using anything else
# does an initial setup
def initialize(api_key):
repo = 'CoastSat'
root_path = os.getcwd()[ 0 : os.getcwd().find( repo ) + len ( repo )]
cfg_path = os.path.join( root_path, 'earthcache-cfg' )
global client
client = SDS_earthcache_client.EcClient(cfg_path, api_key)
# use this function to directly create a pipeline
# the 'image_type_id' parameter refers to what type of
# images you want for the output
# these are the available id's: https://support.skywatch.com/hc/en-us/articles/7297565063067-Available-Outputs
def retrieve_images_earthcache(api_key, name, aoi, start_date, end_date, image_type_id, **kwargs):
pipeline_name = name
# we can change the resolution here
resolution = [ 'low' ]
global client
status, result = client.createPipeline(
name=pipeline_name,
start_date=start_date,
end_date=end_date,
aoi=aoi,
output={
"id": image_type_id,
"format": "geotiff",
"mosaic": "stitched"
},
**kwargs
)
print(status, result)
pipeline_id = client.getPipelineIdFromName(pipeline_name)
status, result = client.getPipeline( pipeline_id )
print(status, result)
# use this function to check the status of a given pipeline
# 200 is a good result!
def checkStatus(pipeline_name):
global client
pipeline_id = client.getPipelineIdFromName(pipeline_name)
status, result = client.getIntervalResults( pipeline_id )
print(status, result)
# call this function to get the images once the pipeline is ready
# make sure to pass in the name of the pipeline
# returns a list of the images
def download_images(pipeline_name):
global client
id = client.getPipelineIdFromName(pipeline_name)
status, result = client.getIntervalResults(id)
if(status == 404):
print("results not found, most likely an invalid id!")
return
root_path = pipeline_name + '/images'
images = []
# convert to dataframe
df = pd.DataFrame( result[ 'data' ] )
for row in df.itertuples():
out_path = os.path.join( root_path, row.id )
images.append( client.getImages( row.results, out_path ) )
return images
# a function to actually view the images (needs testing!)
# be sure to call download_images first ^
# and then pass in the returned result of that to this function
# not entirely sure how this function works
def view_first_image(images):
ds = gdal.Open( images[ 0 ][ 0 ] )
data = ds.ReadAsArray()
# this is just showing the first image in the list
np.amin( data[ 0, : , : ]), np.amax( data[ 0, : , : ])
plt.imshow( data[ 0, :, :] )
plt.show()
# posts a search request with the given parameters
# full list of parameters: https://api-docs.earthcache.com/#tag/post
# returns status, result, search_df, search_id
# best usage:
# status, result, search_df, search_id = SDS_earthcache_api.search(__, __, __, ___)
def search(aoi, window, resolution, **kwargs):
# allows you to create a pipeline directly from a previously run search
# need to call the function above first though (aka need to search first)
# so that you can save the search_id and search_results
# and pass them in as parameters to this function
# returns the status and result
# best usage:
# status, result = SDS_earthcache_api.create_pipeline_from_search(___, ___)
# https://api-docs.earthcache.com/#tag/pipelines/operation/PipelineCreate
def create_pipeline_from_search(search_id, search_results):
global client
status, result = client.createPipelineFromSearch(search_id, search_results)
return status, result
# still needs to be tested!
# Calculate cost of area and intervals of a pipeline,
# and the probability of collection of any tasking intervals
# https://api-docs.earthcache.com/#tag/pipelinePost
def calculatePrice(self, resolution, location, start_date, end_date):
global client
status, result = client.calculatePrice(resolution, location, start_date, end_date)
return status, result