Commit 4d73469
Merge branch 'dataframe-mc1'
jpswinski committed Jan 27, 2025
2 parents ef99732 + 0e24d51 commit 4d73469
Showing 62 changed files with 4,146 additions and 423 deletions.
2 changes: 1 addition & 1 deletion clients/python/sliderule/sliderule.py
@@ -592,7 +592,7 @@ def procoutputfile(parm, rsps):
         return geopandas.pd.read_parquet(path)
     elif output["format"] == "geoparquet":
         # Return Parquet File as DataFrame
-        return geopandas.pd.read_parquet(path)
+        return geopandas.read_parquet(path)
     elif output["format"] == "feather":
         # Return Feather File as DataFrame
         return geopandas.pd.read_feather(path)
2 changes: 1 addition & 1 deletion datasets/bathy/endpoints/atl24g.lua
@@ -236,7 +236,7 @@ local failed_processing_run = false
 for beam,dataframe in pairs(dataframes) do
     local failed_dataframe = false
     if dataframe:finished(ctimeout(), rspq) then
-        if not dataframes[beam]:isvalid() then
+        if dataframes[beam]:inerror() then
             userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s failed to create valid bathy dataframe for spot %d", rspq, resource, dataframe:meta("spot")))
             failed_dataframe = true
         elseif dataframes[beam]:length() > 0 then
296 changes: 296 additions & 0 deletions datasets/bathy/endpoints/atl24p.lua
@@ -0,0 +1,296 @@
--
-- ENDPOINT: /source/atl24p
--
local json = require("json")
local dataframe = require("dataframe")
local earthdata = require("earthdata")
local runner = require("container_runtime")
local rqst = json.decode(arg[1])
local parms = bathy.parms(rqst["parms"], rqst["key_space"], "icesat2", rqst["resource"])
local userlog = msg.publish(rspq) -- create user log publisher (alerts)
local outputs = {} -- table of all outputs that go into oceaneyes
local starttime = time.gps() -- used for timeout handling
local rdelta = 5 * 24 * 60 * 60 * 1000 -- 5 days * (24 hours/day * 60 minutes/hour * 60 seconds/minute * 1000 milliseconds/second)
local rdate = string.format("%04d-%02d-%02dT00:00:00Z", parms["year"], parms["month"], parms["day"])
local rgps = time.gmt2gps(rdate)
local t0 = string.format('%04d-%02d-%02dT%02d:%02d:%02dZ', time.gps2date(rgps - rdelta)) -- start time for NDWI and ATL09
local t1 = string.format('%04d-%02d-%02dT%02d:%02d:%02dZ', time.gps2date(rgps + rdelta)) -- stop time for NDWI and ATL09
local tempfile = "atl24.bin"
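
-- Note on the t0/t1 construction above: Lua expands a function call in the
-- final argument position into all of its return values. A minimal sketch,
-- assuming time.gps2date returns (year, month, day, hour, minute, second):
--
--   local y, m, d, hh, mm, ss = time.gps2date(rgps - rdelta)
--   local t0 = string.format('%04d-%02d-%02dT%02d:%02d:%02dZ', y, m, d, hh, mm, ss)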

-------------------------------------------------------
-- Function - acquire lock (only used when not proxied)
-------------------------------------------------------
local function acquireLock(timeout, _starttime)
    local transaction_id = core.INVALID_TX_ID
    while transaction_id == core.INVALID_TX_ID do
        transaction_id = core.orchselflock("sliderule", timeout, 3)
        if transaction_id == core.INVALID_TX_ID then
            local lock_retry_wait_time = (time.gps() - _starttime) / 1000.0
            if lock_retry_wait_time >= timeout then
                userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> failed to acquire lock... exiting", rspq))
                return -- nothing yet to clean up
            else
                userlog:alert(core.INFO, core.RTE_TIMEOUT, string.format("request <%s> failed to acquire lock... retrying with %f seconds left", rspq, timeout - lock_retry_wait_time))
                sys.wait(30) -- seconds
            end
        end
    end
    return transaction_id
end
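
-- Usage sketch (mirroring the call sites further down): the returned
-- transaction id must eventually be released with core.orchunlock, which
-- cleanup() takes care of on every exit path:
--
--   local tx = acquireLock(parms["node_timeout"], starttime)
--   ... process the granule ...
--   cleanup(crenv, tx) -- calls core.orchunlock({tx}) when tx is set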

-------------------------------------------------------
-- Function - get Kd resource filename
-------------------------------------------------------
local function getKd(_parms)
    local _,doy = time.gps2gmt(rgps)
    local doy_8d_start = ((doy - 1) & ~7) + 1
    local doy_8d_stop = doy_8d_start + 7
    local gps_start = time.gmt2gps(string.format("%04d:%03d:00:00:00", _parms["year"], doy_8d_start))
    local gps_stop = time.gmt2gps(string.format("%04d:%03d:00:00:00", _parms["year"], doy_8d_stop))
    local year_start, month_start, day_start = time.gps2date(gps_start)
    local year_stop, month_stop, day_stop = time.gps2date(gps_stop)
    if year_start ~= year_stop then
        year_stop = year_start
        month_stop = 12
        day_stop = 31
    end
    local viirs_filename = string.format("JPSS1_VIIRS.%04d%02d%02d_%04d%02d%02d.L3m.8D.KD.Kd_490.4km.nc.dap.nc4",
        year_start, month_start, day_start,
        year_stop, month_stop, day_stop
    )
    return viirs_filename, time.gps()
end
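
-- Worked example for the 8-day binning above: ((doy - 1) & ~7) + 1 snaps a
-- day-of-year onto the start of its 8-day VIIRS Kd_490 compositing window.
-- For doy = 150: (149 & ~7) + 1 = 144 + 1 = 145, giving the window 145..152;
-- the stop date is clamped to Dec 31 when the window would cross into the
-- next year.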

-------------------------------------------------------
-- Function - generate parameters for NDWI from HLS
-------------------------------------------------------
local function getNdwiParms(_parms, resource)
    local geo_parms = nil
    -- build hls polygon
    local hls_poly = _parms["poly"]
    if not hls_poly then
        local original_name_filter = _parms["name_filter"]
        _parms["name_filter"] = "*" .. resource
        local rc, rsps = earthdata.cmr(_parms, nil, true)
        if rc == earthdata.SUCCESS then
            hls_poly = rsps[resource] and rsps[resource]["poly"]
        end
        _parms["name_filter"] = original_name_filter
    end
    -- build hls raster object
    local hls_parms = {
        asset = "landsat-hls",
        t0 = t0,
        t1 = t1,
        use_poi_time = true,
        bands = {"NDWI"},
        poly = hls_poly
    }
    local rc1, rsps1 = earthdata.stac(hls_parms)
    if rc1 == earthdata.SUCCESS then
        hls_parms["catalog"] = json.encode(rsps1)
        geo_parms = geo.parms(hls_parms)
    end
    return geo_parms, time.gps()
end

-------------------------------------------------------
-- Function - get ATL09 resources
-------------------------------------------------------
local function getAtl09(resource)
    local resource09 = nil
    local atl09_parms = {
        asset = "icesat2-atl09",
        t0 = t0,
        t1 = t1,
        name_filter = '*_' .. string.format("%04d", parms["rgt"]) .. '????_*'
    }
    local atl09_max_retries = 3
    local atl09_attempt = 1
    while true do
        local rc2, rsps2 = earthdata.search(atl09_parms)
        if rc2 == earthdata.SUCCESS then
            if #rsps2 == 1 then
                resource09 = rsps2[1]
                break -- success
            else
                userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> returned an invalid number of resources for ATL09 CMR request for %s: %d", rspq, resource, #rsps2))
                break -- failure
            end
        else
            userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed attempt %d to make ATL09 CMR request <%d>: %s", rspq, atl09_attempt, rc2, rsps2))
            atl09_attempt = atl09_attempt + 1
            if atl09_attempt > atl09_max_retries then
                userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed to make ATL09 CMR request for %s... aborting!", rspq, resource))
                break -- failure
            end
        end
    end
    return resource09, time.gps()
end

-------------------------------------------------------
-- Function - cleanup
-------------------------------------------------------
local function cleanup(_crenv, _transaction_id, failure, reason)
    if failure then
        userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> failed: %s", rspq, reason))
    end
    runner.cleanup(_crenv)
    if _transaction_id then
        core.orchunlock({_transaction_id})
    end
end

-------------------------------------------------------
-- Function - ctimeout
-------------------------------------------------------
local function ctimeout(timeout, _starttime)
    local current_timeout = (timeout * 1000) - (time.gps() - _starttime)
    if current_timeout < 0 then current_timeout = 0 end
    return math.tointeger(current_timeout)
end
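
-- Worked example: ctimeout() converts the remaining request budget from
-- seconds into milliseconds. With parms["node_timeout"] = 600 seconds and
-- 30 seconds of GPS time (30000 ms) already elapsed since _starttime:
--
--   (600 * 1000) - 30000 = 570000 ms remaining
--
-- and the floor at 0 makes downstream waits return immediately once the
-- budget is spent.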

-------------------------------------------------------
-- Proxy/Execute/Send
-------------------------------------------------------

-- query earthdata for resources to process
local earthdata_status = earthdata.query(parms, rspq, userlog)
if earthdata_status ~= earthdata.SUCCESS then
    userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> earthdata query failed: %d", rspq, earthdata_status))
    return
end

-- proxy request
if not parms["resource"] then
    local df = dataframe.proxy("atl24p", parms, rspq, userlog)
    dataframe.send(df, parms, rspq, userlog)
    return
end

-- get initial resources
local transaction_id = nil
local resource = parms["resource"]
local viirs_filename = getKd(parms)
local resource09 = getAtl09(resource)

-- acquire lock when not proxied
if parms["key_space"] == core.INVALID_KEY then
    transaction_id = acquireLock(parms["node_timeout"], starttime)
end

-- create container runtime environment
local crenv = runner.setup()

-- create dataframe inputs
local bathymask = bathy.mask()
local atl03h5 = h5.object(parms["asset"], resource)
local atl09h5 = h5.object(parms["asset09"], resource09)
local kd490 = bathy.kd(parms, viirs_filename)
local refraction = bathy.refraction(parms)
local uncertainty = bathy.uncertainty(parms, kd490)
local seasurface = parms["find_sea_surface"] and bathy.seasurface(parms) or nil
local qtrees = parms:classifier(bathy.QTREES) and bathy.qtrees(parms) or nil
local coastnet = parms:classifier(bathy.COASTNET) and bathy.coastnet(parms) or nil
local openoceanspp = parms:classifier(bathy.OPENOCEANSPP) and bathy.openoceanspp(parms) or nil
userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> creating dataframes...", rspq))

-- build dataframes for each beam
local dataframes = {}
for _, beam in ipairs(parms["beams"]) do
    dataframes[beam] = bathy.dataframe(beam, parms, bathymask, atl03h5, atl09h5, rspq)
    if not dataframes[beam] then
        userlog:alert(core.CRITICAL, core.RTE_ERROR, string.format("request <%s> on %s failed to create bathy dataframe for beam %s", rspq, resource, beam))
    else
        dataframes[beam]:run(seasurface)
        dataframes[beam]:run(qtrees)
        dataframes[beam]:run(coastnet)
        dataframes[beam]:run(openoceanspp)
        dataframes[beam]:run(refraction)
        dataframes[beam]:run(uncertainty)
        dataframes[beam]:run(core.TERMINATE)
    end
end

-- wait for dataframes to complete and write to file
for beam, beam_df in pairs(dataframes) do
    local df_finished = beam_df:finished(ctimeout(parms["node_timeout"], starttime), rspq)
    if not df_finished then
        userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> on %s timed out waiting for dataframe to complete on spot %d", rspq, resource, beam_df:meta("spot")))
    elseif dataframes[beam]:inerror() then
        userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s failed to create valid bathy dataframe for spot %d", rspq, resource, beam_df:meta("spot")))
    elseif dataframes[beam]:length() == 0 then
        userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> on %s created an empty bathy dataframe for spot %d", rspq, resource, beam_df:meta("spot")))
    else
        local spot = beam_df:meta("spot")
        local arrow_df = arrow.dataframe(parms, beam_df)
        local output_filename = string.format("%s/bathy_spot_%d.parquet", crenv.host_sandbox_directory, spot)
        arrow_df:export(output_filename, arrow.PARQUET)
        userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> on %s created dataframe for spot %s", rspq, resource, spot))
        outputs[beam] = string.format("%s/bathy_spot_%d.parquet", crenv.container_sandbox_mount, spot)
    end
    -- cleanup to save memory
    beam_df:destroy()
end

-------------------------------------------------------
-- clean up objects to cut down on memory usage
-------------------------------------------------------
atl03h5:destroy()
if atl09h5 then atl09h5:destroy() end
kd490:destroy()

-------------------------------------------------------
-- set atl24 output filename
-------------------------------------------------------
local atl24_filename = parms["output"]["path"]
local pos_last_delim = string.reverse(atl24_filename):find("/") or (#atl24_filename + 1)
outputs["atl24_filename"] = string.sub(atl24_filename, #atl24_filename - pos_last_delim + 2)
userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> generating file %s", rspq, outputs["atl24_filename"]))
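
-- Worked example for the basename extraction above, using a hypothetical
-- path "s3://bucket/out/atl24.parquet" (29 characters): reversing the
-- string puts the last '/' at position 14 from the end, so
-- string.sub(atl24_filename, 29 - 14 + 2) = string.sub(atl24_filename, 17)
-- yields "atl24.parquet".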

-------------------------------------------------------
-- set additional outputs
-------------------------------------------------------
outputs["profile"] = {}
outputs["format"] = parms["output"]["format"]
outputs["filename"] = crenv.container_sandbox_mount.."/"..tempfile
outputs["ensemble"] = parms["ensemble"] or {ensemble_model_filename=string.format("%s/%s", cre.HOST_DIRECTORY, bathy.ENSEMBLE_MODEL)}

-------------------------------------------------------
-- run oceaneyes
-------------------------------------------------------
local container_parms = {
    container_image = "oceaneyes",
    container_name = "oceaneyes",
    container_command = string.format("/bin/bash /runner.sh %s/settings.json", crenv.container_sandbox_mount),
    timeout = ctimeout(parms["node_timeout"], starttime)
}
local container = runner.execute(crenv, container_parms, { ["settings.json"] = outputs }, rspq)
runner.wait(container, ctimeout(parms["node_timeout"], starttime))

-------------------------------------------------------
-- check for container failure
-------------------------------------------------------
local f, _ = io.open(crenv.host_sandbox_directory.."/"..tempfile..".empty", "r")
if f then
    f:close()
    cleanup(crenv, transaction_id, true, string.format("container indicated empty output: %s", resource))
    return
end

-------------------------------------------------------
-- send dataframe back to user
-------------------------------------------------------
local df = arrow.dataframe(crenv.host_sandbox_directory.."/"..tempfile, arrow.PARQUET)
dataframe.send(df, parms, rspq, userlog)

-------------------------------------------------------
-- exit
-------------------------------------------------------
cleanup(crenv, transaction_id)


-- write arrow dataframe function that reads parquet (or csv, or geoparquet, etc) file and creates a dataframe from it
-- write core rxdataframe function that receives dataframes being sent
-- support all the nodes sending back parquet files that don't get assembled by the proxy node (need to call it something)
67 changes: 10 additions & 57 deletions datasets/bathy/endpoints/atl24s.lua
@@ -1,70 +1,23 @@
 --
 -- ENDPOINT: /source/atl24s
 --
-local endpoint = "atl24s"
 local json = require("json")
-local earthdata = require("earth_data_query")
+local dataframe = require("dataframe")
+local earthdata = require("earthdata")
 local rqst = json.decode(arg[1])
-local parms = bathy.parms(rqst["parms"], 0, "icesat2", rqst["resource"])
-local resource = parms["resource"]
-local resources = parms["resources"]
+local parms = bathy.parms(rqst["parms"], rqst["key_space"], "icesat2", rqst["resource"])
 local userlog = msg.publish(rspq) -- create user log publisher (alerts)

-------------------------------------
---- Proxy Request
-------------------------------------
-if resources == nil then
+if earthdata.query(parms, rspq, userlog) == earthdata.SUCCESS then

-    -- Populate Catalogs via STAC and TNM Requests --
-    local geo_parms = parms[geo.PARMS]
-    if geo_parms then
-        for dataset,raster_parms in pairs(geo_parms) do
-            if not raster_parms["catalog"] then
-                userlog:alert(core.INFO, core.RTE_INFO, string.format("proxy request <%s> querying resources for %s", rspq, dataset))
-                local rc, rsps = earthdata.search(raster_parms, parms["poly"])
-                if rc == earthdata.SUCCESS then
-                    parms:setcatalog(dataset,json.encode(rsps))
-                    userlog:alert(core.INFO, core.RTE_INFO, string.format("proxy request <%s> returned %d resources for %s", rspq, rsps and #rsps["features"] or 0, dataset))
-                elseif rc ~= earthdata.UNSUPPORTED then
-                    userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> failed to get catalog for %s <%d>: %s", rspq, dataset, rc, rsps))
-                end
-            end
-        end
-    end
+    local df = nil

-    -- Populate Resources via CMR Request --
-    if not resources then
-        local rc, rsps = earthdata.cmr(parms:export())
-        if rc == earthdata.SUCCESS then
-            resources = rsps
-            userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> retrieved %d resources from CMR", rspq, #resources))
-        else
-            userlog:alert(core.CRITICAL, core.RTE_SIMPLIFY, string.format("request <%s> failed to make CMR request <%d>: %s", rspq, rc, rsps))
-            return
-        end
+    if parms["resource"] then
+    else
+        df = dataframe.proxy("atl24s", parms, rspq, userlog)
+    end

-    -- Proxy Request --
-    local locks_per_node = (parms["poly"] and not parms["ignore_poly_for_cmr"]) and 1 or core.MAX_LOCKS_PER_NODE
-    local endpoint_proxy = core.proxy(endpoint, resources, json.encode(rqst["parms"]), parms["node_timeout"], locks_per_node, rspq, false, parms["cluster_size_hint"])
-
-    -- Wait Until Proxy Completes --
-    local timeout = parms["rqst_timeout"]
-    local duration = 0
-    local interval = 10 < timeout and 10 or timeout -- seconds
-    while (userlog:numsubs() > 0) and not endpoint_proxy:waiton(interval * 1000) do
-        duration = duration + interval
-        if timeout >= 0 and duration >= timeout then
-            userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> timed-out after %d seconds waiting for endpoint proxy", rspq, duration))
-            do return end
-        end
-    end
+    if df then
+        dataframe.send(df, parms, rspq, userlog)
+    end
-
-------------------------------------
---- Granule Request
-------------------------------------
-else
-
-    print(resource)
-
 end
