Commit 66dfa1c
Merge pull request #4 from mi-erasmusmc/arrow_support
Arrow support
solis9753 authored Dec 21, 2023
2 parents 446c0f8 + cfe635b commit 66dfa1c
Showing 72 changed files with 18,649 additions and 355 deletions.
10 changes: 6 additions & 4 deletions DESCRIPTION
@@ -1,22 +1,24 @@
 Package: PredictiveValueFPs
 Title: A reproducible package to perform studies for the predictive value of frequent patterns
-Version: 0.0.0.9000
+Version: 0.1.0
 Authors@R:
-    person("Solon", "Ioannou", , "s.ioannou@ersmusmc.nl", role = c("aut", "cre"),
+    person("Solon", "Ioannou", , "s.ioannou@erasmusmc.nl", role = c("aut", "cre"),
            comment = c(ORCID = "YOUR-ORCID-ID"))
 Description: What the package does (one paragraph).
 License: MIT + file LICENSE
 Encoding: UTF-8
 Roxygen: list(markdown = TRUE)
-RoxygenNote: 7.2.1
+RoxygenNote: 7.2.2
 Imports:
     dplyr,
     magrittr,
     PatientLevelPrediction,
     arules,
     arulesSequences,
     AssociationRuleMining,
-    stringr
+    stringr,
+    ParallelLogger,
+    mRMRe
 Suggests:
     testthat (>= 3.0.0)
 Config/testthat/edition: 3
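
Note: ParallelLogger and mRMRe are new to Imports, while the rewritten R/FeatureSelection.R below actually calls praznik::NJMIM, which is not declared here. A minimal sketch of the ParallelLogger pattern the new code relies on (the log-file path is illustrative, not from this commit):

library(ParallelLogger)

# Illustrative only: in addition to the default console logger,
# route log messages to a file (hypothetical path).
logFile <- file.path(tempdir(), "featureSelection.log")
addDefaultFileLogger(logFile)
logInfo("Starting Feature Selection...")
logInfo("Done Feature Selection.")
unregisterLogger("DEFAULT_FILE_LOGGER")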
7 changes: 7 additions & 0 deletions NAMESPACE
@@ -3,26 +3,33 @@
export("%>%")
export(appendFrequentPatterns)
export(appendFrequentPatternsSettings)
export(applyFeatureSelection)
export(createResultsDatabase)
export(execute)
export(executeRunPlp)
export(extractAtemporalData)
export(extractEPs)
export(extractFPs)
export(extractTemporalData)
export(filterCovariateData)
export(filterPlpData)
export(filterPlpDataEmergentPatterns)
export(generateFPObjects)
export(loadBakedData)
export(mineEmergentPatterns)
export(mineEmergentPatternsSettings)
export(mineFrequentPatterns)
export(mineFrequentPatternsSettings)
export(mineTotalEmergentPatterns)
export(mineTotalFrequentPatterns)
export(performFeatureSelection)
export(predictBaseline)
export(predictFPs)
export(prepareData)
export(prepareFeatureEngineering)
export(prepareFrequentPatternsObjectSettings)
export(prepareFrequentPatternsObjects)
export(prepareRunPlp)
export(recalibrateProbabilities)
export(saveBakedData)
importFrom(magrittr,"%>%")
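
Among the new exports, saveBakedData and loadBakedData form the persistence pair that R/FeatureSelection.R below depends on. A hedged round-trip sketch using the argument names visible in that file (paths illustrative, not from this commit):

# Illustrative only; the directory layout mirrors the one built in applyFeatureSelection.
saveBakedData(object = bakedPlpData,
              file = file.path("outputs", "plpData", "gi_MS_0_01_PL_2_plpData"))
bakedPlpData <- loadBakedData(file.path("outputs", "plpData", "gi_MS_0_01_PL_2_plpData"))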
1 change: 1 addition & 0 deletions PredictiveValueFPs.Rproj
@@ -15,4 +15,5 @@ LaTeX: pdfLaTeX
 BuildType: Package
 PackageUseDevtools: Yes
 PackageInstallArgs: --no-multiarch --with-keep.source
+PackageCheckArgs: --no-multiarch --with-keep.source
 PackageRoxygenize: rd,collate,namespace
130 changes: 130 additions & 0 deletions R/CreateCohorts.R
@@ -0,0 +1,130 @@
# Copyright 2023 Observational Health Data Sciences and Informatics
#
# This file is part of PredictiveValueFPs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

checkForInputFileEncoding <- function(fileName) {
encoding <- readr::guess_encoding(file = fileName, n_max = min(1e+07))

if (!encoding$encoding[1] %in% c("UTF-8", "ASCII")) {
stop("Illegal encoding found in file ",
basename(fileName),
". Should be 'ASCII' or 'UTF-8', found:",
paste(paste0(encoding$encoding, " (", encoding$confidence, ")"), collapse = ", "))
}
invisible(TRUE)
}

.createCohorts <- function(connection,
cdmDatabaseSchema,
vocabularyDatabaseSchema = cdmDatabaseSchema,
cohortDatabaseSchema,
cohortTable,
oracleTempSchema = NULL,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
outputFolder) {
if (!is.null(oracleTempSchema) && oracleTempSchema != "") {
warning("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.")
tempEmulationSchema <- oracleTempSchema
}

# Create study cohort table structure:
sql <- SqlRender::loadRenderTranslateSql(sqlFilename = "CreateCohortTable.sql",
packageName = "PredictiveValueFPs",
dbms = attr(connection, "dbms"),
tempEmulationSchema = tempEmulationSchema,
cohort_database_schema = cohortDatabaseSchema,
cohort_table = cohortTable)
DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE)


# Insert rule names in cohort_inclusion table:
pathToCsv <- system.file("cohorts", "InclusionRules.csv", package = "PredictiveValueFPs")
checkForInputFileEncoding(pathToCsv)
inclusionRules <- readr::read_csv(pathToCsv, col_types = readr::cols())
inclusionRules <- data.frame(cohort_definition_id = inclusionRules$cohortId,
rule_sequence = inclusionRules$ruleSequence,
name = inclusionRules$ruleName)
DatabaseConnector::insertTable(connection = connection,
tableName = "#cohort_inclusion",
data = inclusionRules,
dropTableIfExists = FALSE,
createTable = FALSE,
tempTable = TRUE,
tempEmulationSchema = tempEmulationSchema)


# Instantiate cohorts:
pathToCsv <- system.file("settings", "CohortsToCreate.csv", package = "PredictiveValueFPs")
checkForInputFileEncoding(pathToCsv)
cohortsToCreate <- readr::read_csv(pathToCsv, col_types = readr::cols())
for (i in seq_len(nrow(cohortsToCreate))) {
writeLines(paste("Creating cohort:", cohortsToCreate$name[i]))
sql <- SqlRender::loadRenderTranslateSql(sqlFilename = paste0(cohortsToCreate$name[i], ".sql"),
packageName = "PredictiveValueFPs",
dbms = attr(connection, "dbms"),
tempEmulationSchema = tempEmulationSchema,
cdm_database_schema = cdmDatabaseSchema,
vocabulary_database_schema = vocabularyDatabaseSchema,

results_database_schema.cohort_inclusion = "#cohort_inclusion",
results_database_schema.cohort_inclusion_result = "#cohort_inc_result",
results_database_schema.cohort_inclusion_stats = "#cohort_inc_stats",
results_database_schema.cohort_summary_stats = "#cohort_summary_stats",

target_database_schema = cohortDatabaseSchema,
target_cohort_table = cohortTable,
target_cohort_id = cohortsToCreate$cohortId[i])
DatabaseConnector::executeSql(connection, sql)
}

# Fetch cohort counts:
sql <- "SELECT cohort_definition_id, COUNT(*) AS count FROM @cohort_database_schema.@cohort_table GROUP BY cohort_definition_id"
sql <- SqlRender::render(sql,
cohort_database_schema = cohortDatabaseSchema,
cohort_table = cohortTable)
sql <- SqlRender::translate(sql, targetDialect = attr(connection, "dbms"))
counts <- DatabaseConnector::querySql(connection, sql)
names(counts) <- SqlRender::snakeCaseToCamelCase(names(counts))
counts <- merge(counts, data.frame(cohortDefinitionId = cohortsToCreate$cohortId,
cohortName = cohortsToCreate$name))
readr::write_excel_csv(x = counts, file = file.path(outputFolder, "CohortCounts.csv"), na = "")


# Fetch inclusion rule stats and drop tables:
fetchStats <- function(tableName) {
sql <- "SELECT * FROM #@table_name"
sql <- SqlRender::render(sql, table_name = tableName)
sql <- SqlRender::translate(sql = sql,
targetDialect = attr(connection, "dbms"),
tempEmulationSchema = tempEmulationSchema)
stats <- DatabaseConnector::querySql(connection, sql)
names(stats) <- SqlRender::snakeCaseToCamelCase(names(stats))
fileName <- file.path(outputFolder, paste0(SqlRender::snakeCaseToCamelCase(tableName), ".csv"))
readr::write_csv(x = stats, file = fileName)

sql <- "TRUNCATE TABLE #@table_name; DROP TABLE #@table_name;"
sql <- SqlRender::render(sql, table_name = tableName)
sql <- SqlRender::translate(sql = sql,
targetDialect = attr(connection, "dbms"),
tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(connection, sql)
}
fetchStats("cohort_inclusion")
fetchStats("cohort_inc_result")
fetchStats("cohort_inc_stats")
fetchStats("cohort_summary_stats")

}

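.createCohorts follows the OHDSI study-package skeleton: it builds the cohort table, inserts the inclusion-rule names, instantiates each cohort from packaged SQL, and writes counts and inclusion statistics to outputFolder. A hedged sketch of an invocation (every connection detail and schema name below is a placeholder, not from this commit):

# Hypothetical call; all values are placeholders.
connectionDetails <- DatabaseConnector::createConnectionDetails(dbms = "postgresql",
                                                                server = "localhost/ohdsi",
                                                                user = "user",
                                                                password = "secret")
connection <- DatabaseConnector::connect(connectionDetails)
.createCohorts(connection = connection,
               cdmDatabaseSchema = "cdm",
               cohortDatabaseSchema = "results",
               cohortTable = "predictivevaluefps_cohorts",
               outputFolder = "output")
DatabaseConnector::disconnect(connection)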
155 changes: 155 additions & 0 deletions R/FeatureSelection.R
@@ -0,0 +1,155 @@
##' @export
#applyFeatureSelection <- function(bakedPlpData, numberOfFeatures = 100, analysisSettings, outputFolder){
#
# outcomeId = analysisSettings$outcomeId
# analysisId = analysisSettings$analysisId
# analysisName = analysisSettings$analysisName
# atemporalPlpData = analysisSettings$atemporalPlpData
# fileName = stringr::str_remove(analysisName, "predicting_")
#
# plpData_directory <- file.path(outputFolder, analysisId, "data", "inputs", "plpData")
# minSup = attributes(bakedPlpData$plpData$Train$covariateData)$minimumSupport
# namePatternLength = attributes(bakedPlpData$plpData$Train$covariateData)$patternLength
# nameMinSup = gsub(pattern = "\\.", replacement = "_", x = minSup)
#
#
# ParallelLogger::logInfo("Starting Feature Selection...")
# t1 <- Sys.time()
# df <- bakedPlpData$plpData$Train$covariateData$covariates %>%
# dplyr::collect() %>%
# tidyr::pivot_wider(id_cols = rowId, names_from = covariateId, values_from = covariateValue, values_fill = 0) %>%
# dplyr::inner_join(., bakedPlpData$population %>% select(rowId, outcomeCount), by = "rowId") %>%
# dplyr::select(- rowId) %>%
# dplyr::select(outcomeCount, everything())%>%
# dplyr::mutate(outcomeCount = base::ordered(base::factor(outcomeCount), levels = c("0", "1")))
#
# dd <- mRMRe::mRMR.data(data = df[,-1],
# strata = df[,1] %>% pull())
#
# classic <- mRMRe::mRMR.classic(data = dd,
# target_indices = c(1), # Position of label variables
# feature_count = as.numeric(numberOfFeatures) # how many features to select
# )
#
# selectedFeatures <- mRMRe::solutions(classic)
# featureNames <- classic@feature_names[unlist(selectedFeatures)]
# tt <- t1 - Sys.time()
# ParallelLogger::logInfo("Done Feature Selection.")
# ParallelLogger::logInfo(paste("Total run time for Feature Selection:", tt[[1]], attr(tt, "units")))
#
# ParallelLogger::logInfo("Keeping selected Features in Andromeda...")
# newBakedPlpData <- bakedPlpData
# newBakedPlpData$plpData$Train$covariateData$covariates <- newBakedPlpData$plpData$Train$covariateData$covariates %>%
# filter(covariateId %in% featureNames)
# newBakedPlpData$plpData$Train$covariateData$covariateRef <- newBakedPlpData$plpData$Train$covariateData$covariateRef %>%
# filter(covariateId %in% featureNames)
# attr(newBakedPlpData$plpData$Train$covariateData, "featuresSelected") <- paste0("FeatureSelected_", numberOfFeatures)
# ParallelLogger::logInfo("Done.")
#
# saveBakedData(object = newBakedPlpData, file = file.path(plpData_directory, paste0(fileName, "_MS_", nameMinSup, "_PL_", namePatternLength, "_FeatureSelected_", numberOfFeatures, "_plpData")))
#
# invisible(TRUE)
#}
#
##' @export
#performFeatureSelection <- function(inputDirectory, outputDirectory, analysisSettings, numberOfFeatures){
# # inputDirectory = file.path("Arrow3", "predicting_gi", "data", "inputs", "plpData")
# # numberOfFeatures = 25
# directories <- list.dirs(inputDirectory, recursive = FALSE)
# nonFeatureSelection <- stringr::str_detect(string = basename(directories), pattern = "_FeatureSelected_", negate = TRUE)
# directories <- directories[nonFeatureSelection]
# newFileNames <- basename(directories) %>%
# stringr::str_remove(., pattern = "_plpData") %>%
# paste0(., "_FeatureSelected_", numberOfFeatures, "_plpData")
#
# for (i in seq_along(directories)) {
# analysisExists <- file.exists(file.path(inputDirectory, newFileNames[i]))
# if(!analysisExists){
# bakedPlpData <- loadBakedData(file.path(directories[i]))
# applyFeatureSelection(bakedPlpData = bakedPlpData,
# numberOfFeatures = numberOfFeatures,
# analysisSettings = analysisSettings,
# outputFolder = outputDirectory)
# } else {
# ParallelLogger::logInfo(paste0("Object ", newFileNames[i], " exists."))
# }
# }
# invisible(TRUE)
#}

#' @export
applyFeatureSelection <- function(bakedPlpData, numberOfFeatures = 20, analysisSettings, outputFolder){

outcomeId = analysisSettings$outcomeId
analysisId = analysisSettings$analysisId
analysisName = analysisSettings$analysisName
atemporalPlpData = analysisSettings$atemporalPlpData
fileName = stringr::str_remove(analysisName, "predicting_")

plpData_directory <- file.path(outputFolder, analysisId, "data", "inputs", "plpData")
minSup = attributes(bakedPlpData$plpData$Train$covariateData)$minimumSupport
namePatternLength = attributes(bakedPlpData$plpData$Train$covariateData)$patternLength
nameMinSup = gsub(pattern = "\\.", replacement = "_", x = minSup)


ParallelLogger::logInfo("Starting Feature Selection...")
t1 <- Sys.time()
sparseData <- bakedPlpData$plpData$Train %>%
PatientLevelPrediction::toSparseM(., .$labels)

denseMatrix <- as.matrix(sparseData$dataMatrix)
dataFrame <- as.data.frame(denseMatrix)
y <- sparseData$labels$outcomeCount

selection <- praznik::NJMIM(X = dataFrame,
Y = y,
k = numberOfFeatures)

featureNames <- sparseData$covariateMap %>%
dplyr::filter(columnId %in% selection$selection) %>%
dplyr::pull(covariateId)

tt <- Sys.time() - t1
ParallelLogger::logInfo("Done Feature Selection.")
ParallelLogger::logInfo(paste("Total run time for Feature Selection:", tt[[1]], attr(tt, "units")))

ParallelLogger::logInfo("Keeping selected Features in Andromeda...")
newBakedPlpData <- bakedPlpData
newBakedPlpData$plpData$Train$covariateData$covariates <- newBakedPlpData$plpData$Train$covariateData$covariates %>%
dplyr::filter(covariateId %in% featureNames)
newBakedPlpData$plpData$Train$covariateData$covariateRef <- newBakedPlpData$plpData$Train$covariateData$covariateRef %>%
dplyr::filter(covariateId %in% featureNames)
attr(newBakedPlpData$plpData$Train$covariateData, "featuresSelected") <- paste0("FeatureSelected_", numberOfFeatures)
ParallelLogger::logInfo("Done.")

saveBakedData(object = newBakedPlpData, file = file.path(plpData_directory, paste0(fileName, "_MS_", nameMinSup, "_PL_", namePatternLength, "_FeatureSelected_", numberOfFeatures, "_plpData")))

invisible(TRUE)
}

#' @export
performFeatureSelection <- function(inputDirectory, outputDirectory, analysisSettings, numberOfFeatures){
# inputDirectory = file.path("Arrow3", "predicting_gi", "data", "inputs", "plpData")
# numberOfFeatures = 25
directories <- list.dirs(inputDirectory, recursive = FALSE)
nonFeatureSelection <- stringr::str_detect(string = basename(directories), pattern = "_FeatureSelected_", negate = TRUE)
directories <- directories[nonFeatureSelection]
newFileNames <- basename(directories) %>%
stringr::str_remove(., pattern = "_plpData") %>%
paste0(., "_FeatureSelected_", numberOfFeatures, "_plpData")

for (i in seq_along(directories)) {
analysisExists <- file.exists(file.path(inputDirectory, newFileNames[i]))
if(!analysisExists){
bakedPlpData <- loadBakedData(file.path(directories[i]))
applyFeatureSelection(bakedPlpData = bakedPlpData,
numberOfFeatures = numberOfFeatures,
analysisSettings = analysisSettings,
outputFolder = outputDirectory)
} else {
ParallelLogger::logInfo(paste0("Object ", newFileNames[i], " exists."))
}
}
invisible(TRUE)
}

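The rewritten applyFeatureSelection replaces the commented-out mRMRe approach with praznik::NJMIM, run on the dense matrix produced by PatientLevelPrediction::toSparseM. A self-contained sketch of that call on toy binary covariates (data and names invented for illustration):

library(praznik)

set.seed(42)
# Toy binary covariates standing in for the densified plpData matrix.
X <- data.frame(f1 = rbinom(200, 1, 0.3),
                f2 = rbinom(200, 1, 0.5),
                f3 = rbinom(200, 1, 0.2))
y <- rbinom(200, 1, 0.4)

# Greedy forward selection of k features under the NJMIM criterion.
selection <- NJMIM(X = X, Y = y, k = 2)
selection$selection  # column indices of the selected features, as used above

One design note: as.matrix(sparseData$dataMatrix) densifies the full sparse covariate matrix before selection, which can be memory-heavy for wide covariate spaces.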
(The remaining changed files are not shown.)