Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean #9

Open
wants to merge 16 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions R/copernicus-cds.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,16 @@ copernicus_cds_parallel <- function(
)
{
ncores <- parallel::detectCores() - 1

cl <- parallel::makeCluster(ncores)
on.exit(parallel::stopCluster(cl))

msg <- sprintf(
"Importing %d variable(s) (%s) from Copernicus CDS parallel (ncores = %d)",
length(variables),
kwb.utils::stringList(variables),
ncores
)

kwb.utils::catAndRun(
messageText = msg,
messageText = sprintf(
"Importing %d variable(s) (%s) from Copernicus CDS parallel (ncores = %d)",
length(variables),
kwb.utils::stringList(variables),
ncores
),
expr = parallel::parLapply(cl, variables, function(variable) {
try(copernicus_cds(
variable,
Expand Down
21 changes: 11 additions & 10 deletions R/create_periods_in_year.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,26 @@
create_periods_in_year <- function(year, n_periods = 4L)
{
  # Split one calendar year into `n_periods` consecutive date ranges of
  # (nearly) equal length. For the current year the last period ends today,
  # for past/future years it ends on Dec 31.
  #
  # year: single numeric/integer year (e.g. 2023)
  # n_periods: positive integer number of periods (default: 4 = quarters)
  #
  # Returns a data frame with character columns "start" and "end".
  stopifnot(length(n_periods) == 1L, n_periods > 0L)
  stopifnot(is.numeric(year), length(year) == 1L)

  year <- as.integer(year)

  # Build a Date within `year` from an "mm-dd" string
  as_date <- function(x) as.Date(sprintf("%04d-%s", year, x))

  n_dates <- n_periods + 1L

  # BUG FIX: ifelse() takes attributes from its `test` argument, so it
  # strips the "Date" class and returns a bare number; use a scalar
  # if/else so that seq.Date() receives a proper Date object.
  # NOTE(review): is_this_year() is a package-internal helper — assumed to
  # return TRUE iff `year` equals the current calendar year; confirm.
  end_date <- if (is_this_year(year)) Sys.Date() else as_date("12-31")

  dates <- seq.Date(
    from = as_date("01-01"),
    to = end_date,
    length.out = n_dates
  )

  starts <- dates[-n_dates]
  ends <- kwb.utils::startsToEnds(starts, lastStop = dates[n_dates])

  data.frame(
    start = as.character(starts),
    end = as.character(ends)
  )
}
26 changes: 14 additions & 12 deletions R/create_rgee_environment.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
#' @export
#' @importFrom reticulate condaenv_exists
#' @importFrom kwb.python conda_py_install
create_ad4gd_env <- function(force = FALSE, debug = FALSE)
{
  # Create (or reuse) the "ad4gd" conda environment with the Python
  # packages required by this package (earthengine-api for Google Earth
  # Engine access).
  #
  # force: if TRUE, (re-)install the environment even if it already exists
  # debug: if TRUE, print a message when an existing environment is reused
  #
  # Called for its side effect (conda environment creation); no useful
  # return value.
  if (!reticulate::condaenv_exists("ad4gd") || force) {

    kwb.python::conda_py_install(env_name = "ad4gd", pkgs = list(
      conda = c("python=3.12.2", "numpy"),
      py = "earthengine-api==0.1.370"
    ))

  } else if (debug) {

    message(
      "Conda environment 'ad4gd' already exists. ",
      "Use 'force' = TRUE to reinstall, if required."
    )
  }
}

30 changes: 14 additions & 16 deletions R/flatten_results.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,23 @@
#' @importFrom dplyr bind_rows
#' @importFrom tidyr unnest
#' @importFrom sf st_polygon st_sfc st_set_crs
flatten_results <- function(
    sat_data_list,
    cols_unnest = "satellite_data_metadata"
)
{
  # Flatten a list of nested satellite-data tibbles into one data frame.
  #
  # sat_data_list: list of tibbles, each containing list-columns to unnest
  # cols_unnest: names of the list-columns to unnest (default:
  #   "satellite_data_metadata")
  #
  # Returns a single data frame with the selected columns unnested.
  cols_satellite <- c("satellite_data", "satellite_metadata")

  # Only separate nested names with "." when both satellite columns are
  # unnested together (otherwise no name collisions can occur). Yields
  # NULL when the condition is FALSE. Hoisted out of the loop below
  # because it does not depend on the row index.
  names_sep <- if (all(cols_satellite %in% cols_unnest)) "."

  sat_data_df_nested <- dplyr::bind_rows(sat_data_list)

  # Unnest row by row so that rows with differing nested shapes do not
  # interfere with each other, then recombine.
  seq_len(nrow(sat_data_df_nested)) %>%
    lapply(function(i) {
      tidyr::unnest(
        sat_data_df_nested[i, ],
        tidyselect::all_of(cols_unnest),
        names_sep = names_sep
      )
    }) %>%
    dplyr::bind_rows()
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
#' Google Earth Engine: get data for years in parallel
#'
#' @param years years vector of years for which satellite data should be downloaded
#' @param years years vector of years for which satellite data should be
#' downloaded
#' @param lakes sf data frame with shapes of lakes
#' @param image_collection image collection (default: "COPERNICUS/S2_SR_HARMONIZED")
#' @param image_collection image collection (default:
#' "COPERNICUS/S2_SR_HARMONIZED")
#' @param bands bands (default: NULL), for selection provide in the following
#' format: as.list(c("QA60", paste0("B", 1:6)))
#' @param point_on_surface use sf::st_point_on_surface or polygon? (default: FALSE)
#' format: as.list(c("QA60", paste0("B", 1:6)))
#' @param point_on_surface use sf::st_point_on_surface or polygon? (default:
#' FALSE)
#' @param spatial_fun spatial aggregation function (default: "mean")
#' @param scale scale parameter (default: 10), for details, see
#' \url{https://developers.google.com/earth-engine/guides/scale}
#' @param via via (default: "getInfo"), other options use google cloud (google drive
#' or google cloud storage)
#' @param col_lakename col_lakename ("GEWNAME", used by Berlin authority for surface
#' water bodies) use "SEE_NAME" for Brandenburg lakes
#' @param set_lakenames_as_list_indices should lake names of "col_lakename" be used
#' for naming result list? (default: TRUE)
#' \url{https://developers.google.com/earth-engine/guides/scale}
#' @param via via (default: "getInfo"), other options use google cloud (google
#' drive or google cloud storage)
#' @param col_lakename col_lakename ("GEWNAME", used by Berlin authority for
#' surface water bodies) use "SEE_NAME" for Brandenburg lakes
#' @param set_lakenames_as_list_indices should lake names of "col_lakename" be
#' used for naming result list? (default: TRUE)
#' @param debug show debug messages (default: TRUE)
#' @param debug_dir directory where to save (default: tempdir())
#' @param ee_print show debug messages for "ee" (default: FALSE)
#' @param export_rds save sat data into rds object for each lake (default: TRUE)
#' @param export_dir directory where to save data for each lake (default: tempdir())
#' @param export_dir directory where to save data for each lake (default:
#' tempdir())
#' @param ncores number of cores for parallel processing (default:
#' parallel::detectCores() - 1)
#' @param n_year_splits number of year splits per request. Required in case request
#' uses too much images > 400-500 per year (default: NULL, determined automatically within
#' function. In case it should be overwritten by the user provide a meaningful integer number)
#' @param return_list should results be provided as R list? (default: FALSE). If FALSE,
#' the rds_path to the exported data is provided in case export_rds is set to TRUE
#' @return list with data and metadata, each of them tibbles (if return_list = TRUE),
#' If FALSE, the rds_path to the exported data is provided in case export_rds is
#' set to TRUE. In case an error occurs NULL is returned
#' parallel::detectCores() - 1)
#' @param n_year_splits number of year splits per request. Required in case the
#'   request uses too many images (more than 400-500 per year; default: NULL, determined
#' automatically within function. In case it should be overwritten by the user
#' provide a meaningful integer number)
#' @param return_list should results be provided as R list? (default: FALSE). If
#' FALSE, the rds_path to the exported data is provided in case export_rds is
#' set to TRUE
#' @return list with data and metadata, each of them tibbles (if return_list =
#' TRUE), If FALSE, the rds_path to the exported data is provided in case
#' export_rds is set to TRUE. In case an error occurs NULL is returned
#' @export
#' @importFrom parallel detectCores makeCluster stopCluster parLapply clusterEvalQ
#' clusterExport
#' @importFrom parallel detectCores makeCluster stopCluster parLapply
#' clusterEvalQ clusterExport
#' @importFrom reticulate use_condaenv
#' @importFrom rgee ee_Initialize
#' @importFrom fs path_join
Expand All @@ -58,16 +64,16 @@ gee_get_data_for_years_parallel <- function(
export_dir = tempdir(),
ncores = parallel::detectCores() - 1,
n_year_splits = NULL,
return_list = FALSE) {


return_list = FALSE
)
{
geos <- tolower(sf::st_geometry_type(lakes))

shape_type <- if(all(geos == "point")) {
shape_type <- if (all(geos == "point")) {
"point"
} else if (all(geos == "polygon") & point_on_surface == FALSE) {
} else if (all(geos == "polygon") && isFALSE(point_on_surface)) {
"polygon"
} else if (all(geos == "polygon") & point_on_surface == TRUE) {
} else if (all(geos == "polygon") && isTRUE(point_on_surface)) {
"point-on-surface"
} else {
"unclear"
Expand All @@ -78,26 +84,30 @@ gee_get_data_for_years_parallel <- function(

stopifnot(spatial_fun %in% names(rgee::ee$Reducer))

stopifnot(ncores > 1)
stopifnot(ncores > 1L)
stopifnot(ncores <= parallel::detectCores())

if (ncores > nrow(lakes)) {
ncores <- nrow(lakes)
n_lakes <- nrow(lakes)

if (ncores > n_lakes) {
ncores <- n_lakes
}

# Prepare parallel processing
cl <- parallel::makeCluster(ncores,
outfile = fs::path_join(c(debug_dir,
"debug_parallel.txt")))
cl <- parallel::makeCluster(
ncores,
outfile = fs::path_join(c(debug_dir, "debug_parallel.txt"))
)

on.exit(parallel::stopCluster(cl))

my_fun <- function(idx) {
if(debug) {
if (debug) {
lakename <- lakes[[col_lakename]][idx]
tfile <- fs::path_join(c(debug_dir,
sprintf("debug_parallel_%03d_%s.txt",
idx,
lakename)))
tfile <- fs::path_join(c(
debug_dir,
sprintf("debug_parallel_%03d_%s.txt", idx, lakename))
)
sink(tfile, append = FALSE)
}

Expand All @@ -113,43 +123,44 @@ gee_get_data_for_years_parallel <- function(
col_lakename = col_lakename,
debug = debug,
ee_print = ee_print,
n_year_splits = n_year_splits)
)
n_year_splits = n_year_splits
))

if(any(class(res) == "try-error")) {
not_failed <- FALSE
} else {
not_failed <- TRUE
}
success <- !inherits(res, "try-error")


if(debug) sink()
if (debug) {
sink()
}

return_obj <- NULL

if(export_rds && not_failed) {
rds_name <- sprintf("%s_%s_%s_scale-%dm_%4d-%4d.rds",
lakes[[col_lakename]][idx],
shape_type,
spatial_fun,
scale,
min(years),
max(years))
if (export_rds && success) {

rds_name <- sprintf(
"%s_%s_%s_scale-%dm_%4d-%4d.rds",
lakes[[col_lakename]][idx],
shape_type,
spatial_fun,
scale,
min(years),
max(years)
)

rds_path <- fs::path_join(c(export_dir, rds_name))

kwb.utils::catAndRun(sprintf("Exporting dataset to '%s'", rds_path),
expr = { saveRDS(res, file = rds_path) }
kwb.utils::catAndRun(
sprintf("Exporting dataset to '%s'", rds_path),
expr = saveRDS(res, file = rds_path)
)

return_obj <- rds_path
}

if(return_list | !(export_rds && not_failed)) {
return_obj <- res
}
if (return_list || !(export_rds && success)) {
return_obj <- res
}

return(return_obj)
return_obj
}

# Initialize necessary packages and environments on each cluster
Expand All @@ -164,31 +175,35 @@ gee_get_data_for_years_parallel <- function(

# Prepare parallel processing
doParallel::registerDoParallel(cl)

library(foreach)

# Run the parallel processing
sat_data <- kwb.utils::catAndRun(
kwb.utils::catAndRun(
sprintf(
"Downloading satellite data for %d lakes in parallel on %d cores",
nrow(lakes),
n_lakes,
ncores
),
dbg = debug,
expr = {
sat_data <- foreach::foreach(idx = seq_len(nrow(lakes)),
.combine = "c") %dopar% {
my_fun(idx)
}

# Stop parallel processing
doParallel::stopImplicitCluster()
sat_data <- foreach::foreach(
idx = seq_len(n_lakes),
.combine = "c"
) %dopar% {
my_fun(idx)
}

if(set_lakenames_as_list_indices) {
sat_data <- setNames(sat_data, lakes[[col_lakename]])
}
# Stop parallel processing
doParallel::stopImplicitCluster()

sat_data
},
dbg = debug
if (set_lakenames_as_list_indices) {
names(sat_data) <- lakes[[col_lakename]]
}

sat_data
}
)

}
Loading
Loading